
Lib/concurrent/futures/thread.py

cpython 3.14 @ ab2d84fe1023/Lib/concurrent/futures/thread.py

The file implements ThreadPoolExecutor, the threading backend for concurrent.futures. Submitted callables are wrapped in _WorkItem objects, placed on a SimpleQueue, and consumed by threading.Thread workers (non-daemon since Python 3.9; an atexit hook joins them at interpreter exit). An idle semaphore avoids creating new threads when existing ones are available. Shutdown drains or cancels pending work before joining all threads.
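For orientation, a minimal usage sketch of the public API described above (standard-library interface only, no assumptions beyond the documented behavior):

```python
from concurrent.futures import ThreadPoolExecutor

def square(x):
    # Runs on a pool worker thread; the return value settles the Future.
    return x * x

with ThreadPoolExecutor(max_workers=4) as pool:
    fut = pool.submit(square, 7)              # enqueues one _WorkItem
    single = fut.result()                     # blocks until a worker settles the future
    mapped = list(pool.map(square, range(4))) # map preserves input order
```

Exiting the `with` block calls shutdown(wait=True), exercising the sentinel cascade covered below.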

Map

Lines    Symbol                        Role
24-40    _python_exit                  atexit handler: drains all live thread pools at interpreter exit
96-114   _WorkItem                     Wraps (future, task); run() settles the future via context execution
145-150  BrokenThreadPool              Raised when a thread initializer fails
151-188  ThreadPoolExecutor.__init__   Allocates queue, semaphore, thread set; computes default max_workers
189-206  ThreadPoolExecutor.submit     Wraps callable, enqueues _WorkItem, calls _adjust_thread_count
207-228  _adjust_thread_count          Spawns a new thread if the pool has room and no idle worker exists
229-241  _initializer_failed           Marks the pool broken and cancels all queued futures
242-260  shutdown                      Sets _shutdown, optionally drains and cancels queue, sends None sentinel, joins threads

Reading

_WorkItem and context execution

_WorkItem is the unit of work on the queue. In CPython 3.14 the run() method receives a ctx argument (a contextvars.Context) and invokes the task callable through ctx.run() so that context variables are isolated per work item. The self = None assignment in the exception branch breaks a reference cycle between the exception traceback and the future.

# CPython: Lib/concurrent/futures/thread.py:96 _WorkItem
class _WorkItem:
    def __init__(self, future, task):
        self.future = future
        self.task = task

    def run(self, ctx):
        if not self.future.set_running_or_notify_cancel():
            return
        try:
            result = ctx.run(self.task)
        except BaseException as exc:
            self.future.set_exception(exc)
            self = None  # break reference cycle with exc
        else:
            self.future.set_result(result)
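To see why routing the task through ctx.run() isolates context variables, here is a standalone sketch of the contextvars semantics, independent of the executor (names are illustrative):

```python
import contextvars

var = contextvars.ContextVar("var", default="outer")

def task():
    var.set("inner")   # this write lands in the snapshot, not the caller's context
    return var.get()

ctx = contextvars.copy_context()  # a snapshot, analogous to the ctx handed to run()
inside = ctx.run(task)            # the task observes its own write
outside = var.get()               # the caller's context is untouched
```

Any ContextVar mutations a work item makes stay confined to its snapshot, which is what keeps submitted tasks from leaking context state into one another.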

ThreadPoolExecutor.__init__ and default max_workers

The default pool size is min(32, (os.process_cpu_count() or 1) + 4). The cap at 32 prevents runaway thread creation on machines with many cores. The +4 term adds headroom for I/O-bound workloads that block most of the time.

# CPython: Lib/concurrent/futures/thread.py:151 ThreadPoolExecutor.__init__
def __init__(self, max_workers=None, thread_name_prefix='',
             initializer=None, initargs=(), **ctxkwargs):
    if max_workers is None:
        max_workers = min(32, (os.process_cpu_count() or 1) + 4)
    if max_workers <= 0:
        raise ValueError("max_workers must be greater than 0")

    self._max_workers = max_workers
    self._work_queue = queue.SimpleQueue()
    self._idle_semaphore = threading.Semaphore(0)
    self._threads = set()
    self._broken = False
    self._shutdown = False
    self._shutdown_lock = threading.Lock()
    self._thread_name_prefix = (thread_name_prefix or
                                ("ThreadPoolExecutor-%d" % self._counter()))
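The sizing formula is easy to check by hand; a quick sketch of how the +4 headroom and the cap of 32 interact (the helper and core counts are hypothetical, mirroring the expression in __init__):

```python
def default_max_workers(cpus):
    # Mirrors min(32, (os.process_cpu_count() or 1) + 4) from __init__.
    return min(32, (cpus or 1) + 4)

small = default_max_workers(1)     # 1 core: headroom dominates
medium = default_max_workers(8)    # 8 cores: cpus + 4
big = default_max_workers(64)      # many cores: the cap of 32 wins
unknown = default_max_workers(None)  # cpu count unavailable: treated as 1
```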

_adjust_thread_count: idle semaphore and weakref callback

_adjust_thread_count runs on every submit(). It first tries the idle semaphore with a zero timeout: if an idle worker is available, it returns immediately without creating a new thread. Otherwise it spawns a new worker thread, up to _max_workers. The executor is passed to _worker as a weakref so that a live thread does not prevent the executor from being garbage-collected. The weakref callback puts a None onto the queue to wake the worker when the executor is collected.

# CPython: Lib/concurrent/futures/thread.py:207 _adjust_thread_count
def _adjust_thread_count(self):
    if self._idle_semaphore.acquire(timeout=0):
        return

    def weakref_cb(_, q=self._work_queue):
        q.put(None)

    num_threads = len(self._threads)
    if num_threads < self._max_workers:
        thread_name = '%s_%d' % (self._thread_name_prefix or self,
                                 num_threads)
        t = threading.Thread(name=thread_name, target=_worker,
                             args=(weakref.ref(self, weakref_cb),
                                   self._create_worker_context(),
                                   self._work_queue))
        t.start()
        self._threads.add(t)
        _threads_queues[t] = self._work_queue
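The lazy growth is observable from outside: no worker exists until the first submit, and each submit spawns at most one thread. A small check, peeking at the private _threads set for illustration only:

```python
from concurrent.futures import ThreadPoolExecutor
import time

pool = ThreadPoolExecutor(max_workers=8)
before = len(pool._threads)     # pool starts empty; nothing spawned in __init__

f = pool.submit(time.sleep, 0)  # no idle worker yet, so exactly one is spawned
after = len(pool._threads)      # grows by one thread, not up to max_workers

f.result()
pool.shutdown()
```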

shutdown: drain, cancel, and sentinel

shutdown(wait=True) first optionally drains the queue and cancels pending futures when cancel_futures=True. It then puts a single None sentinel onto the queue (each worker that picks up None re-enqueues it for the next worker before exiting, so one sentinel cascades through all threads). Finally, if wait is true, it joins every thread.

# CPython: Lib/concurrent/futures/thread.py:242 shutdown
def shutdown(self, wait=True, *, cancel_futures=False):
    with self._shutdown_lock:
        self._shutdown = True
        if cancel_futures:
            while True:
                try:
                    work_item = self._work_queue.get_nowait()
                except queue.Empty:
                    break
                if work_item is not None:
                    work_item.future.cancel()
        # Wake up threads blocked on get(block=True).
        self._work_queue.put(None)
    if wait:
        for t in self._threads:
            t.join()
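A sketch of cancel_futures in action: with a single worker, the in-flight item completes while the still-queued items are drained and cancelled (the event/sleep choreography is illustrative, to keep the worker busy while shutdown runs):

```python
from concurrent.futures import ThreadPoolExecutor
import threading
import time

started = threading.Event()

def blocker():
    started.set()    # signal that the worker has dequeued this item
    time.sleep(0.5)  # hold the single worker busy during shutdown
    return "done"

pool = ThreadPoolExecutor(max_workers=1)
running = pool.submit(blocker)
queued = [pool.submit(time.sleep, 0) for _ in range(3)]  # stay on the queue

started.wait()                                 # ensure blocker is off the queue
pool.shutdown(wait=True, cancel_futures=True)  # drain + cancel, sentinel, join
```

After shutdown returns, `running` has a result while every future left on the queue reports cancelled().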

gopy notes

  • Goroutines replace threading.Thread workers. A buffered channel takes the role of the SimpleQueue.
  • The idle semaphore maps to a chan struct{} of capacity max_workers. A worker sends on the channel before blocking on the work channel (mirroring release), and the submit path does a non-blocking receive before deciding to spawn (mirroring acquire with timeout=0).
  • _WorkItem.run(ctx) with a contextvars.Context requires gopy to carry a context snapshot per submission, analogous to how goroutines inherit context.Context values.
  • The weakref callback that puts None on the queue when the executor is GC'd has no direct Go equivalent. gopy can use a finalizer on the executor struct to close the work channel instead.
  • _threads is a plain set used only for join on shutdown. A sync.WaitGroup is the idiomatic Go replacement.
  • _global_shutdown_lock (line ~20) prevents a race between the atexit drain and an explicit shutdown() call. gopy serialises this with a sync.Once.