Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions CHANGES
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
Unreleased
==========

- The ThreadPoolExecutor class constructor now accepts an optional ``thread_name_prefix``
argument to make it possible to customize the names of the threads created by the pool.
Upstream contribution by Gregory P. Smith in https://bugs.python.org/issue27664.

3.1.1
=====

Expand Down
17 changes: 14 additions & 3 deletions concurrent/futures/thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import atexit
from concurrent.futures import _base
import itertools
import Queue as queue
import threading
import weakref
Expand Down Expand Up @@ -90,12 +91,17 @@ def _worker(executor_reference, work_queue):


class ThreadPoolExecutor(_base.Executor):
def __init__(self, max_workers=None):

# Used to assign unique thread names when thread_name_prefix is not supplied.
_counter = itertools.count().next

def __init__(self, max_workers=None, thread_name_prefix=''):
"""Initializes a new ThreadPoolExecutor instance.

Args:
max_workers: The maximum number of threads that can be used to
execute the given calls.
thread_name_prefix: An optional name prefix to give our threads.
"""
if max_workers is None:
# Use this number because ThreadPoolExecutor is often
Expand All @@ -109,6 +115,8 @@ def __init__(self, max_workers=None):
self._threads = set()
self._shutdown = False
self._shutdown_lock = threading.Lock()
self._thread_name_prefix = (thread_name_prefix or
("ThreadPoolExecutor-%d" % self._counter()))

def submit(self, fn, *args, **kwargs):
with self._shutdown_lock:
Expand All @@ -130,8 +138,11 @@ def weakref_cb(_, q=self._work_queue):
q.put(None)
# TODO(bquinlan): Should avoid creating new threads if there are more
# idle threads than items in the work queue.
if len(self._threads) < self._max_workers:
t = threading.Thread(target=_worker,
num_threads = len(self._threads)
if num_threads < self._max_workers:
thread_name = '%s_%d' % (self._thread_name_prefix or self,
num_threads)
t = threading.Thread(name=thread_name, target=_worker,
args=(weakref.ref(self, weakref_cb),
self._work_queue))
t.daemon = True
Expand Down
29 changes: 26 additions & 3 deletions test_futures.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ def reap_threads(func):
If threading is unavailable this function does nothing.
"""
@functools.wraps(func)
def decorator(*args):
def decorator(*args):
key = test_support.threading_setup()
try:
return func(*args)
Expand All @@ -50,7 +50,7 @@ def _assert_python(expected_success, *args, **env_vars):
# caller is responsible to pass the full environment.
if env_vars.pop('__cleanenv', None):
env = {}
env.update(env_vars)
env.update(env_vars)
cmd_line.extend(args)
p = subprocess.Popen(cmd_line, stdin=subprocess.PIPE,
stdout=subprocess.PIPE, stderr=subprocess.PIPE,
Expand Down Expand Up @@ -78,7 +78,7 @@ def assert_python_ok(*args, **env_vars):
return _assert_python(True, *args, **env_vars)


def strip_python_stderr(stderr):
def strip_python_stderr(stderr):
"""Strip the stderr of a Python process from potential debug output
emitted by the interpreter.

Expand Down Expand Up @@ -230,6 +230,29 @@ def test_del_shutdown(self):
for t in threads:
t.join()

def test_thread_names_assigned(self):
executor = futures.ThreadPoolExecutor(
max_workers=5, thread_name_prefix='SpecialPool')
executor.map(abs, range(-5, 5))
threads = executor._threads
del executor

for t in threads:
self.assertRegexpMatches(t.name, r'^SpecialPool_[0-4]$')
t.join()

def test_thread_names_default(self):
executor = futures.ThreadPoolExecutor(max_workers=5)
executor.map(abs, range(-5, 5))
threads = executor._threads
del executor

for t in threads:
# Ensure that our default name is reasonably sane and unique when
# no thread_name_prefix was supplied.
self.assertRegexpMatches(t.name, r'ThreadPoolExecutor-\d+_[0-4]$')
t.join()


class ProcessPoolShutdownTest(ProcessPoolMixin, ExecutorShutdownTest):
def _prime_executor(self):
Expand Down