
Lib/concurrent/futures/thread.py

cpython 3.14 @ ab2d84fe1023/Lib/concurrent/futures/thread.py

ThreadPoolExecutor manages a pool of worker threads that pull tasks from a queue.SimpleQueue. Each worker runs inside a WorkerContext, which handles optional per-thread initialization and finalization. Shutdown is cooperative: a None sentinel is placed in the queue and propagates through the chain of workers. The module registers an atexit-equivalent hook via threading._register_atexit to drain threads during interpreter teardown, and registers os.register_at_fork handlers so that the global shutdown lock and thread-queue mapping survive a fork() call safely.
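For orientation before diving into the internals, a minimal usage sketch (standard library only; the `demo` prefix is illustrative): submit() enqueues a _WorkItem and returns its Future, and a worker thread eventually runs the callable.

```python
from concurrent.futures import ThreadPoolExecutor

# submit() puts a _WorkItem on the queue and returns its Future;
# result() blocks until a worker thread has executed the callable.
with ThreadPoolExecutor(max_workers=2, thread_name_prefix="demo") as ex:
    futures = [ex.submit(pow, 2, n) for n in range(5)]
    results = [f.result() for f in futures]

print(results)  # [1, 2, 4, 8, 16]
```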

Map

Lines    Symbol and role

17-21    _threads_queues, _shutdown, _global_shutdown_lock
         Module-level state: weak-key map from threads to queues, global shutdown flag, and the lock protecting both
23-37    _python_exit
         Called just before non-daemon threads are joined; sets the flag and drains all queues
40-44    os.register_at_fork block
         Reinitializes shutdown lock and clears thread-queue map in child processes after fork
47-73    WorkerContext
         Encapsulates initializer and finalizer lifecycle for a single worker thread
76-94    _WorkItem
         Pairs a Future with the task tuple; calls ctx.run() and routes result or exception
97-141   _worker
         Thread target function; runs the WorkerContext and loops over queue items until shutdown
144-147  BrokenThreadPool
         Exception raised when an initializer failure breaks the pool
150-274  ThreadPoolExecutor
         Public executor: submit, thread management, idle semaphore, shutdown

Reading

Global state and fork safety

Two module globals coordinate shutdown across threads and processes: _threads_queues (a WeakKeyDictionary from thread objects to their work queues) and _shutdown (a boolean). Both are protected by _global_shutdown_lock so that new thread creation and the shutdown sweep never race.
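The weak-key behavior matters: when a worker thread object becomes unreachable, its entry vanishes without explicit cleanup. A small illustration of the same semantics, with a plain object standing in for a thread (FakeThread is a made-up stand-in, not CPython code):

```python
import gc
import weakref

class FakeThread:  # stand-in for a threading.Thread used as a key
    pass

m = weakref.WeakKeyDictionary()
t = FakeThread()
m[t] = "its work queue"
assert len(m) == 1

del t          # drop the last strong reference to the key
gc.collect()   # immediate on CPython anyway; made explicit here
assert len(m) == 0   # the entry disappeared with its key
```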

The os.register_at_fork block handles POSIX fork(). Before forking, the parent acquires the lock, so no other thread can hold it at the moment of the fork. After forking, the child reinitializes the lock via _at_fork_reinit (a fresh mutex, since the inherited one reflects the parent's state) and clears _threads_queues (the parent's worker threads do not exist in the child). The parent simply releases the lock.

# CPython: Lib/concurrent/futures/thread.py:40 os.register_at_fork block
if hasattr(os, 'register_at_fork'):
    os.register_at_fork(before=_global_shutdown_lock.acquire,
                        after_in_child=_global_shutdown_lock._at_fork_reinit,
                        after_in_parent=_global_shutdown_lock.release)
    os.register_at_fork(after_in_child=_threads_queues.clear)

WorkerContext and the initializer lifecycle

WorkerContext is the abstraction point for per-thread setup. prepare() is a classmethod that validates the initializer and returns a pair of callables: create_context (called once per new thread) and resolve_task (called once per submitted item). Subclasses can override prepare_context on ThreadPoolExecutor to inject different context types, which is the mechanism used by the subinterpreter-based InterpreterPoolExecutor in the standard library.

# CPython: Lib/concurrent/futures/thread.py:49 WorkerContext.prepare
@classmethod
def prepare(cls, initializer, initargs):
    if initializer is not None:
        if not callable(initializer):
            raise TypeError("initializer must be a callable")
    def create_context():
        return cls(initializer, initargs)
    def resolve_task(fn, args, kwargs):
        return (fn, args, kwargs)
    return create_context, resolve_task
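From the caller's side, this lifecycle surfaces as the initializer/initargs constructor arguments: ctx.initialize() runs the initializer once in each new worker thread before any task executes. A sketch of the typical use, per-thread state via threading.local (init_thread and the "ready" tag are illustrative names):

```python
import threading
from concurrent.futures import ThreadPoolExecutor

_tls = threading.local()

def init_thread(tag):
    # Runs once per worker thread, via WorkerContext.initialize(),
    # before that thread picks up its first task.
    _tls.tag = tag

def task():
    return (threading.current_thread().name, _tls.tag)

with ThreadPoolExecutor(max_workers=2,
                        initializer=init_thread,
                        initargs=("ready",)) as ex:
    results = [ex.submit(task).result() for _ in range(4)]

# Every task saw the state its own worker thread set up.
assert all(tag == "ready" for _, tag in results)
```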

The worker loop and idle semaphore

_worker runs as the thread target. After calling ctx.initialize(), it enters a tight loop. On each iteration it first tries a non-blocking get_nowait(). If the queue is empty it increments the idle semaphore before blocking on get(block=True). This semaphore is what _adjust_thread_count checks: if a release is available, an idle thread already exists and no new thread is spawned.

When a None is dequeued, the worker checks the three shutdown conditions (_shutdown, executor GC, executor._shutdown). If any is true it re-enqueues the sentinel so the next worker also wakes up, then returns.

# CPython: Lib/concurrent/futures/thread.py:97 _worker
def _worker(executor_reference, ctx, work_queue):
    try:
        ctx.initialize()
    except BaseException:
        _base.LOGGER.critical('Exception in initializer:', exc_info=True)
        executor = executor_reference()
        if executor is not None:
            executor._initializer_failed()
        return
    try:
        while True:
            try:
                work_item = work_queue.get_nowait()
            except queue.Empty:
                executor = executor_reference()
                if executor is not None:
                    executor._idle_semaphore.release()
                del executor
                work_item = work_queue.get(block=True)

            if work_item is not None:
                work_item.run(ctx)
                del work_item
                continue

            executor = executor_reference()
            if _shutdown or executor is None or executor._shutdown:
                if executor is not None:
                    executor._shutdown = True
                work_queue.put(None)
                return
            del executor
    except BaseException:
        _base.LOGGER.critical('Exception in worker', exc_info=True)
    finally:
        ctx.finalize()
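The sentinel chain is easy to reproduce in isolation. This toy (not CPython code) shows a single None waking every blocked worker in turn, each one re-enqueueing it for the next:

```python
import queue
import threading

q = queue.SimpleQueue()
done = []

def worker(i):
    while True:
        item = q.get()
        if item is None:
            q.put(None)        # re-enqueue so the next worker wakes too
            done.append(i)
            return

threads = [threading.Thread(target=worker, args=(i,)) for i in range(3)]
for t in threads:
    t.start()

q.put(None)                    # one sentinel stops all three workers
for t in threads:
    t.join()

assert sorted(done) == [0, 1, 2]
```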

submit and thread creation

submit holds both _shutdown_lock and _global_shutdown_lock simultaneously to prevent a race between a new submission and an ongoing shutdown. It creates a Future, wraps it in a _WorkItem, enqueues it, then calls _adjust_thread_count.
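The _shutdown flag that submit checks under these locks is what turns late submissions into errors, observable from the public API:

```python
from concurrent.futures import ThreadPoolExecutor

ex = ThreadPoolExecutor(max_workers=1)
ex.shutdown()

try:
    ex.submit(print, "too late")
    raised = False
except RuntimeError:
    raised = True   # "cannot schedule new futures after shutdown"

assert raised
```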

_adjust_thread_count tries to decrement the idle semaphore first. Only when that fails (no idle thread available) and the thread count is below _max_workers does it spawn a new threading.Thread. The thread receives a weakref to the executor so that GC of the executor triggers a wakeup sentinel via the weakref callback.

# CPython: Lib/concurrent/futures/thread.py:219 _adjust_thread_count
def _adjust_thread_count(self):
    if self._idle_semaphore.acquire(timeout=0):
        return

    def weakref_cb(_, q=self._work_queue):
        q.put(None)

    num_threads = len(self._threads)
    if num_threads < self._max_workers:
        thread_name = '%s_%d' % (self._thread_name_prefix or self,
                                 num_threads)
        t = threading.Thread(name=thread_name, target=_worker,
                             args=(weakref.ref(self, weakref_cb),
                                   self._create_worker_context(),
                                   self._work_queue))
        t.start()
        self._threads.add(t)
        _threads_queues[t] = self._work_queue
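The on-demand growth is observable from outside, at the cost of peeking at a private attribute (_threads is a CPython internal and may change; the sleep is a crude way to let the worker mark itself idle):

```python
import time
from concurrent.futures import ThreadPoolExecutor

ex = ThreadPoolExecutor(max_workers=8)
assert len(ex._threads) == 0        # lazy: no threads before first submit

ex.submit(time.sleep, 0.01).result()
assert len(ex._threads) == 1        # grew by one, not to max_workers

time.sleep(0.2)                     # give the worker time to go idle
ex.submit(time.sleep, 0.01).result()
assert len(ex._threads) <= 2        # idle thread is usually reused
ex.shutdown()
```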

gopy notes

WorkerContext.prepare returns two closures that gopy would represent as function values. The idle semaphore pattern (threading.Semaphore) maps to a buffered channel or sync.Mutex combined with a counter in Go. The WeakKeyDictionary for _threads_queues has no direct Go equivalent; a sync.Map with explicit removal on thread exit is the closest analog. The double-lock acquisition in submit (_shutdown_lock then _global_shutdown_lock) requires care since Go's sync.Mutex is not reentrant.

CPython 3.14 changes

  • WorkerContext was introduced in 3.14 (extracted from former inline initializer handling) to support the multiprocessing executor context protocol.
  • max_workers default now uses os.process_cpu_count() instead of os.cpu_count().
  • ctxkwargs forwarding in __init__ and prepare_context is new in 3.14, enabling subclass context customization without overriding __init__.
  • os.register_at_fork(after_in_child=_threads_queues.clear) was added to fix stale entries in child processes after fork.