Lib/concurrent/futures/thread.py
cpython 3.14 @ ab2d84fe1023/Lib/concurrent/futures/thread.py
The file implements ThreadPoolExecutor, the threading backend for
concurrent.futures. Submitted callables are wrapped in _WorkItem objects,
placed on a SimpleQueue, and consumed by non-daemon threading.Thread workers
(daemon workers were dropped in Python 3.9; the pools are instead drained at exit).
An idle semaphore avoids creating new threads when existing ones are available.
Shutdown drains or cancels pending work before joining all threads.
Map
| Lines | Symbol | Role |
|---|---|---|
| 24-40 | _python_exit | atexit handler: drains all live thread pools at interpreter exit |
| 96-114 | _WorkItem | Wraps (future, task); run() settles the future via context execution |
| 145-150 | BrokenThreadPool | Raised when a thread initializer fails |
| 151-188 | ThreadPoolExecutor.__init__ | Allocates queue, semaphore, thread set; computes default max_workers |
| 189-206 | ThreadPoolExecutor.submit | Wraps callable, enqueues _WorkItem, calls _adjust_thread_count |
| 207-228 | _adjust_thread_count | Spawns a new worker thread if the pool has room and no idle worker exists |
| 229-241 | _initializer_failed | Marks the pool broken and cancels all queued futures |
| 242-260 | shutdown | Sets _shutdown, optionally drains and cancels queue, sends None sentinel, joins threads |
Reading
_WorkItem and context execution
_WorkItem is the unit of work on the queue. In CPython 3.14 the run() method
receives a ctx argument, which is not a contextvars.Context but the worker
context object produced by _create_worker_context() (one per worker thread; see
the _adjust_thread_count snippet below). For ThreadPoolExecutor, ctx.run(task)
unpacks the (fn, args, kwargs) tuple that submit() built and calls it; the
indirection exists so InterpreterPoolExecutor can plug in a different context
without changing the worker loop. The self = None assignment in the exception
branch breaks the reference cycle formed when the captured exception's traceback
refers back to this frame (and, through self, to the future).
```python
# CPython: Lib/concurrent/futures/thread.py:96 _WorkItem
class _WorkItem:
    def __init__(self, future, task):
        self.future = future
        self.task = task

    def run(self, ctx):
        if not self.future.set_running_or_notify_cancel():
            return
        try:
            result = ctx.run(self.task)
        except BaseException as exc:
            self.future.set_exception(exc)
            self = None  # break reference cycle with exc
        else:
            self.future.set_result(result)
```
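The settle-the-future contract can be exercised in isolation. The sketch below uses hypothetical stand-ins (`ThreadCtx`, `Item` are names invented here, not CPython's): `ThreadCtx.run()` unpacks a `(fn, args, kwargs)` tuple the way the thread pool's worker context does, and `Item` mirrors the `_WorkItem` shape quoted above.

```python
from concurrent.futures import Future

class ThreadCtx:
    # Hypothetical stand-in for the worker context: run() unpacks the
    # (fn, args, kwargs) task tuple and calls it.
    def run(self, task):
        fn, args, kwargs = task
        return fn(*args, **kwargs)

class Item:
    # Same shape as _WorkItem: settle the future from whichever branch runs.
    def __init__(self, future, task):
        self.future = future
        self.task = task

    def run(self, ctx):
        if not self.future.set_running_or_notify_cancel():
            return  # future was cancelled before it started running
        try:
            result = ctx.run(self.task)
        except BaseException as exc:
            self.future.set_exception(exc)
            self = None  # break reference cycle with exc
        else:
            self.future.set_result(result)

f = Future()
Item(f, (divmod, (7, 3), {})).run(ThreadCtx())
print(f.result())  # (2, 1)
```

A future cancelled before run() is reached stays cancelled: set_running_or_notify_cancel() returns False and the task never executes, which is exactly the early-return branch above.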
ThreadPoolExecutor.__init__ and default max_workers
The default pool size is min(32, (os.process_cpu_count() or 1) + 4). The cap
at 32 prevents runaway thread creation on machines with many cores. The +4
term adds headroom for I/O-bound workloads that block most of the time.
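Plugging representative core counts into the formula makes both regimes visible (a throwaway helper for illustration, not part of the module):

```python
def default_max_workers(cpus):
    # Mirrors the documented default: +4 headroom, hard cap at 32.
    # cpus may be None, as os.process_cpu_count() can return None.
    return min(32, (cpus or 1) + 4)

for n in (None, 1, 8, 28, 64):
    print(n, '->', default_max_workers(n))
# None -> 5, 1 -> 5, 8 -> 12, 28 -> 32, 64 -> 32
```

The +4 headroom dominates on small machines; the cap takes over at 28 cores and above.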
```python
# CPython: Lib/concurrent/futures/thread.py:151 ThreadPoolExecutor.__init__
def __init__(self, max_workers=None, thread_name_prefix='',
             initializer=None, initargs=(), **ctxkwargs):
    if max_workers is None:
        max_workers = min(32, (os.process_cpu_count() or 1) + 4)
    if max_workers <= 0:
        raise ValueError("max_workers must be greater than 0")

    self._max_workers = max_workers
    self._work_queue = queue.SimpleQueue()
    self._idle_semaphore = threading.Semaphore(0)
    self._threads = set()
    self._broken = False
    self._shutdown = False
    self._shutdown_lock = threading.Lock()
    self._thread_name_prefix = (thread_name_prefix or
                                ("ThreadPoolExecutor-%d" % self._counter()))
```
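The initializer/initargs pair accepted here is what BrokenThreadPool (map rows 145-150 and 229-241) guards: if the initializer raises in a worker, the pool is marked broken, queued futures fail, and later submits are refused. A quick check (the worker also logs the failure at CRITICAL level, so expect stderr noise):

```python
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures.thread import BrokenThreadPool

def bad_init():
    raise RuntimeError("boom")

ex = ThreadPoolExecutor(max_workers=1, initializer=bad_init)
f = ex.submit(pow, 2, 10)          # the worker's initializer raises

try:
    f.result(timeout=5)
except BrokenThreadPool:
    print("pending future failed")

try:
    ex.submit(pow, 2, 10)          # the pool is now marked broken
except BrokenThreadPool:
    print("new submits refused")
```

The pending future does not get cancelled; it fails with the BrokenThreadPool exception set by _initializer_failed, so callers waiting on result() are woken with the error.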
_adjust_thread_count: idle semaphore and weakref callback
submit() calls _adjust_thread_count after enqueuing each work item. The method
first tries the idle semaphore with a zero timeout: if an idle worker is
available, the acquire succeeds and it returns without creating a thread.
Otherwise it spawns a new worker thread, up to _max_workers. The executor is
passed to the thread closure as a weakref so that a live thread does not
prevent the executor from being garbage-collected; the weakref callback puts a
None sentinel onto the queue to wake blocked workers once the executor is
collected.
```python
# CPython: Lib/concurrent/futures/thread.py:207 _adjust_thread_count
def _adjust_thread_count(self):
    if self._idle_semaphore.acquire(timeout=0):
        return

    def weakref_cb(_, q=self._work_queue):
        q.put(None)

    num_threads = len(self._threads)
    if num_threads < self._max_workers:
        thread_name = '%s_%d' % (self._thread_name_prefix or self,
                                 num_threads)
        t = threading.Thread(name=thread_name, target=_worker,
                             args=(weakref.ref(self, weakref_cb),
                                   self._create_worker_context(),
                                   self._work_queue))
        t.start()
        self._threads.add(t)
        _threads_queues[t] = self._work_queue
```
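Because a worker only signals the idle semaphore once it is waiting for work, a burst of blocking tasks forces one new thread per submit until _max_workers is reached. A small experiment makes this visible (it peeks at the private `_threads` set, so it is for illustration only, not a supported API):

```python
import threading
from concurrent.futures import ThreadPoolExecutor

ex = ThreadPoolExecutor(max_workers=4)
barrier = threading.Barrier(5)              # 4 workers + the main thread

# Every task blocks on the barrier, so no worker ever becomes idle and
# each submit must spawn a fresh thread (up to _max_workers).
futures = [ex.submit(barrier.wait) for _ in range(4)]
barrier.wait()                              # release all four workers
for f in futures:
    f.result()

print(len(ex._threads))  # 4
ex.shutdown()
```

Had the tasks been short-lived instead, a worker could finish, release the semaphore, and serve the next submit, keeping the pool smaller than max_workers.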
shutdown: drain, cancel, and sentinel
shutdown(wait=True) first optionally drains the queue and cancels pending
futures when cancel_futures=True. It then puts a single None sentinel onto
the queue (each worker that picks up None re-enqueues it for the next worker
before exiting, so one sentinel cascades through all threads). Finally it joins
every thread.
```python
# CPython: Lib/concurrent/futures/thread.py:242 shutdown
def shutdown(self, wait=True, *, cancel_futures=False):
    with self._shutdown_lock:
        self._shutdown = True
        if cancel_futures:
            while True:
                try:
                    work_item = self._work_queue.get_nowait()
                except queue.Empty:
                    break
                if work_item is not None:
                    work_item.future.cancel()
        # Wake up threads blocked on get(block=True).
        self._work_queue.put(None)
    if wait:
        for t in self._threads:
            t.join()
```
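The drain-and-cancel branch is easy to observe with a one-worker pool whose only thread is pinned on a task, leaving the other submissions in the queue when shutdown runs. The `started`/`release` events (names invented for this sketch) make the ordering deterministic:

```python
import threading
from concurrent.futures import ThreadPoolExecutor

ex = ThreadPoolExecutor(max_workers=1)
started = threading.Event()
release = threading.Event()

def pinned():
    started.set()        # prove the worker has dequeued this item
    release.wait()       # then hold the only worker busy
    return "done"

running = ex.submit(pinned)
started.wait()                       # worker is busy; the queue is empty
pending = [ex.submit(pow, 2, 10) for _ in range(3)]

ex.shutdown(wait=False, cancel_futures=True)   # drains and cancels the three
release.set()                                  # let the running task finish

print(running.result(), [f.cancelled() for f in pending])
# done [True, True, True]
```

Note that the already-running future is not cancelled: once set_running_or_notify_cancel() has succeeded, the item is off the queue, so only still-queued work is affected by cancel_futures.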
gopy notes
- Goroutines replace `threading.Thread` workers. A buffered channel takes the role of the `SimpleQueue`.
- The idle semaphore maps to a `chan struct{}` of capacity `max_workers`. A worker sends to the channel before blocking on the work channel and receives from it when it picks up a work item, mirroring the `acquire` / implied release pair.
- `_WorkItem.run(ctx)` receives the worker's context object, created once per worker thread rather than per submission; gopy can model it as per-goroutine state set up when the goroutine starts, loosely analogous to threading a `context.Context` value through.
- The weakref callback that puts `None` on the queue when the executor is GC'd has no direct Go equivalent. gopy can use a finalizer on the executor struct to close the work channel instead.
- `_threads` is a plain `set` used only for `join` on shutdown. A `sync.WaitGroup` is the idiomatic Go replacement.
- `_global_shutdown_lock` (line ~20) prevents a race between the `atexit` drain and an explicit `shutdown()` call. gopy serialises this with a `sync.Once`.