Lib/concurrent/futures/__init__.py
Source:
cpython 3.14 @ ab2d84fe1023/Lib/concurrent/futures/__init__.py
The public concurrent.futures namespace is assembled here by importing from
three private submodules:
_base—Future,Executor,wait,as_completed,FIRST_COMPLETED,FIRST_EXCEPTION,ALL_COMPLETED,CancelledError,TimeoutError,BrokenExecutor,InvalidStateErrorthread—ThreadPoolExecutorprocess—ProcessPoolExecutor
Map
| Lines | Symbol | Source module |
|---|---|---|
| 1–5 | module docstring | — |
| 6–18 | from ._base import ... | Lib/concurrent/futures/_base.py |
| 19–22 | from .thread import ThreadPoolExecutor | Lib/concurrent/futures/thread.py |
| 23–26 | from .process import ProcessPoolExecutor | Lib/concurrent/futures/process.py |
| 27–30 | __all__ | lists all public names |
The substantive implementation lives entirely in _base.py, thread.py, and
process.py. This file is a thin aggregator.
Reading
Future state machine
Future objects move through four states: PENDING, RUNNING, CANCELLED,
and FINISHED. The transition table enforces which moves are legal (for example,
CANCELLED can only be reached from PENDING or RUNNING, and FINISHED is
terminal). All state mutations happen under self._condition, a threading.Condition.
# CPython: Lib/concurrent/futures/_base.py:287 Future.cancel
def cancel(self):
with self._condition:
if self._state not in [PENDING, RUNNING]:
return False
if self._state == RUNNING:
self._state = CANCELLED_AND_NOTIFIED
else:
self._state = CANCELLED
self._condition.notify_all()
self._invoke_callbacks()
return True
# CPython: Lib/concurrent/futures/_base.py:321 Future.set_result
def set_result(self, result):
with self._condition:
if self._state in {CANCELLED, CANCELLED_AND_NOTIFIED}:
raise CancelledError(...)
if self._state == FINISHED:
raise InvalidStateError(...)
self._result = result
self._state = FINISHED
self._condition.notify_all()
self._invoke_callbacks()
Executor ABC: submit, map, shutdown
Executor is an abstract base class with three methods. submit is abstract.
map is a concrete wrapper that calls submit for each argument and yields
results in submission order, using as_completed internally for early exception
propagation. shutdown is a no-op by default; subclasses override it to drain
work queues and join worker threads or processes.
# CPython: Lib/concurrent/futures/_base.py:595 Executor.map
def map(self, fn, *iterables, timeout=None, chunksize=1):
if timeout is not None:
end_time = timeout + time.monotonic()
fs = [self.submit(fn, *args) for args in zip(*iterables)]
try:
for future in fs:
if timeout is None:
yield future.result()
else:
yield future.result(end_time - time.monotonic())
finally:
for future in fs:
future.cancel()
as_completed and wait
as_completed attaches an internal _AcquireFutures callback to each future
and yields each one as it finishes. wait collects futures into done and
not_done sets using DoneAndNotDoneFutures and blocks until the requested
condition (FIRST_COMPLETED, FIRST_EXCEPTION, or ALL_COMPLETED) is
satisfied.
# CPython: Lib/concurrent/futures/_base.py:218 as_completed
def as_completed(fs, timeout=None):
...
with _AcquireFutures(fs):
done = set(f for f in fs if f._state in [CANCELLED_AND_NOTIFIED, FINISHED])
not_done = set(fs) - done
...
for future in done:
yield future
...
# CPython: Lib/concurrent/futures/_base.py:170 wait
def wait(fs, timeout=None, return_when=ALL_COMPLETED):
...
with _AcquireFutures(fs):
done, not_done = _partition_futures(fs)
waiter = _create_and_install_waiters(fs, return_when)
...
waiter.event.wait(timeout)
...
return DoneAndNotDoneFutures(done | waiter.finished_futures, not_done)
gopy notes
Status: not yet ported.
Planned package path: module/concurrent_futures/ (the dot in the CPython
package name is replaced with an underscore to keep it a valid Go identifier).
The port can be split into three milestones. First, _base (Future state machine,
Executor ABC, wait/as_completed). Second, thread (ThreadPoolExecutor backed by
a goroutine pool). Third, process (ProcessPoolExecutor, which requires the
multiprocessing subsystem and can be deferred).