Skip to content

Commit b7422fe

Browse files
committed
ENH Make ProcessPoolExecutor safe against unpickling errors
1 parent 6da0a2b commit b7422fe

2 files changed

Lines changed: 66 additions & 54 deletions

File tree

Lib/concurrent/futures/process.py

Lines changed: 59 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,8 @@
5353
import multiprocessing as mp
5454
from multiprocessing.connection import wait
5555
from multiprocessing.queues import Queue
56+
from multiprocessing import context
57+
5658
import threading
5759
import weakref
5860
from functools import partial
@@ -100,12 +102,20 @@ def _python_exit():
100102
for t, _ in items:
101103
t.join()
102104

105+
103106
# Controls how many more calls than processes will be queued in the call queue.
104107
# A smaller number will mean that processes spend more time idle waiting for
105108
# work while a larger number will make Future.cancel() succeed less frequently
106109
# (Futures in the call queue cannot be cancelled).
107110
EXTRA_QUEUED_CALLS = 1
108111

112+
#####
113+
_ForkingPickler = context.reduction.ForkingPickler
114+
PICKLE_NONE = _ForkingPickler.dumps(None)
115+
WORK_ID_SIZE = 8
116+
WORK_ID_ENC = "little"
117+
SENTINEL_MSG = b'\x00'
118+
109119

110120
# Hack to embed stringification of remote traceback in local traceback
111121

@@ -149,25 +159,6 @@ def __init__(self, work_id, fn, args, kwargs):
149159
self.kwargs = kwargs
150160

151161

152-
class _SafeQueue(Queue):
153-
"""Safe Queue set exception to the future object linked to a job"""
154-
def __init__(self, max_size=0, *, ctx, pending_work_items):
155-
self.pending_work_items = pending_work_items
156-
super().__init__(max_size, ctx=ctx)
157-
158-
def _on_queue_feeder_error(self, e, obj):
159-
if isinstance(obj, _CallItem):
160-
tb = traceback.format_exception(type(e), e, e.__traceback__)
161-
e.__cause__ = _RemoteTraceback('\n"""\n{}"""'.format(''.join(tb)))
162-
work_item = self.pending_work_items.pop(obj.work_id, None)
163-
# work_item can be None if another process terminated. In this case,
164-
# the queue_manager_thread fails all work_items with BrokenProcessPool
165-
if work_item is not None:
166-
work_item.future.set_exception(e)
167-
else:
168-
super()._on_queue_feeder_error(e, obj)
169-
170-
171162
def _get_chunks(*iterables, chunksize):
172163
""" Iterates over zip()ed iterables in chunks. """
173164
it = zip(*iterables)
@@ -192,11 +183,14 @@ def _process_chunk(fn, chunk):
192183
def _sendback_result(result_queue, work_id, result=None, exception=None):
193184
"""Safely send back the given result or exception"""
194185
try:
195-
result_queue.put(_ResultItem(work_id, result=result,
196-
exception=exception))
186+
serialize_res = _ForkingPickler.dumps(
187+
_ResultItem(work_id, result=result, exception=exception))
197188
except BaseException as e:
198-
exc = _ExceptionWithTraceback(e, e.__traceback__)
199-
result_queue.put(_ResultItem(work_id, exception=exc))
189+
serialize_res = _ForkingPickler.dumps(_ResultItem(
190+
work_id, exception=_ExceptionWithTraceback(e, e.__traceback__)
191+
))
192+
result_queue._put_bytes(work_id.to_bytes(WORK_ID_SIZE, WORK_ID_ENC) +
193+
serialize_res)
200194

201195

202196
def _process_worker(call_queue, result_queue, initializer, initargs):
@@ -221,18 +215,21 @@ def _process_worker(call_queue, result_queue, initializer, initargs):
221215
# mark the pool broken
222216
return
223217
while True:
224-
call_item = call_queue.get(block=True)
225-
if call_item is None:
218+
serialized_item = call_queue._get_bytes(block=True)
219+
if serialized_item == SENTINEL_MSG:
226220
# Wake up queue management thread
227-
result_queue.put(os.getpid())
221+
result_queue._put_bytes(
222+
os.getpid().to_bytes(WORK_ID_SIZE, WORK_ID_ENC))
228223
return
224+
work_id = int.from_bytes(serialized_item[:WORK_ID_SIZE], WORK_ID_ENC)
225+
call_item = None
229226
try:
227+
call_item = _ForkingPickler.loads(serialized_item[WORK_ID_SIZE:])
230228
r = call_item.fn(*call_item.args, **call_item.kwargs)
229+
_sendback_result(result_queue, work_id, result=r)
231230
except BaseException as e:
232231
exc = _ExceptionWithTraceback(e, e.__traceback__)
233-
_sendback_result(result_queue, call_item.work_id, exception=exc)
234-
else:
235-
_sendback_result(result_queue, call_item.work_id, result=r)
232+
_sendback_result(result_queue, work_id, exception=exc)
236233

237234
# Liberate the resource as soon as possible, to avoid holding onto
238235
# open files or shared memory that is not needed anymore
@@ -267,14 +264,27 @@ def _add_call_item_to_queue(pending_work_items,
267264
work_item = pending_work_items[work_id]
268265

269266
if work_item.future.set_running_or_notify_cancel():
270-
call_queue.put(_CallItem(work_id,
271-
work_item.fn,
272-
work_item.args,
273-
work_item.kwargs),
274-
block=True)
275-
else:
276-
del pending_work_items[work_id]
277-
continue
267+
call_item = _CallItem(work_id, work_item.fn, work_item.args,
268+
work_item.kwargs)
269+
try:
270+
msg = _ForkingPickler.dumps(call_item)
271+
except BaseException as e:
272+
tb = traceback.format_exception(
273+
type(e), e, e.__traceback__)
274+
e.__cause__ = _RemoteTraceback(
275+
'\n"""\n{}"""'.format(''.join(tb)))
276+
# work_item can be None if a process terminated and the
277+
# executor is broken
278+
if work_item is not None:
279+
work_item.future.set_exception(e)
280+
del work_item
281+
282+
del pending_work_items[work_id]
283+
continue
284+
call_queue._put_bytes(
285+
work_id.to_bytes(WORK_ID_SIZE, WORK_ID_ENC) + msg,
286+
block=True)
287+
278288

279289

280290
def _queue_management_worker(executor_reference,
@@ -321,7 +331,7 @@ def shutdown_worker():
321331
while n_sentinels_sent < n_children_to_stop and n_children_alive > 0:
322332
for i in range(n_children_to_stop - n_sentinels_sent):
323333
try:
324-
call_queue.put_nowait(None)
334+
call_queue._put_bytes(SENTINEL_MSG, block=False)
325335
n_sentinels_sent += 1
326336
except Full:
327337
break
@@ -352,19 +362,22 @@ def shutdown_worker():
352362
ready = wait(readers + worker_sentinels)
353363

354364
cause = None
355-
is_broken = True
365+
thread_wakeup.clear()
356366
if result_reader in ready:
357367
try:
358-
result_item = result_reader.recv()
359-
is_broken = False
368+
serialize_res = result_reader.recv_bytes()
369+
work_id = int.from_bytes(serialize_res[:WORK_ID_SIZE],
370+
WORK_ID_ENC)
371+
result_item = work_id
372+
if len(serialize_res) > WORK_ID_SIZE:
373+
result_item = _ForkingPickler.loads(
374+
serialize_res[WORK_ID_SIZE:])
360375
except BaseException as e:
361-
cause = traceback.format_exception(type(e), e, e.__traceback__)
362-
376+
result_item = _ResultItem(work_id, exception=e)
363377
elif wakeup_reader in ready:
364378
is_broken = False
365379
result_item = None
366-
thread_wakeup.clear()
367-
if is_broken:
380+
else:
368381
# Mark the process pool broken so that submits fail right now.
369382
executor = executor_reference()
370383
if executor is not None:
@@ -531,9 +544,7 @@ def __init__(self, max_workers=None, mp_context=None,
531544
# prevent the worker processes from idling. But don't make it too big
532545
# because futures in the call queue cannot be cancelled.
533546
queue_size = self._max_workers + EXTRA_QUEUED_CALLS
534-
self._call_queue = _SafeQueue(
535-
max_size=queue_size, ctx=self._mp_context,
536-
pending_work_items=self._pending_work_items)
547+
self._call_queue = Queue(queue_size, ctx=self._mp_context)
537548
# Killed worker processes can produce spurious "broken pipe"
538549
# tracebacks in the queue's own worker thread. But we detect killed
539550
# processes anyway, so silence the tracebacks.

Lib/test/test_concurrent_futures.py

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
import time
1919
import unittest
2020
import weakref
21-
from pickle import PicklingError
21+
from pickle import PicklingError, UnpicklingError
2222

2323
from concurrent import futures
2424
from concurrent.futures._base import (
@@ -888,11 +888,12 @@ def test_crash(self):
888888
crash_cases = [
889889
# Check problem occurring while pickling a task in
890890
# the task_handler thread
891-
(id, (ErrorAtPickle(),), PicklingError, "error at task pickle"),
891+
(id, (ErrorAtPickle(),), PicklingError,
892+
"error at task pickle"),
892893
# Check problem occurring while unpickling a task on workers
893-
(id, (ExitAtUnpickle(),), BrokenProcessPool,
894+
(id, (ExitAtUnpickle(),), SystemExit,
894895
"exit at task unpickle"),
895-
(id, (ErrorAtUnpickle(),), BrokenProcessPool,
896+
(id, (ErrorAtUnpickle(),), UnpicklingError,
896897
"error at task unpickle"),
897898
(id, (CrashAtUnpickle(),), BrokenProcessPool,
898899
"crash at task unpickle"),
@@ -913,9 +914,9 @@ def test_crash(self):
913914
"error during result pickle on worker"),
914915
# Check problem occurring while unpickling a task in
915916
# the result_handler thread
916-
(_return_instance, (ErrorAtUnpickle,), BrokenProcessPool,
917+
(_return_instance, (ErrorAtUnpickle,), UnpicklingError,
917918
"error during result unpickle in result_handler"),
918-
(_return_instance, (ExitAtUnpickle,), BrokenProcessPool,
919+
(_return_instance, (ExitAtUnpickle,), SystemExit,
919920
"exit during result unpickle in result_handler")
920921
]
921922
for func, args, error, name in crash_cases:

0 commit comments

Comments
 (0)