From 4cbaaffaa411923144605ec0270a8b74098b2dc2 Mon Sep 17 00:00:00 2001 From: Steven Seguin Date: Wed, 26 Jul 2017 11:19:56 -0400 Subject: [PATCH 1/2] Backport `thread_name_prefix` from upstream Add optional `thread_name_prefix` argument to constructor of ThreadPoolExecutor. Resolves https://github.com/agronholm/pythonfutures/issues/63 --- CHANGES | 7 +++++++ concurrent/futures/thread.py | 11 ++++++++--- test_futures.py | 30 +++++++++++++++++++++++++++--- 3 files changed, 42 insertions(+), 6 deletions(-) diff --git a/CHANGES b/CHANGES index 4ce2585..f7d411c 100644 --- a/CHANGES +++ b/CHANGES @@ -1,3 +1,10 @@ +Unreleased +========== + +- The ThreadPoolExecutor class constructor now accepts an optional ``thread_name_prefix`` + argument to make it possible to customize the names of the threads created by the pool. + Upstream contribution by Gregory P. Smith in https://bugs.python.org/issue27664. + 3.1.1 ===== diff --git a/concurrent/futures/thread.py b/concurrent/futures/thread.py index efae619..445bf26 100644 --- a/concurrent/futures/thread.py +++ b/concurrent/futures/thread.py @@ -90,12 +90,13 @@ def _worker(executor_reference, work_queue): class ThreadPoolExecutor(_base.Executor): - def __init__(self, max_workers=None): + def __init__(self, max_workers=None, thread_name_prefix=''): """Initializes a new ThreadPoolExecutor instance. Args: max_workers: The maximum number of threads that can be used to execute the given calls. + thread_name_prefix: An optional name prefix to give our threads. """ if max_workers is None: # Use this number because ThreadPoolExecutor is often @@ -109,6 +110,7 @@ def __init__(self, max_workers=None): self._threads = set() self._shutdown = False self._shutdown_lock = threading.Lock() + self._thread_name_prefix = thread_name_prefix def submit(self, fn, *args, **kwargs): with self._shutdown_lock: @@ -130,8 +132,11 @@ def weakref_cb(_, q=self._work_queue): q.put(None) # TODO(bquinlan): Should avoid creating new threads if there are more # idle threads than items in the work queue. - if len(self._threads) < self._max_workers: - t = threading.Thread(target=_worker, + num_threads = len(self._threads) + if num_threads < self._max_workers: + thread_name = '%s_%d' % (self._thread_name_prefix or self, + num_threads) + t = threading.Thread(name=thread_name, target=_worker, args=(weakref.ref(self, weakref_cb), self._work_queue)) t.daemon = True diff --git a/test_futures.py b/test_futures.py index e7cd8cf..da784d4 100644 --- a/test_futures.py +++ b/test_futures.py @@ -29,7 +29,7 @@ def reap_threads(func): If threading is unavailable this function does nothing. """ @functools.wraps(func) - def decorator(*args): + def decorator(*args): key = test_support.threading_setup() try: return func(*args) @@ -50,7 +50,7 @@ def _assert_python(expected_success, *args, **env_vars): # caller is responsible to pass the full environment. if env_vars.pop('__cleanenv', None): env = {} - env.update(env_vars) + env.update(env_vars) cmd_line.extend(args) p = subprocess.Popen(cmd_line, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, @@ -78,7 +78,7 @@ def assert_python_ok(*args, **env_vars): return _assert_python(True, *args, **env_vars) -def strip_python_stderr(stderr): +def strip_python_stderr(stderr): """Strip the stderr of a Python process from potential debug output emitted by the interpreter. @@ -230,6 +230,30 @@ def test_del_shutdown(self): for t in threads: t.join() + def test_thread_names_assigned(self): + executor = futures.ThreadPoolExecutor( + max_workers=5, thread_name_prefix='SpecialPool') + executor.map(abs, range(-5, 5)) + threads = executor._threads + del executor + + for t in threads: + self.assertRegexpMatches(t.name, r'^SpecialPool_[0-4]$') + t.join() + + def test_thread_names_default(self): + executor = futures.ThreadPoolExecutor(max_workers=5) + executor.map(abs, range(-5, 5)) + threads = executor._threads + del executor + + for t in threads: + # We don't particularly care what the default name is, just that + # it has a default name implying that it is a ThreadPoolExecutor + # followed by what looks like a thread number. + self.assertRegexpMatches(t.name, r'^.*ThreadPoolExecutor.*_[0-4]$') + t.join() + class ProcessPoolShutdownTest(ProcessPoolMixin, ExecutorShutdownTest): def _prime_executor(self): From d16bb53addad420d00a585fde4612d117e413bfa Mon Sep 17 00:00:00 2001 From: Steven Seguin Date: Tue, 8 Aug 2017 09:43:53 -0400 Subject: [PATCH 2/2] Backport `ThreadPoolExecutor` thread name from upstream Changes from https://github.com/python/cpython/pull/2315 --- concurrent/futures/thread.py | 8 +++++++- test_futures.py | 7 +++---- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/concurrent/futures/thread.py b/concurrent/futures/thread.py index 445bf26..bb0ce9d 100644 --- a/concurrent/futures/thread.py +++ b/concurrent/futures/thread.py @@ -5,6 +5,7 @@ import atexit from concurrent.futures import _base +import itertools import Queue as queue import threading import weakref @@ -90,6 +91,10 @@ def _worker(executor_reference, work_queue): class ThreadPoolExecutor(_base.Executor): + + # Used to assign unique thread names when thread_name_prefix is not supplied. + _counter = itertools.count().next + def __init__(self, max_workers=None, thread_name_prefix=''): """Initializes a new ThreadPoolExecutor instance. @@ -110,7 +115,8 @@ def __init__(self, max_workers=None, thread_name_prefix=''): self._threads = set() self._shutdown = False self._shutdown_lock = threading.Lock() - self._thread_name_prefix = thread_name_prefix + self._thread_name_prefix = (thread_name_prefix or + ("ThreadPoolExecutor-%d" % self._counter())) def submit(self, fn, *args, **kwargs): with self._shutdown_lock: diff --git a/test_futures.py b/test_futures.py index da784d4..95a3ca2 100644 --- a/test_futures.py +++ b/test_futures.py @@ -248,10 +248,9 @@ def test_thread_names_default(self): del executor for t in threads: - # We don't particularly care what the default name is, just that - # it has a default name implying that it is a ThreadPoolExecutor - # followed by what looks like a thread number. - self.assertRegexpMatches(t.name, r'^.*ThreadPoolExecutor.*_[0-4]$') + # Ensure that our default name is reasonably sane and unique when + # no thread_name_prefix was supplied. + self.assertRegexpMatches(t.name, r'ThreadPoolExecutor-\d+_[0-4]$') t.join()