diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index 8f200fc1c82613..e3b6c4a5305615 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -269,6 +269,20 @@ def _process_worker(call_queue, result_queue, initializer, initargs, max_tasks=N return +def _spawn_worker(mp_context, call_queue, result_queue, initializer, + initargs, max_tasks_per_child, processes): + """Start one worker process and record it in *processes* by pid.""" + p = mp_context.Process( + target=_process_worker, + args=(call_queue, + result_queue, + initializer, + initargs, + max_tasks_per_child)) + p.start() + processes[p.pid] = p + + class _ExecutorManagerThread(threading.Thread): """Manages the communication between this process and the worker processes. @@ -321,6 +335,15 @@ def weakref_cb(_, # exiting safely self.max_tasks_per_child = executor._max_tasks_per_child + # gh-119592: Needed to size worker replacement, and immutable, so + # keep a copy rather than reading it back through the executor + # weakref. The rest of the spawn configuration is deliberately NOT + # copied here: holding user-provided objects (initializer, + # initargs, mp_context) in this always-reachable running thread + # could keep the executor itself reachable through them, breaking + # garbage-collection-triggered shutdown. + self.max_workers = executor._max_workers + # A dict mapping work ids to _WorkItems e.g. # {5: <_WorkItem...>, 6: <_WorkItem...>, ...} self.pending_work_items = executor._pending_work_items @@ -357,12 +380,14 @@ def run(self): # while waiting on new results. 
del result_item - if executor := self.executor_reference(): - if process_exited: - with self.shutdown_lock: - executor._replace_dead_worker() - else: - executor._idle_worker_semaphore.release() + if process_exited: + with self.shutdown_lock: + broken = self._replace_dead_worker() + if broken is not None: + self.terminate_broken(*broken) + return + elif executor := self.executor_reference(): + executor._idle_worker_semaphore.release() del executor if self.is_shutting_down(): @@ -379,6 +404,71 @@ def run(self): self.join_executor_internals() return + def _replace_dead_worker(self): + """Spawn a replacement for a worker that exited at its + max_tasks_per_child limit. Called under self.shutdown_lock. + + Returns None while the pool can still make progress, otherwise a + (cause, message) tuple describing why the remaining work items can + never run, so that run() can fail their futures. + """ + assert self.shutdown_lock.locked() + cause = None + message = None + executor = self.executor_reference() + if executor is None: + # gh-152967: The executor was garbage collected; nothing can + # spawn a replacement worker for it anymore. + message = ("The ProcessPoolExecutor was garbage collected with " + "work pending after its last worker process exited " + "upon reaching max_tasks_per_child; the pending work " + "can never be run.") + elif executor._force_shutting_down: + # terminate_workers()/kill_workers() is tearing the pool down; + # a replacement worker would escape the kill and run work + # items that were enqueued before it. + message = ("A worker process exited while the pool was being " + "forcefully shut down; work that was still enqueued " + "will not be run.") + elif self.pending_work_items or not self.is_shutting_down(): + # gh-115634: Do not consult the executor's + # _idle_worker_semaphore here: it counts task completions, not + # idle workers, so it can hold a stale token released by the + # now-dead worker. 
Trusting such a token would leave the pool + # a worker short, deadlocking once all workers reach their + # task limit. Spawning from this (manager) thread is safe + # despite gh-90622 because max_tasks_per_child is rejected for + # the "fork" start method. + if len(self.processes) < self.max_workers: + # gh-119592: Spawn using state owned by this thread and + # configuration read through the live weakref (which + # shutdown() never clears), not the executor state that + # shutdown(wait=False) clears concurrently. + try: + _spawn_worker(executor._mp_context, self.call_queue, + self.result_queue, executor._initializer, + executor._initargs, + self.max_tasks_per_child, self.processes) + except Exception as exc: + # While other workers remain the pool has merely lost + # capacity and they keep draining the queue; with none + # left the failure is reported below. + cause = format_exception(exc) + message = ("A replacement worker process could not be " + "started, leaving the pool without workers " + "to run the remaining work.") + del executor + + if not self.processes and (self.pending_work_items + or cause is not None): + # No worker processes remain and no replacement can be + # spawned: any remaining work items can never run. A spawn + # failure breaks the pool even with nothing pending; leaving + # a zero-worker pool alive would hang a later submit() on a + # stale _idle_worker_semaphore token instead of raising. + return (cause, message) + return None + def add_call_item_to_queue(self): # Fills call_queue with _WorkItems from pending_work_items. # This function never blocks. @@ -455,10 +545,11 @@ def is_shutting_down(self): return (_global_shutdown or executor is None or executor._shutdown_thread) - def _terminate_broken(self, cause): + def _terminate_broken(self, cause, bpe_message=None): # Terminate the executor because it is in a broken state. 
The cause # argument can be used to display more information on the error that - # lead the executor into becoming broken. + # led the executor into becoming broken. bpe_message overrides the + # default message on the BrokenProcessPool set on pending futures. # Mark the process pool broken so that submits fail right now. executor = self.executor_reference() @@ -489,11 +580,12 @@ def _terminate_broken(self, cause): cause_str = "\n".join(errors) cause_tb = f"\n'''\n{cause_str}'''" if cause_str else None + if bpe_message is None: + bpe_message = ("A process in the process pool was terminated " + "abruptly while the future was running or pending.") # Mark pending tasks as failed. for work_id, work_item in self.pending_work_items.items(): - bpe = BrokenProcessPool("A process in the process pool was " - "terminated abruptly while the future was " - "running or pending.") + bpe = BrokenProcessPool(bpe_message) if cause_tb is not None: bpe.__cause__ = _RemoteTraceback(cause_tb) try: @@ -518,9 +610,9 @@ def _terminate_broken(self, cause): # clean up resources self._join_executor_internals(broken=True) - def terminate_broken(self, cause): + def terminate_broken(self, cause, bpe_message=None): with self.shutdown_lock: - self._terminate_broken(cause) + self._terminate_broken(cause, bpe_message) def flag_executor_shutting_down(self): # Flag the executor as shutting down and cancel remaining tasks if @@ -733,6 +825,7 @@ def __init__(self, max_workers=None, mp_context=None, self._queue_count = 0 self._pending_work_items = {} self._cancel_pending_futures = False + self._force_shutting_down = False # _ThreadWakeup is a communication channel used to interrupt the wait # of the main loop of executor_manager_thread from another thread (e.g. 
@@ -772,34 +865,15 @@ def _start_executor_manager_thread(self): _threads_wakeups[self._executor_manager_thread] = \ self._executor_manager_thread_wakeup - def _replace_dead_worker(self): + def _adjust_process_count(self): # gh-132969: avoid error when state is reset and executor is still running, # which will happen when shutdown(wait=False) is called. if self._processes is None: return - # A replacement is pointless when shutting down with nothing left - # to run. Both attributes are read under _shutdown_lock, which - # shutdown() holds while setting _shutdown_thread. - assert self._shutdown_lock.locked() - if self._shutdown_thread and not self._pending_work_items: - return - - # gh-115634: A worker exited after reaching max_tasks_per_child and - # has been removed from self._processes. Do not consult - # _idle_worker_semaphore here: it counts task completions, not idle - # workers, so it can hold a stale token released by the now-dead - # worker. Trusting such a token would leave the pool a worker short, - # deadlocking once all workers reach their task limit. Spawning is - # safe from this (manager) thread despite gh-90622 because - # max_tasks_per_child is rejected for the "fork" start method. - if len(self._processes) < self._max_workers: - self._spawn_process() - - def _adjust_process_count(self): - # gh-132969: avoid error when state is reset and executor is still running, - # which will happen when shutdown(wait=False) is called. - if self._processes is None: + # gh-152967: A forceful shutdown is in progress; a worker spawned + # here could escape its process snapshot and keep running work. + if self._force_shutting_down: return # if there's an idle process, we don't need to spawn a new one. 
@@ -825,15 +899,10 @@ def _launch_processes(self): self._spawn_process() def _spawn_process(self): - p = self._mp_context.Process( - target=_process_worker, - args=(self._call_queue, - self._result_queue, - self._initializer, - self._initargs, - self._max_tasks_per_child)) - p.start() - self._processes[p.pid] = p + _spawn_worker(self._mp_context, self._call_queue, + self._result_queue, self._initializer, + self._initargs, self._max_tasks_per_child, + self._processes) def submit(self, fn, /, *args, **kwargs): with self._shutdown_lock: @@ -930,6 +999,14 @@ def _force_shutdown(self, operation): if operation not in _SHUTDOWN_CALLBACK_OPERATION: raise ValueError(f"Unsupported operation: {operation!r}") + # gh-152967: Stop the manager thread from spawning replacement + # workers before we copy the processes to signal: a worker spawned + # after the copy would survive the loop below and run enqueued + # work items. Taking the lock orders this against the manager's + # worker replacement, which runs under the same lock. 
+ with self._shutdown_lock: + self._force_shutting_down = True + processes = {} if self._processes: processes = self._processes.copy() diff --git a/Lib/test/test_concurrent_futures/test_process_pool.py b/Lib/test/test_concurrent_futures/test_process_pool.py index 205662c91c2558..dafbda862c51c2 100644 --- a/Lib/test/test_concurrent_futures/test_process_pool.py +++ b/Lib/test/test_concurrent_futures/test_process_pool.py @@ -6,11 +6,12 @@ import traceback import unittest import unittest.mock +import weakref from concurrent import futures from concurrent.futures.process import BrokenProcessPool from test import support -from test.support import hashlib_helper, warnings_helper +from test.support import hashlib_helper, threading_helper, warnings_helper from test.test_importlib.metadata.fixtures import parameterize from .executor import ExecutorTest, mul @@ -42,6 +43,13 @@ def _put_wait_put(queue, event): queue.put('finished') +def _report_wait_return(queue, event, value): + """ Used as part of _run_stranded_worker_exit_test """ + queue.put(value) + event.wait() + return value + + class ProcessPoolExecutorTest(ExecutorTest): @unittest.skipUnless(sys.platform=='win32', 'Windows-only process limit') @@ -268,6 +276,126 @@ def test_max_tasks_per_child_pending_tasks_gh115634(self): finally: executor.shutdown(wait=True, cancel_futures=True) + def _run_stranded_worker_exit_test(self, *, shutdown, drop_reference): + # A worker exits upon reaching its max_tasks_per_child limit while + # more submitted work is queued. While the executor object is + # alive a replacement worker must be spawned and the remaining + # work executed; once it has been garbage collected no replacement + # is possible and the remaining futures must fail promptly instead + # of never resolving. 
+ context = self.get_context() + if context.get_start_method(allow_none=False) == "fork": + raise unittest.SkipTest("Incompatible with the fork start method.") + manager = context.Manager() + self.addCleanup(manager.join) + self.addCleanup(manager.shutdown) + started = manager.Queue() + gate = manager.Event() + + executor = self.executor_type( + 1, mp_context=context, max_tasks_per_child=1) + futs = [executor.submit(_report_wait_return, started, gate, i) + for i in range(3)] + self.addCleanup(threading_helper.join_thread, + executor._executor_manager_thread) + # Wait until the worker is inside the first task so that it exits + # at its task limit only after the executor has been shut down + # and/or garbage collected below. + self.assertEqual(started.get(timeout=support.SHORT_TIMEOUT), 0) + if shutdown: + executor.shutdown(wait=False) + if drop_reference: + executor_ref = weakref.ref(executor) + executor = None + for _ in support.sleeping_retry(support.SHORT_TIMEOUT): + support.gc_collect() + if executor_ref() is None: + break + gate.set() + + self.assertEqual(futs[0].result(timeout=support.SHORT_TIMEOUT), 0) + if drop_reference: + for fut in futs[1:]: + with self.assertRaisesRegex(BrokenProcessPool, + "garbage collected"): + fut.result(timeout=support.SHORT_TIMEOUT) + else: + results = [f.result(timeout=support.SHORT_TIMEOUT) + for f in futs[1:]] + self.assertEqual(results, [1, 2]) + + def test_shutdown_no_wait_max_tasks_gh119592(self): + # gh-119592: shutdown(wait=False) used to clear executor state that + # worker replacement relied on. A worker exiting at its + # max_tasks_per_child limit afterwards could not be replaced, so + # the remaining submitted work never ran, and a racing worker exit + # could crash the manager thread on the partially cleared state. 
+ for drop_reference in (False, True): + with self.subTest(drop_reference=drop_reference): + self._run_stranded_worker_exit_test( + shutdown=True, drop_reference=drop_reference) + + def test_gc_during_max_tasks_worker_exit_gh152967(self): + # gh-152967: If the executor was garbage collected without + # shutdown() while its last worker exited at its + # max_tasks_per_child limit, no replacement worker could be + # spawned and the remaining futures were never resolved. + self._run_stranded_worker_exit_test( + shutdown=False, drop_reference=True) + + def _run_unreplaceable_worker_exit_test(self, *, error_regex, + force_shutting_down=False, + failing_spawn=False): + # Drive a max_tasks_per_child worker exit while worker + # replacement is impossible; the queued futures must fail + # promptly with a BrokenProcessPool explaining why. + context = self.get_context() + if context.get_start_method(allow_none=False) == "fork": + raise unittest.SkipTest("Incompatible with the fork start method.") + manager = context.Manager() + self.addCleanup(manager.join) + self.addCleanup(manager.shutdown) + started = manager.Queue() + gate = manager.Event() + + executor = self.executor_type( + 1, mp_context=context, max_tasks_per_child=1) + futs = [executor.submit(_report_wait_return, started, gate, i) + for i in range(3)] + self.addCleanup(threading_helper.join_thread, + executor._executor_manager_thread) + self.assertEqual(started.get(timeout=support.SHORT_TIMEOUT), 0) + if force_shutting_down: + with executor._shutdown_lock: + executor._force_shutting_down = True + if failing_spawn: + spawn_patch = unittest.mock.patch( + "concurrent.futures.process._spawn_worker", + side_effect=OSError("spawn failed")) + spawn_patch.start() + self.addCleanup(spawn_patch.stop) + gate.set() + + self.assertEqual(futs[0].result(timeout=support.SHORT_TIMEOUT), 0) + for fut in futs[1:]: + with self.assertRaisesRegex(BrokenProcessPool, error_regex): + fut.result(timeout=support.SHORT_TIMEOUT) + + def 
test_force_shutdown_during_max_tasks_worker_exit(self): + # A worker exiting at its max_tasks_per_child limit during + # terminate_workers()/kill_workers() must not be replaced (the + # replacement would escape the kill); queued futures fail instead. + self._run_unreplaceable_worker_exit_test( + force_shutting_down=True, + error_regex="forcefully shut down") + + def test_failed_worker_replacement_breaks_pool(self): + # If no replacement worker can be started and no workers remain, + # the pool must break rather than strand the queued futures. + self._run_unreplaceable_worker_exit_test( + failing_spawn=True, + error_regex="could not be started") + def test_max_tasks_early_shutdown(self): context = self.get_context() if context.get_start_method(allow_none=False) == "fork": diff --git a/Misc/NEWS.d/next/Library/2026-07-03-18-30-00.gh-issue-119592.mQr3Vx.rst b/Misc/NEWS.d/next/Library/2026-07-03-18-30-00.gh-issue-119592.mQr3Vx.rst new file mode 100644 index 00000000000000..f718c877c8ed22 --- /dev/null +++ b/Misc/NEWS.d/next/Library/2026-07-03-18-30-00.gh-issue-119592.mQr3Vx.rst @@ -0,0 +1,11 @@ +Fix :class:`concurrent.futures.ProcessPoolExecutor` stranding submitted +work forever when a worker process exited upon reaching its +*max_tasks_per_child* limit after +:meth:`~concurrent.futures.Executor.shutdown` was called with +``wait=False``: a replacement worker is now spawned and the remaining work +executed as documented. If the executor has instead been garbage collected +without ``shutdown()`` (:gh:`152967`), or a replacement worker cannot be +started, the remaining futures now fail with +:exc:`~concurrent.futures.process.BrokenProcessPool` instead of never +resolving. A worker exit racing ``shutdown(wait=False)`` can also no +longer crash the executor management thread.