concurrent/futures/thread.py

Lib/concurrent/futures/thread.py implements ThreadPoolExecutor. Each submitted callable is wrapped in a _WorkItem, placed on a SimpleQueue, and consumed by worker threads that are spawned lazily, up to max_workers. The public surface is submit(), map() (inherited from Executor), and shutdown().
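
As a quick orientation, the public surface can be exercised as in the minimal sketch below. It uses only the documented API; `square` is a made-up example callable, not something from the module.

```python
from concurrent.futures import ThreadPoolExecutor


def square(x):
    """Hypothetical example task; any callable works."""
    return x * x


executor = ThreadPoolExecutor(max_workers=2)
try:
    future = executor.submit(square, 7)            # returns a Future immediately
    single = future.result()                       # blocks until the task finishes
    mapped = list(executor.map(square, range(4)))  # results come back in input order
finally:
    executor.shutdown(wait=True)                   # join the worker threads
```

In everyday code the `with ThreadPoolExecutor(...) as executor:` form does the same shutdown(wait=True) on exit.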

Map

| Lines | Symbol | Role |
|---|---|---|
| 1-40 | imports, _global_shutdown_lock | Module-level shutdown lock and atexit hook |
| 41-80 | _WorkItem | Holds Future, callable, args; run() resolves the future |
| 81-130 | _worker | Thread body: drain queue, call work_item.run(), exit on None sentinel |
| 131-200 | ThreadPoolExecutor.__init__ | Sets max_workers, creates SimpleQueue, thread set |
| 201-270 | ThreadPoolExecutor.submit | Create Future, enqueue _WorkItem, call _adjust_thread_count |
| 271-320 | _adjust_thread_count | Spawn a new worker thread if the pool is below max_workers |
| 321-370 | ThreadPoolExecutor.shutdown | Send None sentinels, optionally join workers |
| 371-400 | _python_exit | atexit handler that drains on interpreter exit |

Reading

_WorkItem.run

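run() is where user code executes. It returns early if the Future was cancelled before starting; otherwise it resolves the Future with either a result or an exception. On the exception path it rebinds self to None to break the reference cycle created by the exception's traceback.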

# CPython: Lib/concurrent/futures/thread.py:58 _WorkItem.run
def run(self):
    if not self.future.set_running_or_notify_cancel():
        return  # the future was cancelled before it started
    try:
        result = self.fn(*self.args, **self.kwargs)
    except BaseException as exc:
        self.future.set_exception(exc)
        # Break a reference cycle with the exception 'exc'
        self = None
    else:
        self.future.set_result(result)
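
From the caller's side, the two outcomes of run() show up on the returned Future. The following is a small sketch using only the public API; `ok` and `boom` are hypothetical example tasks.

```python
from concurrent.futures import ThreadPoolExecutor


def ok():
    return 42


def boom():
    raise ValueError("bad input")


with ThreadPoolExecutor(max_workers=1) as pool:
    good = pool.submit(ok)
    bad = pool.submit(boom)

# Leaving the with-block waits for both tasks to finish.
good_result = good.result()   # the value that was passed to set_result()
bad_exc = bad.exception()     # the exception that was passed to set_exception()
```

Calling bad.result() instead would re-raise the stored ValueError in the calling thread.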

_worker loop

Each worker thread runs this loop until it pulls a None sentinel from the queue. The sentinel is then re-enqueued so that other waiting workers also terminate.

# CPython: Lib/concurrent/futures/thread.py:83 _worker
def _worker(executor_reference, work_queue, initializer, initargs):
    if initializer is not None:
        try:
            initializer(*initargs)
        except BaseException:
            _base.LOGGER.critical("Exception in initializer:", exc_info=True)
            executor = executor_reference()
            if executor is not None:
                executor._initializer_failed()
            return
    try:
        while True:
            work_item = work_queue.get(block=True)
            if work_item is not None:
                work_item.run()
                del work_item
                continue
            executor = executor_reference()
            if executor is not None:
                executor._work_queue.put(None)
            del executor
            return
    except BaseException:
        _base.LOGGER.critical("Exception in worker", exc_info=True)
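
The sentinel hand-off can be tried in isolation. The sketch below is a simplified stand-in for the loop above, not the module's code: it assumes plain callables instead of _WorkItem objects and leaves out the executor weakref and the initializer. A single None is enough to stop every worker because each one puts it back before exiting.

```python
import queue
import threading


def worker(work_queue, results):
    """Simplified stand-in for _worker (assumption: items are plain callables)."""
    while True:
        item = work_queue.get(block=True)
        if item is None:
            work_queue.put(None)   # pass the sentinel on to the next worker
            return
        results.put(item())


work_queue = queue.SimpleQueue()
results = queue.SimpleQueue()
threads = [threading.Thread(target=worker, args=(work_queue, results))
           for _ in range(3)]
for t in threads:
    t.start()

for n in range(5):
    work_queue.put(lambda n=n: n * 10)
work_queue.put(None)               # one sentinel for three workers

for t in threads:
    t.join(timeout=5)

all_exited = not any(t.is_alive() for t in threads)
collected = sorted(results.get() for _ in range(5))
```

One None is left on the queue at the end, which is harmless here because nothing reads from it again.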

submit and _adjust_thread_count

submit() creates a Future, wraps it together with the callable and its arguments in a _WorkItem, enqueues it, and then calls _adjust_thread_count(), which spawns a new thread only when the current thread count is below max_workers.

# CPython: Lib/concurrent/futures/thread.py:210 ThreadPoolExecutor.submit
def submit(self, fn, /, *args, **kwargs):
    with self._shutdown_lock:
        if self._broken:
            raise BrokenThreadPool(self._broken)
        if self._shutdown:
            raise RuntimeError("cannot schedule new futures after shutdown")
        f = _base.Future()
        w = _WorkItem(f, fn, args, kwargs)
        self._work_queue.put(w)
        self._adjust_thread_count()
        return f

# CPython: Lib/concurrent/futures/thread.py:278 _adjust_thread_count
def _adjust_thread_count(self):
    def weakref_cb(_, q=self._work_queue):
        q.put(None)
    num_threads = len(self._threads)
    if num_threads < self._max_workers:
        t = threading.Thread(target=_worker,
                             args=(weakref.ref(self, weakref_cb),
                                   self._work_queue,
                                   self._initializer,
                                   self._initargs))
        t.daemon = True
        t.start()
        self._threads.add(t)
        _threads_queues[t] = self._work_queue
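
Lazy spawning can be observed from outside without touching private attributes. In the sketch below, `threads_used` is a hypothetical helper: it blocks every task on an Event so that no worker can take a second item, then reports which thread names actually ran work. With fewer tasks than max_workers only that many threads should appear, and with more tasks the count should stop at max_workers.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def threads_used(max_workers, n_tasks):
    """Return the set of worker-thread names that ran n_tasks blocking tasks."""
    gate = threading.Event()
    started = threading.Semaphore(0)

    def task():
        started.release()         # tell the caller this task is running
        gate.wait(timeout=5)      # hold the worker until released
        return threading.current_thread().name

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(task) for _ in range(n_tasks)]
        # Wait until as many tasks as can run concurrently are blocked,
        # so each of them is pinned to a distinct worker thread.
        for _ in range(min(max_workers, n_tasks)):
            started.acquire(timeout=5)
        gate.set()                # release every blocked worker
        return {f.result() for f in futures}


few = threads_used(max_workers=4, n_tasks=2)
many = threads_used(max_workers=4, n_tasks=8)
```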

gopy notes

  • Future is in Lib/concurrent/futures/_base.py; the gopy port keeps the same split into a base sub-package.
  • SimpleQueue in the worker loop is already ported via queue.py.
  • weakref.ref on the executor is used to detect pool teardown without preventing GC; gopy uses a sync/atomic pointer and a finalizer.
  • _shutdown_lock becomes a sync.Mutex; _broken and _shutdown are atomic.Bool values.
  • Thread-count tracking uses sync/atomic instead of len(set) under a lock.
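
For reference, the Python-side mechanism that the weakref bullet describes can be sketched on its own. `Owner` is a hypothetical stand-in for the executor and `on_collected` mirrors the role of weakref_cb; this is an illustration of the pattern, not the module's code.

```python
import gc
import queue
import weakref


class Owner:
    """Hypothetical stand-in for the executor."""


work_queue = queue.SimpleQueue()


def on_collected(_ref, q=work_queue):
    q.put(None)   # wake one worker with the shutdown sentinel


owner = Owner()
ref = weakref.ref(owner, on_collected)
alive_before = ref() is not None

del owner
gc.collect()      # CPython frees the object on del; collect() is a safety net

alive_after = ref() is not None
sentinel = work_queue.get(timeout=1)
```

The callback binds the queue as a default argument rather than closing over the owner, so it does not keep the owner alive.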

CPython 3.14 changes

  • 3.9 introduced the cancel_futures parameter to shutdown(); it is present and unchanged in 3.14.
  • 3.13 added queue.Queue.shutdown(), after which put() raises queue.ShutDown. SimpleQueue, which this module uses, has no shutdown() method, so the None-sentinel loop remains the termination path for workers.
  • _global_shutdown_lock was added in 3.9 to fix race conditions with the interpreter-exit hook; no changes in 3.14.
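
The cancel_futures behaviour mentioned above can be checked with a short sketch, assuming Python 3.9 or later. `blocker` is a hypothetical task that keeps the single worker busy so that later submissions stay queued; the `started` event makes sure the worker has already picked it up before shutdown() is called.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

started = threading.Event()
gate = threading.Event()


def blocker():
    started.set()
    gate.wait(timeout=5)
    return "done"


pool = ThreadPoolExecutor(max_workers=1)
running = pool.submit(blocker)
started.wait(timeout=5)                              # the worker is now busy
pending = [pool.submit(int, n) for n in range(3)]    # queued behind blocker

pool.shutdown(wait=False, cancel_futures=True)       # cancel everything still queued
gate.set()                                           # let the running task finish

running_result = running.result(timeout=5)
cancelled = [f.cancelled() for f in pending]
```

The task that was already running is not interrupted; only work that had not started yet is cancelled.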