Skip to content

Commit 618ce0a

Browse files
authored
[3.13] gh-115634: Fix ProcessPoolExecutor deadlock with max_tasks_per_child (GH-140900) (#152928)
The idle worker semaphore counts task completions, not idle workers, so it can hold a stale token released by a worker that later exited upon reaching its max_tasks_per_child limit. The worker replacement path consumed such tokens and skipped spawning a replacement, deadlocking the remaining queued tasks once no workers were left. Replace dead workers based on len(self._processes) without consulting the semaphore. The submit() path is unchanged, preserving on-demand spawning and idle worker reuse. Replace the documentation note added in GH-140897 with a versionchanged entry now that the bug is fixed. Based on a fix proposed by Tabrez Mohammed. (cherry picked from commit b706767)
1 parent da2cb8f commit 618ce0a

4 files changed

Lines changed: 68 additions & 12 deletions

File tree

Doc/library/concurrent.futures.rst

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -285,11 +285,6 @@ in a REPL or a lambda should not be expected to work.
285285
default in absence of a *mp_context* parameter. This feature is incompatible
286286
with the "fork" start method.
287287

288-
.. note::
289-
Bugs have been reported when using the *max_tasks_per_child* feature that
290-
can result in the :class:`ProcessPoolExecutor` hanging in some
291-
circumstances. Follow its eventual resolution in :gh:`115634`.
292-
293288
.. versionchanged:: 3.3
294289
When one of the worker processes terminates abruptly, a
295290
:exc:`~concurrent.futures.process.BrokenProcessPool` error is now raised.
@@ -327,6 +322,11 @@ in a REPL or a lambda should not be expected to work.
327322
*max_workers* uses :func:`os.process_cpu_count` by default, instead of
328323
:func:`os.cpu_count`.
329324

325+
.. versionchanged:: next
326+
Fixed a deadlock (:gh:`115634`) where the executor could hang after
327+
a worker process exited upon reaching its *max_tasks_per_child*
328+
limit while tasks remained queued.
329+
330330
.. _processpoolexecutor-example:
331331

332332
ProcessPoolExecutor Example

Lib/concurrent/futures/process.py

Lines changed: 31 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -360,7 +360,7 @@ def run(self):
360360
if executor := self.executor_reference():
361361
if process_exited:
362362
with self.shutdown_lock:
363-
executor._adjust_process_count()
363+
executor._replace_dead_worker()
364364
else:
365365
executor._idle_worker_semaphore.release()
366366
del executor
@@ -751,6 +751,30 @@ def _start_executor_manager_thread(self):
751751
_threads_wakeups[self._executor_manager_thread] = \
752752
self._executor_manager_thread_wakeup
753753

754+
def _replace_dead_worker(self):
755+
# gh-132969: avoid error when state is reset and executor is still running,
756+
# which will happen when shutdown(wait=False) is called.
757+
if self._processes is None:
758+
return
759+
760+
# A replacement is pointless when shutting down with nothing left
761+
# to run. Both attributes are read under _shutdown_lock, which
762+
# shutdown() holds while setting _shutdown_thread.
763+
assert self._shutdown_lock.locked()
764+
if self._shutdown_thread and not self._pending_work_items:
765+
return
766+
767+
# gh-115634: A worker exited after reaching max_tasks_per_child and
768+
# has been removed from self._processes. Do not consult
769+
# _idle_worker_semaphore here: it counts task completions, not idle
770+
# workers, so it can hold a stale token released by the now-dead
771+
# worker. Trusting such a token would leave the pool a worker short,
772+
# deadlocking once all workers reach their task limit. Spawning is
773+
# safe from this (manager) thread despite gh-90622 because
774+
# max_tasks_per_child is rejected for the "fork" start method.
775+
if len(self._processes) < self._max_workers:
776+
self._spawn_process()
777+
754778
def _adjust_process_count(self):
755779
# gh-132969: avoid error when state is reset and executor is still running,
756780
# which will happen when shutdown(wait=False) is called.
@@ -763,12 +787,12 @@ def _adjust_process_count(self):
763787

764788
process_count = len(self._processes)
765789
if process_count < self._max_workers:
766-
# Assertion disabled as this codepath is also used to replace a
767-
# worker that unexpectedly dies, even when using the 'fork' start
768-
# method. That means there is still a potential deadlock bug. If a
769-
# 'fork' mp_context worker dies, we'll be forking a new one when
770-
# we know a thread is running (self._executor_manager_thread).
771-
#assert self._safe_to_dynamically_spawn_children or not self._executor_manager_thread, 'https://github.com/python/cpython/issues/90622'
790+
# gh-90622: spawning a child via fork while another thread is
791+
# running can deadlock in the child. submit() only calls this
792+
# method when using a non-fork start method.
793+
assert (self._safe_to_dynamically_spawn_children
794+
or not self._executor_manager_thread), (
795+
'https://github.com/python/cpython/issues/90622')
772796
self._spawn_process()
773797

774798
def _launch_processes(self):

Lib/test/test_concurrent_futures/test_process_pool.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,33 @@ def test_max_tasks_per_child_defaults_to_spawn_context(self):
201201
executor = self.executor_type(1, max_tasks_per_child=3)
202202
self.assertEqual(executor._mp_context.get_start_method(), "spawn")
203203

204+
def test_max_tasks_per_child_pending_tasks_gh115634(self):
205+
# gh-115634: A worker exiting at its max_tasks_per_child limit left a
206+
# stale token in the idle worker semaphore, so no replacement worker
207+
# was spawned and the remaining queued tasks deadlocked. Submit more
208+
# tasks than the pool can run at once so a backlog is queued while
209+
# workers hit their task limit.
210+
context = self.get_context()
211+
if context.get_start_method(allow_none=False) == "fork":
212+
raise unittest.SkipTest("Incompatible with the fork start method.")
213+
214+
for max_workers, max_tasks, num_tasks in [(1, 2, 6), (2, 2, 8)]:
215+
with self.subTest(max_workers=max_workers, max_tasks=max_tasks):
216+
executor = self.executor_type(
217+
max_workers, mp_context=context,
218+
max_tasks_per_child=max_tasks)
219+
try:
220+
futures = [executor.submit(mul, i, 2)
221+
for i in range(num_tasks)]
222+
# If the deadlock regresses, the result() calls time out,
223+
# and the shutdown below hangs until the test timeout.
224+
results = [f.result(timeout=support.SHORT_TIMEOUT)
225+
for f in futures]
226+
self.assertEqual(results,
227+
[i * 2 for i in range(num_tasks)])
228+
finally:
229+
executor.shutdown(wait=True, cancel_futures=True)
230+
204231
def test_max_tasks_early_shutdown(self):
205232
context = self.get_context()
206233
if context.get_start_method(allow_none=False) == "fork":
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
Fix a deadlock in :class:`concurrent.futures.ProcessPoolExecutor` when
2+
using ``max_tasks_per_child``, present since the feature was introduced in
3+
Python 3.11. The executor stopped scheduling queued tasks after a worker
4+
process exited upon reaching its task limit. Based on a fix proposed by
5+
Tabrez Mohammed.

0 commit comments

Comments
 (0)