concurrent/futures/thread.py
Lib/concurrent/futures/thread.py implements ThreadPoolExecutor. Work items
are wrapped in _WorkItem objects, queued in a SimpleQueue, and consumed by
worker threads spawned lazily up to max_workers. The public surface is
submit(), map() (inherited from Executor), and shutdown().
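As a quick orientation, the three public entry points can be exercised as follows. This is a minimal usage sketch, not code from thread.py; the square helper and the pool size of 2 are illustrative assumptions.

```python
from concurrent.futures import ThreadPoolExecutor


def square(x):
    """Illustrative task; any callable works."""
    return x * x


# Leaving the with-block calls shutdown(wait=True).
with ThreadPoolExecutor(max_workers=2) as pool:
    future = pool.submit(square, 7)   # returns a Future immediately
    single = future.result()          # blocks until the task has finished
    # map() yields results in input order, not completion order.
    mapped = list(pool.map(square, [1, 2, 3]))

print(single, mapped)
```

submit() is the primitive; map() is built on top of it in the Executor base class, which is why thread.py does not define it.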
Map
| Lines | Symbol | Role |
|---|---|---|
| 1-40 | imports, _global_shutdown_lock | Module-level shutdown flag and lock, atexit hook |
| 41-80 | _WorkItem | Holds Future, callable, args; run() resolves the future |
| 81-130 | _worker | Thread body: drain queue, call work_item.run(), exit on None sentinel |
| 131-200 | ThreadPoolExecutor.__init__ | Sets max_workers, creates SimpleQueue, thread set |
| 201-270 | ThreadPoolExecutor.submit | Create Future, enqueue _WorkItem, call _adjust_thread_count |
| 271-320 | _adjust_thread_count | Spawn a new thread if no worker is idle and the pool is under max_workers |
| 321-370 | ThreadPoolExecutor.shutdown | Send None sentinels, optionally join workers |
| 371-400 | _python_exit | atexit handler that drains on interpreter exit |
Reading
_WorkItem.run
run() is where user code executes. It resolves the Future with either a
result or an exception, then rebinds self to None so the frame referenced by
the exception's traceback no longer points back at the work item, which
breaks a reference cycle.
# CPython: Lib/concurrent/futures/thread.py:58 _WorkItem.run
def run(self):
    if not self.future.set_running_or_notify_cancel():
        return
    try:
        result = self.fn(*self.args, **self.kwargs)
    except BaseException as exc:
        self.future.set_exception(exc)
        self = None
    else:
        self.future.set_result(result)
        self = None
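The result-or-exception contract can be reproduced outside the pool with a bare Future. The sketch below is a simplified stand-in for _WorkItem.run, not the CPython code; resolve is a hypothetical helper name, and int is used only as a convenient callable that can either succeed or raise.

```python
from concurrent.futures import Future


def resolve(future, fn, *args, **kwargs):
    """Run fn and store its outcome on future (hypothetical helper)."""
    # set_running_or_notify_cancel() returns False when the future was
    # cancelled before it could start; fn must not be called in that case.
    if not future.set_running_or_notify_cancel():
        return
    try:
        result = fn(*args, **kwargs)
    except BaseException as exc:
        future.set_exception(exc)
    else:
        future.set_result(result)


ok, failed, cancelled = Future(), Future(), Future()
cancelled.cancel()  # a pending future can still be cancelled

resolve(ok, int, "42")
resolve(failed, int, "not a number")
resolve(cancelled, int, "1")

print(ok.result())
print(type(failed.exception()).__name__)
print(cancelled.cancelled())
```

Catching BaseException rather than Exception means that even KeyboardInterrupt or SystemExit raised by the task ends up on the Future instead of killing the worker thread.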
_worker loop
Each worker thread runs this loop until it pulls a None sentinel from the
queue. The sentinel is then re-enqueued so that other waiting workers also
terminate.
# CPython: Lib/concurrent/futures/thread.py:83 _worker
def _worker(executor_reference, work_queue, initializer, initargs):
    if initializer is not None:
        try:
            initializer(*initargs)
        except BaseException:
            _base.LOGGER.critical("Exception in initializer:", exc_info=True)
            executor = executor_reference()
            if executor is not None:
                executor._initializer_failed()
            return
    try:
        while True:
            work_item = work_queue.get(block=True)
            if work_item is not None:
                work_item.run()
                del work_item
                continue
            executor = executor_reference()
            if executor is not None:
                executor._work_queue.put(None)
            del executor
            return
    except BaseException:
        _base.LOGGER.critical("Exception in worker", exc_info=True)
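The sentinel hand-off described above can be seen in isolation with plain threads and a SimpleQueue. This is a minimal sketch of the pattern under simplified assumptions (no executor, no weak reference, no initializer); worker, results, and the doubling task are illustrative names, not part of thread.py.

```python
import queue
import threading

work_queue = queue.SimpleQueue()
results = []
results_lock = threading.Lock()


def worker():
    while True:
        item = work_queue.get(block=True)
        if item is None:
            # Pass the sentinel on so the next blocked worker wakes up too.
            work_queue.put(None)
            return
        with results_lock:
            results.append(item * 2)


threads = [threading.Thread(target=worker) for _ in range(3)]
for t in threads:
    t.start()

for n in range(5):
    work_queue.put(n)
work_queue.put(None)  # one sentinel is enough for all three workers

for t in threads:
    t.join()

print(sorted(results))
```

Because each exiting worker puts the sentinel back, the producer does not need to know how many workers exist. One None is left in the queue after the last worker exits.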
submit and _adjust_thread_count
submit() creates a Future, wraps it in a _WorkItem, enqueues it, and
then calls _adjust_thread_count, which spawns a new thread only when no
worker is idle and the live-thread count is below max_workers. The idle
check is omitted from the excerpt below.
# CPython: Lib/concurrent/futures/thread.py:210 ThreadPoolExecutor.submit
def submit(self, fn, /, *args, **kwargs):
    with self._shutdown_lock:
        if self._broken:
            raise BrokenThreadPool(self._broken)
        if self._shutdown:
            raise RuntimeError("cannot schedule new futures after shutdown")
        f = _base.Future()
        w = _WorkItem(f, fn, args, kwargs)
        self._work_queue.put(w)
        self._adjust_thread_count()
        return f
# CPython: Lib/concurrent/futures/thread.py:278 _adjust_thread_count
def _adjust_thread_count(self):
    # When the executor is garbage collected, wake the workers with a sentinel.
    def weakref_cb(_, q=self._work_queue):
        q.put(None)

    num_threads = len(self._threads)
    if num_threads < self._max_workers:
        t = threading.Thread(target=_worker,
                             args=(weakref.ref(self, weakref_cb),
                                   self._work_queue,
                                   self._initializer,
                                   self._initargs))
        t.start()
        self._threads.add(t)
        _threads_queues[t] = self._work_queue
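Lazy spawning can be observed from the outside by counting threads after each submit(). The sketch below peeks at the private _threads set named in the excerpt above, so treat it as an illustration that may break across CPython versions rather than a supported technique; release and counts are illustrative names.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

release = threading.Event()

pool = ThreadPoolExecutor(max_workers=2)
counts = [len(pool._threads)]  # private attribute: no threads before first submit

# Every task blocks until released, so no worker is ever idle in this loop.
futures = []
for _ in range(3):
    futures.append(pool.submit(release.wait))
    counts.append(len(pool._threads))

release.set()
pool.shutdown(wait=True)

print(counts)
```

The third submit() does not add a thread because the pool has already reached max_workers; its task waits in the queue until one of the two workers is free.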
gopy notes
- Future is in Lib/concurrent/futures/_base.py; the gopy port keeps the same split into a base sub-package.
- SimpleQueue in the worker loop is already ported via queue.py.
- weakref.ref on the executor is used to detect pool teardown without preventing GC; gopy uses a sync/atomic pointer and a finalizer.
- _shutdown_lock becomes a sync.Mutex; _broken and _shutdown are atomic.Bool values.
- Thread-count tracking uses sync/atomic instead of len(set) under a lock.
CPython 3.14 changes
- 3.9 introduced the cancel_futures parameter to shutdown(); it is present and unchanged in 3.14.
- 3.13 added Queue.shutdown() and the queue.ShutDown exception to the queue module, but SimpleQueue has no shutdown() method, so the pool still terminates workers with the None sentinel. The 3.14 file retains this.
- _global_shutdown_lock was added in 3.9 to fix atexit race conditions; no changes in 3.14.
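The effect of cancel_futures can be checked with a single-worker pool whose only thread is kept busy. This is a minimal sketch that assumes an interpreter whose shutdown() accepts cancel_futures; blocker, started, and release are illustrative names.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

started = threading.Event()
release = threading.Event()


def blocker():
    started.set()   # signal that the single worker is now busy
    release.wait()
    return "done"


pool = ThreadPoolExecutor(max_workers=1)
running = pool.submit(blocker)
started.wait()

# These stay in the work queue because the only worker is occupied.
pending = [pool.submit(print, n) for n in range(3)]

# Queued work is cancelled; the task that is already running is not.
pool.shutdown(wait=False, cancel_futures=True)
release.set()

print(running.result())
print([f.cancelled() for f in pending])
```

Only work items still sitting in the queue are cancelled. A task that a worker has already picked up runs to completion, which is why running still produces a result.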