Skip to content

Commit b262659

Browse files
miss-islingtonmrknmc
authored andcommitted
bpo-33097: Fix submit accepting callable after executor shutdown by interpreter exit (GH-6144) (GH-6445)
Executors in concurrent.futures accepted tasks after executor was shutdown by interpreter exit. Tasks were left in PENDING state forever. This fix changes submit to instead raise a RuntimeError. (cherry picked from commit c4b695f) Co-authored-by: Mark Nemec <mrknmc@me.com>
1 parent 9bb8ceb commit b262659

4 files changed

Lines changed: 44 additions & 0 deletions

File tree

Lib/concurrent/futures/process.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -423,6 +423,10 @@ def shutdown_worker():
423423
# - The executor that owns this worker has been shutdown.
424424
if shutting_down():
425425
try:
426+
# Flag the executor as shutting down as early as possible if it
427+
# is not gc-ed yet.
428+
if executor is not None:
429+
executor._shutdown_thread = True
426430
# Since no new work items can be added, it is safe to shutdown
427431
# this thread if there are no pending work items.
428432
if not pending_work_items:
@@ -595,6 +599,9 @@ def submit(self, fn, *args, **kwargs):
595599
raise BrokenProcessPool(self._broken)
596600
if self._shutdown_thread:
597601
raise RuntimeError('cannot schedule new futures after shutdown')
602+
if _global_shutdown:
603+
raise RuntimeError('cannot schedule new futures after '
604+
'interpreter shutdown')
598605

599606
f = _base.Future()
600607
w = _WorkItem(f, fn, args, kwargs)

Lib/concurrent/futures/thread.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,10 @@ def _worker(executor_reference, work_queue, initializer, initargs):
8787
# - The executor that owns the worker has been collected OR
8888
# - The executor that owns the worker has been shutdown.
8989
if _shutdown or executor is None or executor._shutdown:
90+
# Flag the executor as shutting down as early as possible if it
91+
# is not gc-ed yet.
92+
if executor is not None:
93+
executor._shutdown = True
9094
# Notice other workers
9195
work_queue.put(None)
9296
return
@@ -145,6 +149,9 @@ def submit(self, fn, *args, **kwargs):
145149

146150
if self._shutdown:
147151
raise RuntimeError('cannot schedule new futures after shutdown')
152+
if _shutdown:
153+
raise RuntimeError('cannot schedule new futures after'
154+
'interpreter shutdown')
148155

149156
f = _base.Future()
150157
w = _WorkItem(f, fn, args, kwargs)

Lib/test/test_concurrent_futures.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -303,6 +303,34 @@ def test_interpreter_shutdown(self):
303303
self.assertFalse(err)
304304
self.assertEqual(out.strip(), b"apple")
305305

306+
def test_submit_after_interpreter_shutdown(self):
307+
# Test the atexit hook for shutdown of worker threads and processes
308+
rc, out, err = assert_python_ok('-c', """if 1:
309+
import atexit
310+
@atexit.register
311+
def run_last():
312+
try:
313+
t.submit(id, None)
314+
except RuntimeError:
315+
print("runtime-error")
316+
raise
317+
from concurrent.futures import {executor_type}
318+
if __name__ == "__main__":
319+
context = '{context}'
320+
if not context:
321+
t = {executor_type}(5)
322+
else:
323+
from multiprocessing import get_context
324+
context = get_context(context)
325+
t = {executor_type}(5, mp_context=context)
326+
t.submit(id, 42).result()
327+
""".format(executor_type=self.executor_type.__name__,
328+
context=getattr(self, "ctx", "")))
329+
# Errors in atexit hooks don't change the process exit code, check
330+
# stderr manually.
331+
self.assertIn("RuntimeError: cannot schedule new futures", err.decode())
332+
self.assertEqual(out.strip(), b"runtime-error")
333+
306334
def test_hang_issue12364(self):
307335
fs = [self.executor.submit(time.sleep, 0.1) for _ in range(50)]
308336
self.executor.shutdown()
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
Raise RuntimeError when ``executor.submit`` is called during interpreter
2+
shutdown.

0 commit comments

Comments
 (0)