Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
167 changes: 122 additions & 45 deletions Lib/concurrent/futures/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,20 @@ def _process_worker(call_queue, result_queue, initializer, initargs, max_tasks=N
return


def _spawn_worker(mp_context, call_queue, result_queue, initializer,
initargs, max_tasks_per_child, processes):
"""Start one worker process and record it in *processes* by pid."""
p = mp_context.Process(
target=_process_worker,
args=(call_queue,
result_queue,
initializer,
initargs,
max_tasks_per_child))
p.start()
processes[p.pid] = p


class _ExecutorManagerThread(threading.Thread):
"""Manages the communication between this process and the worker processes.
Expand Down Expand Up @@ -321,6 +335,15 @@ def weakref_cb(_,
# exiting safely
self.max_tasks_per_child = executor._max_tasks_per_child

# gh-119592: Needed to size worker replacement, and immutable, so
# keep a copy rather than reading it back through the executor
# weakref. The rest of the spawn configuration is deliberately NOT
# copied here: holding user-provided objects (initializer,
# initargs, mp_context) in this always-reachable running thread
# could keep the executor itself reachable through them, breaking
# garbage-collection-triggered shutdown.
self.max_workers = executor._max_workers

# A dict mapping work ids to _WorkItems e.g.
# {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
self.pending_work_items = executor._pending_work_items
Expand Down Expand Up @@ -357,12 +380,14 @@ def run(self):
# while waiting on new results.
del result_item

if executor := self.executor_reference():
if process_exited:
with self.shutdown_lock:
executor._replace_dead_worker()
else:
executor._idle_worker_semaphore.release()
if process_exited:
with self.shutdown_lock:
broken = self._replace_dead_worker()
if broken is not None:
self.terminate_broken(*broken)
return
elif executor := self.executor_reference():
executor._idle_worker_semaphore.release()
del executor

if self.is_shutting_down():
Expand All @@ -379,6 +404,71 @@ def run(self):
self.join_executor_internals()
return

def _replace_dead_worker(self):
"""Spawn a replacement for a worker that exited at its
max_tasks_per_child limit. Called under self.shutdown_lock.
Returns None while the pool can still make progress, otherwise a
(cause, message) tuple describing why the remaining work items can
never run, so that run() can fail their futures.
"""
assert self.shutdown_lock.locked()
cause = None
message = None
executor = self.executor_reference()
if executor is None:
# gh-152967: The executor was garbage collected; nothing can
# spawn a replacement worker for it anymore.
message = ("The ProcessPoolExecutor was garbage collected with "
"work pending after its last worker process exited "
"upon reaching max_tasks_per_child; the pending work "
"can never be run.")
elif executor._force_shutting_down:
# terminate_workers()/kill_workers() is tearing the pool down;
# a replacement worker would escape the kill and run work
# items that were enqueued before it.
message = ("A worker process exited while the pool was being "
"forcefully shut down; work that was still enqueued "
"will not be run.")
elif self.pending_work_items or not self.is_shutting_down():
# gh-115634: Do not consult the executor's
# _idle_worker_semaphore here: it counts task completions, not
# idle workers, so it can hold a stale token released by the
# now-dead worker. Trusting such a token would leave the pool
# a worker short, deadlocking once all workers reach their
# task limit. Spawning from this (manager) thread is safe
# despite gh-90622 because max_tasks_per_child is rejected for
# the "fork" start method.
if len(self.processes) < self.max_workers:
# gh-119592: Spawn using state owned by this thread and
# configuration read through the live weakref (which
# shutdown() never clears), not the executor state that
# shutdown(wait=False) clears concurrently.
try:
_spawn_worker(executor._mp_context, self.call_queue,
self.result_queue, executor._initializer,
executor._initargs,
self.max_tasks_per_child, self.processes)
except Exception as exc:
# While other workers remain the pool has merely lost
# capacity and they keep draining the queue; with none
# left the failure is reported below.
cause = format_exception(exc)
message = ("A replacement worker process could not be "
"started, leaving the pool without workers "
"to run the remaining work.")
del executor

if not self.processes and (self.pending_work_items
or cause is not None):
# No worker processes remain and no replacement can be
# spawned: any remaining work items can never run. A spawn
# failure breaks the pool even with nothing pending; leaving
# a zero-worker pool alive would hang a later submit() on a
# stale _idle_worker_semaphore token instead of raising.
return (cause, message)
return None

def add_call_item_to_queue(self):
# Fills call_queue with _WorkItems from pending_work_items.
# This function never blocks.
Expand Down Expand Up @@ -455,10 +545,11 @@ def is_shutting_down(self):
return (_global_shutdown or executor is None
or executor._shutdown_thread)

def _terminate_broken(self, cause):
def _terminate_broken(self, cause, bpe_message=None):
# Terminate the executor because it is in a broken state. The cause
# argument can be used to display more information on the error that
# lead the executor into becoming broken.
# lead the executor into becoming broken. bpe_message overrides the
# default message on the BrokenProcessPool set on pending futures.

# Mark the process pool broken so that submits fail right now.
executor = self.executor_reference()
Expand Down Expand Up @@ -489,11 +580,12 @@ def _terminate_broken(self, cause):
cause_str = "\n".join(errors)
cause_tb = f"\n'''\n{cause_str}'''" if cause_str else None

if bpe_message is None:
bpe_message = ("A process in the process pool was terminated "
"abruptly while the future was running or pending.")
# Mark pending tasks as failed.
for work_id, work_item in self.pending_work_items.items():
bpe = BrokenProcessPool("A process in the process pool was "
"terminated abruptly while the future was "
"running or pending.")
bpe = BrokenProcessPool(bpe_message)
if cause_tb is not None:
bpe.__cause__ = _RemoteTraceback(cause_tb)
try:
Expand All @@ -518,9 +610,9 @@ def _terminate_broken(self, cause):
# clean up resources
self._join_executor_internals(broken=True)

def terminate_broken(self, cause):
def terminate_broken(self, cause, bpe_message=None):

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While this might look like a public API change in the diff... it's on the _ExecutorManagerThread internal use only class. Fine to backport.

with self.shutdown_lock:
self._terminate_broken(cause)
self._terminate_broken(cause, bpe_message)

def flag_executor_shutting_down(self):
# Flag the executor as shutting down and cancel remaining tasks if
Expand Down Expand Up @@ -733,6 +825,7 @@ def __init__(self, max_workers=None, mp_context=None,
self._queue_count = 0
self._pending_work_items = {}
self._cancel_pending_futures = False
self._force_shutting_down = False

# _ThreadWakeup is a communication channel used to interrupt the wait
# of the main loop of executor_manager_thread from another thread (e.g.
Expand Down Expand Up @@ -772,34 +865,15 @@ def _start_executor_manager_thread(self):
_threads_wakeups[self._executor_manager_thread] = \
self._executor_manager_thread_wakeup

def _replace_dead_worker(self):
def _adjust_process_count(self):
# gh-132969: avoid error when state is reset and executor is still running,
# which will happen when shutdown(wait=False) is called.
if self._processes is None:
return

# A replacement is pointless when shutting down with nothing left
# to run. Both attributes are read under _shutdown_lock, which
# shutdown() holds while setting _shutdown_thread.
assert self._shutdown_lock.locked()
if self._shutdown_thread and not self._pending_work_items:
return

# gh-115634: A worker exited after reaching max_tasks_per_child and
# has been removed from self._processes. Do not consult
# _idle_worker_semaphore here: it counts task completions, not idle
# workers, so it can hold a stale token released by the now-dead
# worker. Trusting such a token would leave the pool a worker short,
# deadlocking once all workers reach their task limit. Spawning is
# safe from this (manager) thread despite gh-90622 because
# max_tasks_per_child is rejected for the "fork" start method.
if len(self._processes) < self._max_workers:
self._spawn_process()

def _adjust_process_count(self):
# gh-132969: avoid error when state is reset and executor is still running,
# which will happen when shutdown(wait=False) is called.
if self._processes is None:
# gh-152967: A forceful shutdown is in progress; a worker spawned
# here could escape its process snapshot and keep running work.
if self._force_shutting_down:
return

# if there's an idle process, we don't need to spawn a new one.
Expand All @@ -825,15 +899,10 @@ def _launch_processes(self):
self._spawn_process()

def _spawn_process(self):
p = self._mp_context.Process(
target=_process_worker,
args=(self._call_queue,
self._result_queue,
self._initializer,
self._initargs,
self._max_tasks_per_child))
p.start()
self._processes[p.pid] = p
_spawn_worker(self._mp_context, self._call_queue,
self._result_queue, self._initializer,
self._initargs, self._max_tasks_per_child,
self._processes)

def submit(self, fn, /, *args, **kwargs):
with self._shutdown_lock:
Expand Down Expand Up @@ -930,6 +999,14 @@ def _force_shutdown(self, operation):
if operation not in _SHUTDOWN_CALLBACK_OPERATION:
raise ValueError(f"Unsupported operation: {operation!r}")

# gh-152967: Stop the manager thread from spawning replacement
# workers before we copy the processes to signal: a worker spawned
# after the copy would survive the loop below and run enqueued
# work items. Taking the lock orders this against the manager's
# worker replacement, which runs under the same lock.
with self._shutdown_lock:
self._force_shutting_down = True

processes = {}
if self._processes:
processes = self._processes.copy()
Expand Down
Loading
Loading