Lib/concurrent/futures/thread.py
cpython 3.14 @ ab2d84fe1023/Lib/concurrent/futures/thread.py
ThreadPoolExecutor manages a pool of worker threads that pull tasks from a queue.SimpleQueue. Each worker runs inside a WorkerContext, which handles optional per-thread initialization and finalization. Shutdown is cooperative: a None sentinel is placed in the queue and propagates through the chain of workers. The module registers an atexit-equivalent hook via threading._register_atexit to wake and join worker threads during interpreter teardown, and registers os.register_at_fork handlers so that the global shutdown lock and thread-queue mapping survive a fork() call safely.
Map
| Lines | Symbol | Role |
|---|---|---|
| 17-21 | _threads_queues, _shutdown, _global_shutdown_lock | Module-level state: weak-key map from threads to queues, global shutdown flag, and the lock protecting both |
| 23-37 | _python_exit | Registered via threading._register_atexit; sets the shutdown flag, puts a None sentinel on every queue, and joins the worker threads |
| 40-44 | os.register_at_fork block | Reinitializes shutdown lock and clears thread-queue map in child processes after fork |
| 47-73 | WorkerContext | Encapsulates initializer and finalizer lifecycle for a single worker thread |
| 76-94 | _WorkItem | Pairs a Future with the task tuple; calls ctx.run() and routes result or exception |
| 97-141 | _worker | Thread target function; runs the WorkerContext and loops over queue items until shutdown |
| 144-147 | BrokenThreadPool | Exception raised when an initializer failure breaks the pool |
| 150-274 | ThreadPoolExecutor | Public executor: submit, thread management, idle semaphore, shutdown |
Reading
Global state and fork safety
Two module globals coordinate shutdown across threads and processes: _threads_queues (a WeakKeyDictionary from thread objects to their work queues) and _shutdown (a boolean). Both are protected by _global_shutdown_lock so that new thread creation and the shutdown sweep never race.
The os.register_at_fork block handles POSIX fork(). Before forking, the parent acquires the lock. After forking, the child reinitializes the lock (a new mutex, since the parent's state is gone) and clears _threads_queues (because the parent's threads do not exist in the child). The parent simply releases the lock.
# CPython: Lib/concurrent/futures/thread.py:40 os.register_at_fork block
if hasattr(os, 'register_at_fork'):
    os.register_at_fork(before=_global_shutdown_lock.acquire,
                        after_in_child=_global_shutdown_lock._at_fork_reinit,
                        after_in_parent=_global_shutdown_lock.release)
    os.register_at_fork(after_in_child=_threads_queues.clear)
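The child-side reset can be observed in isolation: _at_fork_reinit() (a CPython-internal method on lock objects, available since 3.9) swaps the underlying mutex for a fresh unlocked one, which is safe even if the lock was held at fork time. A minimal sketch, without an actual fork():

```python
import threading

lock = threading.Lock()
lock.acquire()            # simulate: parent holds the lock across fork()
lock._at_fork_reinit()    # child side: replace the mutex with a fresh, unlocked one
ok = lock.acquire(blocking=False)
print(ok)                 # True: the reinitialized lock is free again
```

Without the reinit, the child would inherit a lock that is locked forever, since the thread that held it does not exist after fork().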
WorkerContext and the initializer lifecycle
WorkerContext is the abstraction point for per-thread setup. prepare() is a classmethod that validates the initializer and returns a pair of callables: create_context (called once per new thread) and resolve_task (called once per submitted item). Subclasses can override prepare_context on ThreadPoolExecutor to inject different context types, which is the mechanism used by InterpreterPoolExecutor in the standard library.
# CPython: Lib/concurrent/futures/thread.py:49 WorkerContext.prepare
@classmethod
def prepare(cls, initializer, initargs):
    if initializer is not None:
        if not callable(initializer):
            raise TypeError("initializer must be a callable")
    def create_context():
        return cls(initializer, initargs)
    def resolve_task(fn, args, kwargs):
        return (fn, args, kwargs)
    return create_context, resolve_task
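The two-closure pattern can be exercised end to end with a self-contained mimic (MiniContext is illustrative, not the stdlib class):

```python
# A standalone mimic of the prepare() pattern: one closure builds the
# per-thread context, the other normalizes each submitted task.
class MiniContext:
    def __init__(self, initializer, initargs):
        self.initializer = initializer
        self.initargs = initargs

    def initialize(self):
        # Runs once at worker startup, like WorkerContext.initialize().
        if self.initializer is not None:
            self.initializer(*self.initargs)

    def run(self, task):
        fn, args, kwargs = task
        return fn(*args, **kwargs)

    @classmethod
    def prepare(cls, initializer, initargs):
        if initializer is not None and not callable(initializer):
            raise TypeError("initializer must be a callable")
        def create_context():
            return cls(initializer, initargs)   # once per new thread
        def resolve_task(fn, args, kwargs):
            return (fn, args, kwargs)           # once per submitted item
        return create_context, resolve_task

create_context, resolve_task = MiniContext.prepare(None, ())
ctx = create_context()
ctx.initialize()
result = ctx.run(resolve_task(sum, ([1, 2, 3],), {}))
print(result)  # 6
```

Keeping resolve_task separate from create_context is what lets a subclass change how tasks are packaged without touching thread startup.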
The worker loop and idle semaphore
_worker runs as the thread target. After ctx.initialize() succeeds, it enters its main loop. Each iteration first tries a non-blocking get_nowait(); if the queue is empty, the worker releases the idle semaphore before blocking on get(block=True). _adjust_thread_count checks that semaphore: if acquire(timeout=0) succeeds, an idle thread already exists and no new thread is spawned.
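The gate can be reproduced with a bare threading.Semaphore; a sketch of the acquire(timeout=0) check (the names here are illustrative, not the stdlib's):

```python
import threading

idle = threading.Semaphore(0)   # counts workers currently blocked on get()

def should_spawn():
    # acquire(timeout=0) returns immediately: True consumes one idle
    # "token" (reuse that worker), False means nobody is idle.
    return not idle.acquire(timeout=0)

first = should_spawn()
idle.release()                  # a worker just went idle
second = should_spawn()
print(first, second)            # True False
```

The semaphore is only a hint: a worker releases it just before blocking, so a spawn decision can occasionally race with a worker going idle, which errs on the side of one extra thread, never a lost task.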
When a None is dequeued, the worker checks the three shutdown conditions (_shutdown, executor GC, executor._shutdown). If any is true it re-enqueues the sentinel so the next worker also wakes up, then returns.
# CPython: Lib/concurrent/futures/thread.py:97 _worker
def _worker(executor_reference, ctx, work_queue):
    try:
        ctx.initialize()
    except BaseException:
        _base.LOGGER.critical('Exception in initializer:', exc_info=True)
        executor = executor_reference()
        if executor is not None:
            executor._initializer_failed()
        return
    try:
        while True:
            try:
                work_item = work_queue.get_nowait()
            except queue.Empty:
                executor = executor_reference()
                if executor is not None:
                    executor._idle_semaphore.release()
                del executor
                work_item = work_queue.get(block=True)

            if work_item is not None:
                work_item.run(ctx)
                del work_item
                continue

            executor = executor_reference()
            if _shutdown or executor is None or executor._shutdown:
                if executor is not None:
                    executor._shutdown = True
                work_queue.put(None)
                return
            del executor
    except BaseException:
        _base.LOGGER.critical('Exception in worker', exc_info=True)
    finally:
        ctx.finalize()
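The sentinel chain from the loop above can be exercised with plain queue and threading primitives (a standalone sketch, not the stdlib worker):

```python
import queue
import threading

q = queue.SimpleQueue()
results = []

def worker():
    while True:
        item = q.get()
        if item is None:
            q.put(None)   # re-enqueue the sentinel so the next worker wakes too
            return
        results.append(item * item)

threads = [threading.Thread(target=worker) for _ in range(4)]
for t in threads:
    t.start()
for i in range(8):
    q.put(i)
q.put(None)               # a single sentinel shuts down all four workers
for t in threads:
    t.join()
print(sorted(results))    # [0, 1, 4, 9, 16, 25, 36, 49]
```

Because the queue is FIFO, the sentinel is only seen after every real item has been dequeued, and each exiting worker re-posts it, so one put(None) suffices for any pool size.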
submit and thread creation
submit holds both _shutdown_lock and _global_shutdown_lock simultaneously to prevent a race between a new submission and an ongoing shutdown. It creates a Future, wraps it in a _WorkItem, enqueues it, then calls _adjust_thread_count.
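The observable behavior of this path through the public API:

```python
from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor(max_workers=2, thread_name_prefix='demo') as ex:
    fut = ex.submit(pow, 2, 10)   # Future created, _WorkItem enqueued,
                                  # _adjust_thread_count may spawn a thread
    value = fut.result()
    print(value)                  # 1024
# __exit__ calls shutdown(wait=True): sentinel enqueued, workers joined
```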
_adjust_thread_count tries to decrement the idle semaphore first. Only when that fails (no idle thread available) and the thread count is below _max_workers does it spawn a new threading.Thread. The thread receives a weakref to the executor so that GC of the executor triggers a wakeup sentinel via the weakref callback.
# CPython: Lib/concurrent/futures/thread.py:219 _adjust_thread_count
def _adjust_thread_count(self):
    if self._idle_semaphore.acquire(timeout=0):
        return

    def weakref_cb(_, q=self._work_queue):
        q.put(None)

    num_threads = len(self._threads)
    if num_threads < self._max_workers:
        thread_name = '%s_%d' % (self._thread_name_prefix or self,
                                 num_threads)
        t = threading.Thread(name=thread_name, target=_worker,
                             args=(weakref.ref(self, weakref_cb),
                                   self._create_worker_context(),
                                   self._work_queue))
        t.start()
        self._threads.add(t)
        _threads_queues[t] = self._work_queue
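The weakref wake-up can be demonstrated directly; this sketch relies on CPython's immediate refcount-based collection (Owner stands in for the executor):

```python
import queue
import weakref

class Owner:
    pass

q = queue.SimpleQueue()
owner = Owner()
# Mirrors weakref_cb: when the referent is collected, enqueue a sentinel.
ref = weakref.ref(owner, lambda _, q=q: q.put(None))

del owner        # on CPython, refcount hits zero and the callback fires now
woken = q.get()
print(woken)     # None: a worker blocked on get() would wake and see the sentinel
```

This is why a ThreadPoolExecutor that is dropped without calling shutdown() still lets its workers exit: garbage collection of the executor itself injects the sentinel.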
gopy notes
WorkerContext.prepare returns two closures that gopy would represent as function values. The idle semaphore pattern (threading.Semaphore) maps to a buffered channel or sync.Mutex combined with a counter in Go. The WeakKeyDictionary for _threads_queues has no direct Go equivalent; a sync.Map with explicit removal on thread exit is the closest analog. The double-lock acquisition in submit (_shutdown_lock then _global_shutdown_lock) requires care since Go's sync.Mutex is not reentrant.
CPython 3.14 changes
- WorkerContext was introduced in 3.14 (extracted from the former inline initializer handling) to support the InterpreterPoolExecutor context protocol.
- The max_workers default now uses os.process_cpu_count() instead of os.cpu_count().
- ctxkwargs forwarding in __init__ and prepare_context is new in 3.14, enabling subclass context customization without overriding __init__.
- os.register_at_fork(after_in_child=_threads_queues.clear) was added to fix stale entries in child processes after fork.