From b1133c868eff768128f588215bbcfb4652b86cd2 Mon Sep 17 00:00:00 2001 From: Mark Nemec Date: Sun, 18 Mar 2018 16:09:00 +0000 Subject: [PATCH 1/8] bpo-33097: Fix submit accepting callable after executor shutdown by interpreter exit. (GH-6144) Executors in concurrent.futures accepted tasks after executor was shutdown by interpreter exit. Tasks were left in PENDING state forever. This fix changes submit to instead raise a RuntimeError. --- Lib/concurrent/futures/process.py | 2 +- Lib/concurrent/futures/thread.py | 2 +- Lib/test/test_concurrent_futures.py | 27 +++++++++++++++++++ .../2018-03-18-16-48-23.bpo-33097.Yl4gI2.rst | 2 ++ 4 files changed, 31 insertions(+), 2 deletions(-) create mode 100644 Misc/NEWS.d/next/Library/2018-03-18-16-48-23.bpo-33097.Yl4gI2.rst diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index aaa5151e017c0f7..0ac0595b549a72a 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -591,7 +591,7 @@ def submit(self, fn, *args, **kwargs): with self._shutdown_lock: if self._broken: raise BrokenProcessPool(self._broken) - if self._shutdown_thread: + if _global_shutdown or self._shutdown_thread: raise RuntimeError('cannot schedule new futures after shutdown') f = _base.Future() diff --git a/Lib/concurrent/futures/thread.py b/Lib/concurrent/futures/thread.py index 6e22950a157db6c..2d0374082158e02 100644 --- a/Lib/concurrent/futures/thread.py +++ b/Lib/concurrent/futures/thread.py @@ -143,7 +143,7 @@ def submit(self, fn, *args, **kwargs): if self._broken: raise BrokenThreadPool(self._broken) - if self._shutdown: + if _shutdown or self._shutdown: raise RuntimeError('cannot schedule new futures after shutdown') f = _base.Future() diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py index 18d0265f3f61a63..09928e7acbad3fb 100644 --- a/Lib/test/test_concurrent_futures.py +++ b/Lib/test/test_concurrent_futures.py @@ -303,6 +303,33 @@ def test_interpreter_shutdown(self): self.assertFalse(err) self.assertEqual(out.strip(), b"apple") + def test_submit_after_interpreter_shutdown(self): + # Test the atexit hook for shutdown of worker threads and processes + rc, out, err = assert_python_ok('-c', """if 1: + import atexit + @atexit.register + def run_last(): + try: + t.submit(lambda: None) + except RuntimeError: + print("runtime-error") + raise + from concurrent.futures import {executor_type} + if __name__ == "__main__": + context = '{context}' + if context == "": + t = {executor_type}(5) + else: + from multiprocessing import get_context + context = get_context(context) + t = {executor_type}(5, mp_context=context) + """.format(executor_type=self.executor_type.__name__, + context=getattr(self, "ctx", ""))) + # Errors in atexit hooks don't change the process exit code, check + # stderr manually. + self.assertTrue(err) + self.assertEqual(out.strip(), b"runtime-error") + def test_hang_issue12364(self): fs = [self.executor.submit(time.sleep, 0.1) for _ in range(50)] self.executor.shutdown() diff --git a/Misc/NEWS.d/next/Library/2018-03-18-16-48-23.bpo-33097.Yl4gI2.rst b/Misc/NEWS.d/next/Library/2018-03-18-16-48-23.bpo-33097.Yl4gI2.rst new file mode 100644 index 000000000000000..d9411eb51623531 --- /dev/null +++ b/Misc/NEWS.d/next/Library/2018-03-18-16-48-23.bpo-33097.Yl4gI2.rst @@ -0,0 +1,2 @@ +Raise RuntimeError when ``executor.submit`` is called during interpreter +shutdown. From 10fb3c057c6f26b86845c6350638850cb02f6147 Mon Sep 17 00:00:00 2001 From: Mark Nemec Date: Sat, 24 Mar 2018 13:56:49 +0000 Subject: [PATCH 2/8] Replace submitted lambda: None with id, None --- Lib/test/test_concurrent_futures.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py index 09928e7acbad3fb..ba42fc2a32fd751 100644 --- a/Lib/test/test_concurrent_futures.py +++ b/Lib/test/test_concurrent_futures.py @@ -310,7 +310,7 @@ def test_submit_after_interpreter_shutdown(self): @atexit.register def run_last(): try: - t.submit(lambda: None) + t.submit(id, None) except RuntimeError: print("runtime-error") raise From bc31b411af112e1c9e67abb2c64d43f9b968cedd Mon Sep 17 00:00:00 2001 From: Mark Nemec Date: Sat, 24 Mar 2018 14:07:04 +0000 Subject: [PATCH 3/8] Add explicit RuntimeError for when interpreter is shutting down --- Lib/concurrent/futures/process.py | 5 ++++- Lib/concurrent/futures/thread.py | 5 ++++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index 0ac0595b549a72a..65e07a1780a67e1 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -591,8 +591,11 @@ def submit(self, fn, *args, **kwargs): with self._shutdown_lock: if self._broken: raise BrokenProcessPool(self._broken) - if _global_shutdown or self._shutdown_thread: + if self._shutdown_thread: raise RuntimeError('cannot schedule new futures after shutdown') + if _global_shutdown: + raise RuntimeError('cannot schedule new futures after ' + 'interpreter shutdown') f = _base.Future() w = _WorkItem(f, fn, args, kwargs) diff --git a/Lib/concurrent/futures/thread.py b/Lib/concurrent/futures/thread.py index 2d0374082158e02..1249349390b48d7 100644 --- a/Lib/concurrent/futures/thread.py +++ b/Lib/concurrent/futures/thread.py @@ -143,8 +143,11 @@ def submit(self, fn, *args, **kwargs): if self._broken: raise BrokenThreadPool(self._broken) - if _shutdown or self._shutdown: + if self._shutdown: raise RuntimeError('cannot schedule new futures after shutdown') + if _shutdown: + raise RuntimeError('cannot schedule new futures after' + 'interpreter shutdown') f = _base.Future() w = _WorkItem(f, fn, args, kwargs) From 47e72fc0073aaf901c6a310df7e06d2c39363a1d Mon Sep 17 00:00:00 2001 From: Mark Nemec Date: Sat, 24 Mar 2018 14:13:49 +0000 Subject: [PATCH 4/8] Add explicit submit and result to submit after interpreter shutdown test --- Lib/test/test_concurrent_futures.py | 1 + 1 file changed, 1 insertion(+) diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py index ba42fc2a32fd751..eeafaf68bcd366e 100644 --- a/Lib/test/test_concurrent_futures.py +++ b/Lib/test/test_concurrent_futures.py @@ -323,6 +323,7 @@ def run_last(): from multiprocessing import get_context context = get_context(context) t = {executor_type}(5, mp_context=context) + t.submit(id, 42).result() """.format(executor_type=self.executor_type.__name__, context=getattr(self, "ctx", ""))) # Errors in atexit hooks don't change the process exit code, check From 546ec80ee620cd4190d6f4b66038164d593eb187 Mon Sep 17 00:00:00 2001 From: Mark Nemec Date: Sun, 25 Mar 2018 16:41:17 +0100 Subject: [PATCH 5/8] Mark ProcessPoolExecutor shutdown from queue management thread if shutting down --- Lib/concurrent/futures/process.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index 65e07a1780a67e1..00627e112717bfb 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -421,6 +421,10 @@ def shutdown_worker(): # - The executor that owns this worker has been shutdown. if shutting_down(): try: + # Flag the executor as shutting down as early as possible if it + # is not gc-ed yet. + if executor is not None: + executor._shutdown_thread = True # Since no new work items can be added, it is safe to shutdown # this thread if there are no pending work items. if not pending_work_items: From a1853a66199b0710bb29b9ea55b1d13f809d8f5c Mon Sep 17 00:00:00 2001 From: Mark Nemec Date: Sun, 25 Mar 2018 16:41:46 +0100 Subject: [PATCH 6/8] Mark ThreadPoolExecutor shutdown from worker thread if shutting down --- Lib/concurrent/futures/thread.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/Lib/concurrent/futures/thread.py b/Lib/concurrent/futures/thread.py index 1249349390b48d7..b65dee11f727279 100644 --- a/Lib/concurrent/futures/thread.py +++ b/Lib/concurrent/futures/thread.py @@ -87,6 +87,10 @@ def _worker(executor_reference, work_queue, initializer, initargs): # - The executor that owns the worker has been collected OR # - The executor that owns the worker has been shutdown. if _shutdown or executor is None or executor._shutdown: + # Flag the executor as shutting down as early as possible if it + # is not gc-ed yet. + if executor is not None: + executor._shutdown = True # Notice other workers work_queue.put(None) return From 6b78fc27824ea0cc493d9195b1987b7698100e5d Mon Sep 17 00:00:00 2001 From: Mark Nemec Date: Sun, 8 Apr 2018 17:23:46 +0100 Subject: [PATCH 7/8] Replace empty string check with implicit bool check in test_submit_after_interpreter_shutdown --- Lib/test/test_concurrent_futures.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py index eeafaf68bcd366e..77b39811cd2c405 100644 --- a/Lib/test/test_concurrent_futures.py +++ b/Lib/test/test_concurrent_futures.py @@ -317,7 +317,7 @@ def run_last(): from concurrent.futures import {executor_type} if __name__ == "__main__": context = '{context}' - if context == "": + if not context: t = {executor_type}(5) else: from multiprocessing import get_context From 2ba637867e44f8dff0248bc156f95b84d75d2804 Mon Sep 17 00:00:00 2001 From: Mark Nemec Date: Mon, 9 Apr 2018 23:01:46 +0100 Subject: [PATCH 8/8] Replace assertTrue with assertIn in test_submit_after_interpreter_shutdown --- Lib/test/test_concurrent_futures.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py index 77b39811cd2c405..b258a0eafde6d44 100644 --- a/Lib/test/test_concurrent_futures.py +++ b/Lib/test/test_concurrent_futures.py @@ -328,7 +328,7 @@ def run_last(): context=getattr(self, "ctx", ""))) # Errors in atexit hooks don't change the process exit code, check # stderr manually. - self.assertTrue(err) + self.assertIn("RuntimeError: cannot schedule new futures", err.decode()) self.assertEqual(out.strip(), b"runtime-error") def test_hang_issue12364(self):