
Lib/concurrent/futures/__init__.py

cpython 3.14 @ ab2d84fe1023/Lib/concurrent/futures/__init__.py

concurrent.futures is a package under Lib/concurrent/futures/. The public API lives across four files:

  • _base.py -- Future, Executor, as_completed, wait, the FIRST_COMPLETED/FIRST_EXCEPTION/ALL_COMPLETED constants, and internal _Waiter classes.
  • thread.py -- ThreadPoolExecutor and _WorkItem.
  • process.py -- ProcessPoolExecutor, _CallItem, _ResultItem, and the worker process loop.
  • __init__.py -- re-exports the public names from the modules above.

The design is modeled on java.util.concurrent, as described in PEP 3148; asyncio.Future was later modeled on concurrent.futures.Future. The two are distinct classes, and asyncio.wrap_future bridges them by wrapping a concurrent.futures.Future in an asyncio.Future.
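As a minimal sketch of that bridge, the snippet below submits a blocking function to a thread pool and awaits the wrapped future from a coroutine. The helper blocking_square is a hypothetical stand-in for real blocking work; everything else is standard library API.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor


def blocking_square(x):
    # Hypothetical stand-in for blocking work.
    return x * x


async def main():
    with ThreadPoolExecutor(max_workers=1) as pool:
        cf_future = pool.submit(blocking_square, 7)   # concurrent.futures.Future
        aio_future = asyncio.wrap_future(cf_future)   # awaitable asyncio.Future
        same_type = type(cf_future) is type(aio_future)
        return same_type, await aio_future


same_type, value = asyncio.run(main())
print(same_type, value)
```

The two future objects have different types, which is why the wrapper is needed before the result can be awaited.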

Map

| Lines | Symbol | Role | gopy |
| --- | --- | --- | --- |
| _base.py 1-300 | Future, PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED | Five-state result container; result() blocks until the state reaches FINISHED or the future is cancelled; add_done_callback fires immediately if already finished. | (stdlib pending) |
| _base.py 300-500 | Executor, Executor.map, Executor.shutdown | Abstract base; map wraps submit in an iterator that calls result() in submission order; shutdown signals that no more work will be submitted and, by default, waits for pending work. | (stdlib pending) |
| _base.py 500-700 | wait, as_completed, _Waiter, _AsCompletedWaiter, _FirstCompletedWaiter, _AllCompletedWaiter | wait blocks until the condition selected by return_when (FIRST_COMPLETED, FIRST_EXCEPTION, or ALL_COMPLETED) is met; as_completed yields futures as they finish using an _AsCompletedWaiter. | (stdlib pending) |
| thread.py 1-300 | _WorkItem, _worker, BrokenThreadPool, ThreadPoolExecutor | _WorkItem holds the callable and its Future; _worker is the function run in each pool thread; ThreadPoolExecutor.submit enqueues a _WorkItem and spawns threads up to max_workers. | (stdlib pending) |
| process.py 1-400 | _CallItem, _ResultItem, _SafeQueue, _process_worker, ProcessPoolExecutor | _process_worker runs in each worker process, reading _CallItem objects from a multiprocessing.Queue and writing _ResultItem objects back. | (stdlib pending) |
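The roles of Executor.map and wait in the table can be illustrated with a short sketch. The helper sleep_then_return and the delays are assumptions chosen only to make the ordering visible; they are not taken from the CPython source.

```python
import time
from concurrent.futures import (
    ALL_COMPLETED,
    FIRST_COMPLETED,
    ThreadPoolExecutor,
    wait,
)


def sleep_then_return(delay):
    # Hypothetical task: sleep, then hand the delay back as the result.
    time.sleep(delay)
    return delay


with ThreadPoolExecutor(max_workers=2) as pool:
    # map yields results in submission order, even though 0.05 finishes first.
    mapped = list(pool.map(sleep_then_return, [0.2, 0.05]))

    fast = pool.submit(sleep_then_return, 0.05)
    slow = pool.submit(sleep_then_return, 0.5)

    # Returns as soon as at least one future is done.
    done_first, not_done_first = wait([fast, slow], return_when=FIRST_COMPLETED)
    # Returns only when every future is done.
    done_all, not_done_all = wait([fast, slow], return_when=ALL_COMPLETED)

print(mapped, len(done_first), len(done_all))
```

wait returns a named 2-tuple of sets (done, not_done), so membership tests such as `fast in done_first` are the natural way to inspect it.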

Reading

Future state machine (_base.py lines 1 to 300)

cpython 3.14 @ ab2d84fe1023/Lib/concurrent/futures/_base.py#L1-300

class Future:
    def __init__(self):
        self._condition = threading.Condition()
        self._state = PENDING
        self._result = None
        self._exception = None
        self._waiters = []
        self._done_callbacks = []

    def result(self, timeout=None):
        try:
            with self._condition:
                if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                    raise CancelledError()
                elif self._state == FINISHED:
                    return self.__get_result()

                # Not done yet: block until notified or until the timeout expires.
                self._condition.wait(timeout)

                if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                    raise CancelledError()
                elif self._state == FINISHED:
                    return self.__get_result()
                else:
                    raise TimeoutError()
        finally:
            # Break a reference cycle with the exception in self._exception
            self = None

    def set_result(self, result):
        with self._condition:
            # Only a future that is already cancelled or finished rejects a result.
            if self._state in {CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED}:
                raise InvalidStateError(...)
            self._result = result
            self._state = FINISHED
            for waiter in self._waiters:
                waiter.add_result(self)
            self._condition.notify_all()
        # Callbacks run after the condition lock has been released.
        self._invoke_callbacks()

concurrent.futures.Future is thread-safe: all state transitions happen under self._condition, a threading.Condition. result() calls self._condition.wait(timeout) to block the calling thread. When set_result fires it calls notify_all(), waking all blocked result() calls. The same _condition serializes cancel(), set_exception(), and set_result().
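The blocking and wake-up behaviour can be observed directly on a bare Future. The sketch below is illustrative only: creating a Future by hand and calling set_result from a helper thread (here the hypothetical producer) is normally the executor's job.

```python
import threading
import time
from concurrent.futures import Future
from concurrent.futures import TimeoutError as FutureTimeoutError

future = Future()

# While the future is still pending, result() gives up after the timeout.
try:
    future.result(timeout=0.05)
    timed_out = False
except FutureTimeoutError:
    timed_out = True


def producer():
    # Hypothetical producer: finishes the future from another thread.
    time.sleep(0.1)
    future.set_result("ready")


thread = threading.Thread(target=producer)
thread.start()

# Blocks on the condition until set_result() calls notify_all().
value = future.result(timeout=5)
thread.join()
print(timed_out, value)
```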

The five states form a linear progression with one branch:

PENDING -> RUNNING -> FINISHED
   \-> CANCELLED -> CANCELLED_AND_NOTIFIED

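cancel() succeeds only from PENDING, before the executor has started the work item; it returns False once the future is RUNNING or FINISHED, and returns True without further effect if the future is already cancelled. Before invoking the callable, the executor calls Future.set_running_or_notify_cancel(), which moves a PENDING future to RUNNING, or moves a CANCELLED future to CANCELLED_AND_NOTIFIED and tells the executor to skip the work item.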
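A small sketch of these transitions on hand-made futures follows. Calling set_running_or_notify_cancel directly is something only executors and tests are meant to do; it is used here purely to drive the state machine.

```python
from concurrent.futures import Future

# A pending future can be cancelled.
pending = Future()
cancelled_ok = pending.cancel()
# The executor-side check then reports that the work must be skipped.
should_run = pending.set_running_or_notify_cancel()

# A future that has already been marked running cannot be cancelled.
running = Future()
started = running.set_running_or_notify_cancel()
cancel_running = running.cancel()

print(cancelled_ok, should_run, started, cancel_running)
```

In this sketch, cancel() returns True for the pending future and False for the running one, which is the distinction an executor relies on when deciding whether to invoke the callable.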

_invoke_callbacks is called outside the condition lock to avoid deadlocks when a callback acquires a lock of its own. Done callbacks receive the Future object as their only argument.
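The callback contract can be checked with a bare Future. In this sketch the list named calls is a hypothetical recorder; one callback is registered before the result is set and one after.

```python
from concurrent.futures import Future

calls = []
future = Future()

# Registered while pending: runs when set_result() completes the future.
future.add_done_callback(lambda fut: calls.append(("early", fut.result())))
future.set_result(1)

# Registered after completion: runs immediately in the calling thread.
future.add_done_callback(lambda fut: calls.append(("late", fut.result())))

print(calls)
```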

ThreadPoolExecutor worker loop (thread.py lines 1 to 300)

cpython 3.14 @ ab2d84fe1023/Lib/concurrent/futures/thread.py#L1-300

def _worker(executor_reference, work_queue, initializer, initargs):
    if initializer is not None:
        try:
            initializer(*initargs)
        except BaseException:
            _base.LOGGER.critical('Exception in initializer:', exc_info=True)
            executor = executor_reference()
            if executor is not None:
                executor._initializer_failed()
            return
    try:
        while True:
            work_item = work_queue.get(block=True)
            if work_item is not None:
                work_item.run()
                # Drop the reference so the work item can be collected.
                del work_item
                # Mark this thread as idle again.
                executor = executor_reference()
                if executor is not None:
                    executor._idle_semaphore.release()
                del executor
                continue
            # None is the shutdown sentinel.
            executor = executor_reference()
            if executor is None or executor._shutdown:
                # Relay the sentinel so the next worker also wakes up and exits.
                work_queue.put(None)
                return
            del executor
    except BaseException:
        _base.LOGGER.critical('Exception in worker', exc_info=True)

Each pool thread runs _worker. The sentinel None item signals shutdown: when a thread receives None it puts None back on the queue (so the next thread also wakes up and exits) and returns. This is a broadcast-by-relay pattern that avoids needing one sentinel per thread.
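The relay pattern can be reproduced outside the executor. The following is a stripped-down sketch, not the CPython implementation: the worker function, the doubling task, and the results list are all assumptions made for illustration.

```python
import queue
import threading


def worker(work_queue, results):
    while True:
        item = work_queue.get()
        if item is None:
            # Relay the sentinel so the next blocked worker also exits.
            work_queue.put(None)
            return
        results.append(item * 2)


work_queue = queue.SimpleQueue()
results = []
threads = [
    threading.Thread(target=worker, args=(work_queue, results))
    for _ in range(3)
]
for thread in threads:
    thread.start()

for n in range(5):
    work_queue.put(n)

# A single sentinel is enough to stop all three workers.
work_queue.put(None)
for thread in threads:
    thread.join(timeout=5)

print(sorted(results))
```

One sentinel stays on the queue after the last worker exits, which is harmless because nothing reads the queue afterwards.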

_WorkItem.run calls the user callable and sets the result or exception on the Future:

class _WorkItem:
    def run(self):
        if not self.future.set_running_or_notify_cancel():
            return
        try:
            result = self.fn(*self.args, **self.kwargs)
        except BaseException as exc:
            self.future.set_exception(exc)
            self = None
        else:
            self.future.set_result(result)
            self = None

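set_running_or_notify_cancel runs under the future's condition lock. From PENDING it moves the future to RUNNING and returns True; from CANCELLED it moves it to CANCELLED_AND_NOTIFIED, notifies any installed waiters, and returns False, so run() skips the callable. Done callbacks were already invoked by cancel(). Setting self = None once the result or exception is stored drops the frame's reference to the _WorkItem, which breaks the reference cycle that otherwise runs from the stored exception through its traceback and frame back to the Future.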
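From the caller's side, the effect of run() is that an exception raised in the worker thread is stored on the Future and re-raised by result(). A minimal sketch, with the hypothetical function fail standing in for user code:

```python
from concurrent.futures import ThreadPoolExecutor


def fail():
    # Hypothetical task that always raises.
    raise ValueError("boom")


with ThreadPoolExecutor(max_workers=1) as pool:
    future = pool.submit(fail)
    # exception() blocks until the work item has run, then returns the error.
    error = future.exception()

try:
    future.result()
    reraised = None
except ValueError as exc:
    reraised = exc

print(type(error).__name__, reraised is error)
```

result() raises the same exception object that exception() returns, so the original traceback from the worker thread is preserved.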

as_completed iterator (_base.py lines 500 to 700)

cpython 3.14 @ ab2d84fe1023/Lib/concurrent/futures/_base.py#L500-700

def as_completed(fs, timeout=None):
    if timeout is not None:
        end_time = timeout + time.monotonic()

    fs = set(fs)
    with _AcquireFutures(fs):
        # Futures that finished before the waiter is installed never notify
        # it, so they are collected up front.
        done = {f for f in fs
                if f._state in [CANCELLED_AND_NOTIFIED, FINISHED]}
        waiter = _create_and_install_waiters(fs, _AS_COMPLETED)
    try:
        yield from done
        for future in done:
            fs.discard(future)
        while fs:
            if timeout is None:
                wait_timeout = None
            else:
                wait_timeout = end_time - time.monotonic()
                if wait_timeout < 0:
                    raise TimeoutError(...)
            # Take the batch of newly finished futures and reset the event.
            with waiter.lock:
                finished_futures = list(waiter.finished_futures)
                waiter.finished_futures = []
                waiter.event.clear()
            for future in finished_futures:
                yield future
                fs.discard(future)
            if fs:
                waiter.event.wait(timeout=wait_timeout)
    finally:
        # Detach the waiter from every future that has not been yielded.
        for f in fs:
            with f._condition:
                f._waiters.remove(waiter)

as_completed installs a single _AsCompletedWaiter (selected by the _AS_COMPLETED flag) on every future before entering the yield loop. The waiter's add_result / add_exception / add_cancelled methods append the finished future to waiter.finished_futures under waiter.lock and set waiter.event. The generator wakes up, drains finished_futures, yields each one, and then calls waiter.event.wait again until all futures have been yielded.

The finally block removes the waiter from every still-pending future to avoid a reference leak when the caller abandons the iterator early with break.
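From the caller's perspective, as_completed yields in completion order rather than submission order. The sketch below assumes a hypothetical sleep_then_return task with well-separated delays; the exact order depends on thread scheduling, so it is what one would typically observe, not a guarantee.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def sleep_then_return(delay):
    # Hypothetical task: sleep, then hand the delay back as the result.
    time.sleep(delay)
    return delay


with ThreadPoolExecutor(max_workers=3) as pool:
    futures = [pool.submit(sleep_then_return, d) for d in (0.3, 0.1, 0.2)]
    # Each future is yielded as soon as its worker finishes.
    completion_order = [f.result() for f in as_completed(futures, timeout=5)]

print(completion_order)
```

With three workers and these delays, the shortest sleep usually comes out first even though it was submitted second.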

gopy mirror

concurrent.futures is pending. ThreadPoolExecutor maps naturally to goroutines with a bounded worker pool and a chan work queue. Future's threading.Condition-based blocking must be replaced with Go channel semantics. ProcessPoolExecutor requires os/exec process management and is lower priority than the thread-based executor.