
Lib/concurrent/futures/__init__.py

Source:

cpython 3.14 @ ab2d84fe1023/Lib/concurrent/futures/__init__.py

The public concurrent.futures namespace is assembled here by importing from three private submodules:

  • _base: Future, Executor, wait, as_completed, FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED, CancelledError, TimeoutError, BrokenExecutor, InvalidStateError
  • thread: ThreadPoolExecutor
  • process: ProcessPoolExecutor

Map

Lines   Symbol                                    Source module
1–5     module docstring
6–18    from ._base import ...                    Lib/concurrent/futures/_base.py
19–22   from .thread import ThreadPoolExecutor    Lib/concurrent/futures/thread.py
23–26   from .process import ProcessPoolExecutor  Lib/concurrent/futures/process.py
27–30   __all__                                   lists all public names

The substantive implementation lives entirely in _base.py, thread.py, and process.py. This file is a thin aggregator.
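One way to see the aggregation at work is to compare the names exposed by the package with the objects defined in the submodules. The sketch below is illustrative only: the helper name reexported_from_base is hypothetical, and it checks identity for the _base names listed earlier rather than inspecting the file itself.

```python
import concurrent.futures as cf
from concurrent.futures import _base, thread

# Names the package is described as re-exporting from _base.
BASE_NAMES = [
    "Future", "Executor", "wait", "as_completed",
    "FIRST_COMPLETED", "FIRST_EXCEPTION", "ALL_COMPLETED",
    "CancelledError", "TimeoutError", "BrokenExecutor", "InvalidStateError",
]


def reexported_from_base(names=BASE_NAMES):
    """Return True if every public name is the very object defined in _base."""
    return all(getattr(cf, name) is getattr(_base, name) for name in names)


if __name__ == "__main__":
    print(reexported_from_base())
    print(cf.ThreadPoolExecutor is thread.ThreadPoolExecutor)
```

The identity checks (is rather than ==) show that the package adds no wrappers of its own; it only re-binds objects that live in the submodules.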

Reading

Future state machine

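Future objects move through five states: PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, and FINISHED. Each mutating method checks the current state before changing it, which restricts the legal moves: for example, cancel() succeeds only on a PENDING future (it returns False once the future is RUNNING or FINISHED), CANCELLED_AND_NOTIFIED is entered from CANCELLED when set_running_or_notify_cancel() informs waiters of the cancellation, and FINISHED is terminal. All state mutations happen under self._condition, a threading.Condition.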

# CPython: Lib/concurrent/futures/_base.py:287 Future.cancel
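def cancel(self):
    with self._condition:
        # A running or finished future can no longer be cancelled.
        if self._state in [RUNNING, FINISHED]:
            return False
        # Cancelling an already cancelled future is a harmless no-op.
        if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
            return True
        self._state = CANCELLED
        self._condition.notify_all()
    # Callbacks run after the lock has been released.
    self._invoke_callbacks()
    return True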

# CPython: Lib/concurrent/futures/_base.py:321 Future.set_result
def set_result(self, result):
    with self._condition:
        # A cancelled or already finished future rejects a new result.
        if self._state in {CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED}:
            raise InvalidStateError(...)
        self._result = result
        self._state = FINISHED
        for waiter in self._waiters:
            waiter.add_result(self)
        self._condition.notify_all()
    self._invoke_callbacks()
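The state rules can be observed through the public Future API without touching the private _state attribute. The following is a minimal sketch, assuming a Future created by hand (normally only executors create them); the function name demo_states is hypothetical.

```python
from concurrent.futures import Future, InvalidStateError


def demo_states():
    """Walk two futures through their states and record what happens."""
    observed = []

    # PENDING -> CANCELLED: cancelling a pending future succeeds.
    pending = Future()
    observed.append(pending.cancel())
    observed.append(pending.cancelled())

    # PENDING -> RUNNING: a running future refuses cancellation.
    running = Future()
    running.set_running_or_notify_cancel()
    observed.append(running.cancel())

    # RUNNING -> FINISHED: the result becomes available.
    running.set_result(42)
    observed.append(running.result())

    # FINISHED is terminal: a second result is rejected.
    try:
        running.set_result(43)
    except InvalidStateError:
        observed.append("InvalidStateError")

    return observed


if __name__ == "__main__":
    print(demo_states())
```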

Executor ABC: submit, map, shutdown

Executor is an abstract base class with three core methods. submit is abstract: the base implementation raises NotImplementedError. map is a concrete wrapper that calls submit once per argument tuple and yields results in submission order by calling result() on each future in turn. It does not use as_completed, so an exception surfaces only when iteration reaches the failing future. shutdown is a no-op by default; subclasses override it to drain work queues and join worker threads or processes.

# CPython: Lib/concurrent/futures/_base.py:595 Executor.map
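def map(self, fn, *iterables, timeout=None, chunksize=1):
    if timeout is not None:
        end_time = timeout + time.monotonic()
    # Submit everything up front, before the first result is requested.
    fs = [self.submit(fn, *args) for args in zip(*iterables)]

    # The yield lives in a closure so that map() itself is not a generator.
    def result_iterator():
        try:
            for future in fs:
                if timeout is None:
                    yield future.result()
                else:
                    yield future.result(end_time - time.monotonic())
        finally:
            # Cancel whatever is still pending if iteration stops early.
            for future in fs:
                future.cancel()
    return result_iterator()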
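Because map is built purely on submit, a subclass that supplies only submit inherits a working map and context-manager support. The sketch below assumes a hypothetical InlineExecutor that runs each call synchronously in the calling thread; it is a teaching aid, not something the standard library provides.

```python
from concurrent.futures import Executor, Future


class InlineExecutor(Executor):
    """Hypothetical executor that runs each call immediately, in-thread."""

    def submit(self, fn, /, *args, **kwargs):
        future = Future()
        try:
            future.set_result(fn(*args, **kwargs))
        except Exception as exc:
            # Store the failure so that result() re-raises it later.
            future.set_exception(exc)
        return future


def squares(values):
    # map() and the with-statement support are inherited from Executor.
    with InlineExecutor() as executor:
        return list(executor.map(lambda x: x * x, values))


if __name__ == "__main__":
    print(squares([1, 2, 3]))
```

The inherited shutdown does nothing here, which is adequate because the executor owns no threads or queues.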

as_completed and wait

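as_completed takes every future's condition lock through the internal _AcquireFutures context manager, splits the futures into already finished and still pending sets, and installs a waiter. It then yields the finished futures first and each remaining one as it completes. wait partitions the futures the same way, blocks until the requested condition (FIRST_COMPLETED, FIRST_EXCEPTION, or ALL_COMPLETED) is satisfied or the timeout expires, and returns the two sets as a DoneAndNotDoneFutures named tuple with fields done and not_done.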

# CPython: Lib/concurrent/futures/_base.py:218 as_completed
def as_completed(fs, timeout=None):
    ...
    with _AcquireFutures(fs):
        done = set(f for f in fs if f._state in [CANCELLED_AND_NOTIFIED, FINISHED])
        not_done = set(fs) - done
        ...
    for future in done:
        yield future
    ...

# CPython: Lib/concurrent/futures/_base.py:170 wait
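def wait(fs, timeout=None, return_when=ALL_COMPLETED):
    fs = set(fs)
    with _AcquireFutures(fs):
        done = {f for f in fs if f._state in [CANCELLED_AND_NOTIFIED, FINISHED]}
        not_done = fs - done
        ...
        waiter = _create_and_install_waiters(fs, return_when)
    # Block outside the locks until the waiter's event fires or the timeout expires.
    waiter.event.wait(timeout)
    ...
    done.update(waiter.finished_futures)
    return DoneAndNotDoneFutures(done, fs - done)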
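From the caller's side, the two functions behave as follows. This is a small sketch under stated assumptions: the function name demo_wait_and_as_completed and the gate event are hypothetical, and the event exists only to keep one task blocked so the outcome does not depend on timing.

```python
import threading
from concurrent.futures import (
    FIRST_COMPLETED,
    ThreadPoolExecutor,
    as_completed,
    wait,
)


def demo_wait_and_as_completed():
    gate = threading.Event()

    def fast():
        return "fast"

    def slow():
        # Stay blocked until the caller opens the gate (5 s safety timeout).
        gate.wait(timeout=5)
        return "slow"

    with ThreadPoolExecutor(max_workers=2) as pool:
        futures = [pool.submit(slow), pool.submit(fast)]

        # Returns as soon as any future finishes; slow is still gated.
        done, not_done = wait(futures, return_when=FIRST_COMPLETED)
        first_done = {f.result() for f in done}
        still_pending = len(not_done)

        # Release the slow task and collect every result as it completes.
        gate.set()
        completed = sorted(f.result() for f in as_completed(futures))

    return first_done, still_pending, completed


if __name__ == "__main__":
    print(demo_wait_and_as_completed())
```

The results from as_completed are sorted before being returned because the order among futures that have already finished is not guaranteed.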

gopy notes

Status: not yet ported.

Planned package path: module/concurrent_futures/ (the dot in the CPython package name is replaced with an underscore to keep it a valid Go identifier).

The port can be split into three milestones. First, _base (Future state machine, Executor ABC, wait/as_completed). Second, thread (ThreadPoolExecutor backed by a goroutine pool). Third, process (ProcessPoolExecutor, which requires the multiprocessing subsystem and can be deferred).