Skip to content

Commit fe85f5a

Browse files
gpshead and miss-islington
authored and committed
gh-115634: Fix ProcessPoolExecutor deadlock with max_tasks_per_child (GH-140900)
The idle worker semaphore counts task completions, not idle workers, so it can hold a stale token released by a worker that later exited upon reaching its max_tasks_per_child limit. The worker replacement path consumed such tokens and skipped spawning a replacement, deadlocking the remaining queued tasks once no workers were left. Replace dead workers based on len(self._processes) without consulting the semaphore. The submit() path is unchanged, preserving on-demand spawning and idle worker reuse. Replace the documentation note added in GH-140897 with a versionchanged entry now that the bug is fixed. Based on a fix proposed by Tabrez Mohammed. (cherry picked from commit b706767) Co-authored-by: Gregory P. Smith <68491+gpshead@users.noreply.github.com>
1 parent 8a2e34c commit fe85f5a

4 files changed

Lines changed: 68 additions & 12 deletions

File tree

Doc/library/concurrent.futures.rst

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -386,11 +386,6 @@ in a REPL or a lambda should not be expected to work.
386386
default in absence of a *mp_context* parameter. This feature is incompatible
387387
with the "fork" start method.
388388

389-
.. note::
390-
Bugs have been reported when using the *max_tasks_per_child* feature that
391-
can result in the :class:`ProcessPoolExecutor` hanging in some
392-
circumstances. Follow its eventual resolution in :gh:`115634`.
393-
394389
.. versionchanged:: 3.3
395390
When one of the worker processes terminates abruptly, a
396391
:exc:`~concurrent.futures.process.BrokenProcessPool` error is now raised.
@@ -426,6 +421,11 @@ in a REPL or a lambda should not be expected to work.
426421
require the *fork* start method for :class:`ProcessPoolExecutor` you must
427422
explicitly pass ``mp_context=multiprocessing.get_context("fork")``.
428423

424+
.. versionchanged:: next
425+
Fixed a deadlock (:gh:`115634`) where the executor could hang after
426+
a worker process exited upon reaching its *max_tasks_per_child*
427+
limit while tasks remained queued.
428+
429429
.. method:: terminate_workers()
430430

431431
Attempt to terminate all living worker processes immediately by calling

Lib/concurrent/futures/process.py

Lines changed: 31 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -360,7 +360,7 @@ def run(self):
360360
if executor := self.executor_reference():
361361
if process_exited:
362362
with self.shutdown_lock:
363-
executor._adjust_process_count()
363+
executor._replace_dead_worker()
364364
else:
365365
executor._idle_worker_semaphore.release()
366366
del executor
@@ -759,6 +759,30 @@ def _start_executor_manager_thread(self):
759759
_threads_wakeups[self._executor_manager_thread] = \
760760
self._executor_manager_thread_wakeup
761761

762+
def _replace_dead_worker(self):
763+
# gh-132969: avoid error when state is reset and executor is still running,
764+
# which will happen when shutdown(wait=False) is called.
765+
if self._processes is None:
766+
return
767+
768+
# A replacement is pointless when shutting down with nothing left
769+
# to run. Both attributes are read under _shutdown_lock, which
770+
# shutdown() holds while setting _shutdown_thread.
771+
assert self._shutdown_lock.locked()
772+
if self._shutdown_thread and not self._pending_work_items:
773+
return
774+
775+
# gh-115634: A worker exited after reaching max_tasks_per_child and
776+
# has been removed from self._processes. Do not consult
777+
# _idle_worker_semaphore here: it counts task completions, not idle
778+
# workers, so it can hold a stale token released by the now-dead
779+
# worker. Trusting such a token would leave the pool a worker short,
780+
# deadlocking once all workers reach their task limit. Spawning is
781+
# safe from this (manager) thread despite gh-90622 because
782+
# max_tasks_per_child is rejected for the "fork" start method.
783+
if len(self._processes) < self._max_workers:
784+
self._spawn_process()
785+
762786
def _adjust_process_count(self):
763787
# gh-132969: avoid error when state is reset and executor is still running,
764788
# which will happen when shutdown(wait=False) is called.
@@ -771,12 +795,12 @@ def _adjust_process_count(self):
771795

772796
process_count = len(self._processes)
773797
if process_count < self._max_workers:
774-
# Assertion disabled as this codepath is also used to replace a
775-
# worker that unexpectedly dies, even when using the 'fork' start
776-
# method. That means there is still a potential deadlock bug. If a
777-
# 'fork' mp_context worker dies, we'll be forking a new one when
778-
# we know a thread is running (self._executor_manager_thread).
779-
#assert self._safe_to_dynamically_spawn_children or not self._executor_manager_thread, 'https://github.com/python/cpython/issues/90622'
798+
# gh-90622: spawning a child via fork while another thread is
799+
# running can deadlock in the child. submit() only calls this
800+
# method when using a non-fork start method.
801+
assert (self._safe_to_dynamically_spawn_children
802+
or not self._executor_manager_thread), (
803+
'https://github.com/python/cpython/issues/90622')
780804
self._spawn_process()
781805

782806
def _launch_processes(self):

Lib/test/test_concurrent_futures/test_process_pool.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,33 @@ def test_max_tasks_per_child_defaults_to_spawn_context(self):
219219
executor = self.executor_type(1, max_tasks_per_child=3)
220220
self.assertEqual(executor._mp_context.get_start_method(), "spawn")
221221

222+
def test_max_tasks_per_child_pending_tasks_gh115634(self):
223+
# gh-115634: A worker exiting at its max_tasks_per_child limit left a
224+
# stale token in the idle worker semaphore, so no replacement worker
225+
# was spawned and the remaining queued tasks deadlocked. Submit more
226+
# tasks than the pool can run at once so a backlog is queued while
227+
# workers hit their task limit.
228+
context = self.get_context()
229+
if context.get_start_method(allow_none=False) == "fork":
230+
raise unittest.SkipTest("Incompatible with the fork start method.")
231+
232+
for max_workers, max_tasks, num_tasks in [(1, 2, 6), (2, 2, 8)]:
233+
with self.subTest(max_workers=max_workers, max_tasks=max_tasks):
234+
executor = self.executor_type(
235+
max_workers, mp_context=context,
236+
max_tasks_per_child=max_tasks)
237+
try:
238+
futures = [executor.submit(mul, i, 2)
239+
for i in range(num_tasks)]
240+
# If the deadlock regresses, the result() calls time out,
241+
# and the shutdown below hangs until the test timeout.
242+
results = [f.result(timeout=support.SHORT_TIMEOUT)
243+
for f in futures]
244+
self.assertEqual(results,
245+
[i * 2 for i in range(num_tasks)])
246+
finally:
247+
executor.shutdown(wait=True, cancel_futures=True)
248+
222249
def test_max_tasks_early_shutdown(self):
223250
context = self.get_context()
224251
if context.get_start_method(allow_none=False) == "fork":
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
Fix a deadlock in :class:`concurrent.futures.ProcessPoolExecutor` when
2+
using ``max_tasks_per_child``, present since the feature was introduced in
3+
Python 3.11. The executor stopped scheduling queued tasks after a worker
4+
process exited upon reaching its task limit. Based on a fix proposed by
5+
Tabrez Mohammed.

0 commit comments

Comments
 (0)