concurrent.futures._base — Future and Executor
_base.py is the shared foundation for both thread and process executors. It defines the Future state machine, the blocking primitives wait() and as_completed(), and the Executor abstract base class.
Map
| Lines | Symbol | Role |
|---|---|---|
| 1–50 | module imports / constants | PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED strings |
| 51–100 | Error / CancelledError / TimeoutError / BrokenExecutor | Public exception hierarchy |
| 101–150 | _Waiter / _AsCompletedWaiter / _FirstCompletedWaiter / _AllCompletedWaiter | Internal event objects used by wait() and as_completed() |
| 151–200 | _create_and_install_waiters | Attaches the right _Waiter subclass to a set of futures |
| 201–380 | Future | Core class: state machine, result(), cancel(), callbacks |
| 381–440 | as_completed | Generator that yields futures as they finish |
| 441–490 | wait | Blocks until a condition over a set of futures is met |
| 491–600 | Executor | Abstract base: submit(), map(), shutdown(), context manager |
Reading
Future state machine
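A Future moves through its states in one direction only: PENDING to RUNNING to FINISHED, with cancellation possible only before the future starts running. result() blocks on a threading.Condition until the state reaches FINISHED or one of the cancelled states, or until the timeout expires.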
# CPython: Lib/concurrent/futures/_base.py:201 Future
class Future:
    def __init__(self):
        self._condition = threading.Condition()
        self._state = PENDING
        self._result = None
        self._exception = None
        self._done_callbacks = []

    def result(self, timeout=None):
        with self._condition:
            if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                raise CancelledError()
            elif self._state == FINISHED:
                return self.__get_result()
            self._condition.wait(timeout)
            if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                raise CancelledError()
            elif self._state == FINISHED:
                return self.__get_result()
            raise TimeoutError()
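As a minimal sketch of these transitions, the following drives Future objects by hand, something application code normally leaves to an executor. The helper observe_result is hypothetical and exists only for this illustration; InvalidStateError is assumed to be available (Python 3.8 or later).

```python
from concurrent.futures import (
    CancelledError,
    Future,
    InvalidStateError,
    TimeoutError,
)


def observe_result(future, timeout=0.01):
    """Hypothetical helper: label what result() did instead of raising."""
    try:
        return ("value", future.result(timeout=timeout))
    except TimeoutError:
        return ("timeout", None)
    except CancelledError:
        return ("cancelled", None)


pending = Future()                  # starts in PENDING
before = observe_result(pending)    # nothing resolves it, so the wait times out

pending.set_result(42)              # PENDING -> FINISHED
after = observe_result(pending)     # returns immediately with the stored value

# A finished future cannot be resolved again or cancelled.
try:
    pending.set_result(43)
    overwrite_rejected = False
except InvalidStateError:
    overwrite_rejected = True
cancel_after_finish = pending.cancel()

doomed = Future()
was_cancelled = doomed.cancel()     # PENDING -> CANCELLED succeeds
cancelled = observe_result(doomed)  # result() raises CancelledError
```

The timeout case leaves the future untouched: a later result() call on the same object can still succeed once another thread resolves it.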
Done callbacks
Callbacks registered before completion are held in a list and fired synchronously, in registration order, by whichever thread resolves the future through set_result(), set_exception(), or a successful cancel().
# CPython: Lib/concurrent/futures/_base.py:310 Future.add_done_callback
def add_done_callback(self, fn):
    with self._condition:
        if self._state not in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]:
            self._done_callbacks.append(fn)
            return
    try:
        fn(self)
    except Exception:
        LOGGER.exception('exception calling callback for %r', self)

# CPython: Lib/concurrent/futures/_base.py:330 Future._invoke_callbacks
def _invoke_callbacks(self):
    for callback in self._done_callbacks:
        try:
            callback(self)
        except Exception:
            LOGGER.exception('exception calling callback for %r', self)
If the future is already done when add_done_callback is called, the callback fires immediately in the caller's thread.
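The two paths can be observed by recording which thread runs each callback. This is a small sketch: the record factory and the thread name "resolver" are made up for the example.

```python
import threading
from concurrent.futures import Future

calls = []  # (label, name of the thread that ran the callback)
caller = threading.current_thread().name


def record(label):
    """Hypothetical factory: build a callback that logs its running thread."""
    def callback(future):
        calls.append((label, threading.current_thread().name))
    return callback


future = Future()
future.add_done_callback(record("early"))  # queued: the future is still PENDING

# Resolve the future from another thread; "early" runs there, not here.
resolver = threading.Thread(
    target=future.set_result, args=("done",), name="resolver"
)
resolver.start()
resolver.join()

future.add_done_callback(record("late"))   # already FINISHED: runs in this thread
```

Because callbacks run inline on the resolving thread, a slow callback delays whatever that thread does next, which in an executor is usually a worker picking up its next task.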
as_completed and wait
as_completed() installs an _AsCompletedWaiter on every future, then yields from the waiter's finished_futures list (a plain list, not a deque), which the waiter fills as futures finish.
# CPython: Lib/concurrent/futures/_base.py:381 as_completed
def as_completed(fs, timeout=None):
    ...
    waiter = _create_and_install_waiters(fs, _AS_COMPLETED)
    ...
    try:
        for future in done:
            yield future
        for future in waiter.finished_futures:
            yield future
        ...
    finally:
        ...
        for f in fs:
            f._waiters.remove(waiter)
wait() uses the same waiter mechanism but blocks until a return_when condition (ALL_COMPLETED, FIRST_COMPLETED, or FIRST_EXCEPTION) is satisfied.
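A short sketch of both entry points from the caller's side. The blocked and quick task functions and the gate event are invented for the example; the event keeps one task pending so the outcome does not depend on timing.

```python
import threading
from concurrent.futures import (
    FIRST_COMPLETED,
    ThreadPoolExecutor,
    as_completed,
    wait,
)

gate = threading.Event()


def blocked():
    gate.wait(timeout=5)  # held until the main thread opens the gate
    return "slow"


def quick():
    return "fast"


with ThreadPoolExecutor(max_workers=2) as pool:
    slow_future = pool.submit(blocked)
    fast_future = pool.submit(quick)

    # Returns as soon as any one future is done; the other stays in not_done.
    done, not_done = wait(
        [slow_future, fast_future], return_when=FIRST_COMPLETED
    )
    first_done = {f.result() for f in done}

    # as_completed() is a generator: pull the already finished future first,
    # then release the blocked task and drain the rest.
    completion = as_completed([slow_future, fast_future])
    order = [next(completion).result()]
    gate.set()
    order += [f.result() for f in completion]
```

Both functions take the futures in any order and report them by completion, so the position of slow_future at the front of the input list has no effect on the result.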
Executor.map
map() eagerly submits one call per argument tuple, then yields results in submission order. If a call raised, its exception is re-raised when the iterator reaches that result.
# CPython: Lib/concurrent/futures/_base.py:530 Executor.map
def map(self, fn, *iterables, timeout=None, chunksize=1):
    ...
    fs = [self.submit(fn, *args) for args in zip(*iterables)]

    def result_iterator():
        try:
            fs.reverse()
            while fs:
                yield fs.pop().result(end_time and end_time - time.monotonic())
        finally:
            for future in fs:
                future.cancel()
    return result_iterator()
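From the caller's side, the ordering and exception behaviour look like the sketch below. The functions slow_square and reciprocal are invented for the example, and ThreadPoolExecutor stands in for any Executor subclass.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_square(n):
    time.sleep(0.05 * (3 - n))  # smaller inputs take longer, so they finish last
    return n * n


def reciprocal(n):
    return 1 / n                # raises ZeroDivisionError for n == 0


with ThreadPoolExecutor(max_workers=3) as pool:
    # Results come back in submission order, not completion order.
    squares = list(pool.map(slow_square, [0, 1, 2]))

    # The exception surfaces only when iteration reaches the failing item.
    seen = []
    try:
        for value in pool.map(reciprocal, [1, 0, 2]):
            seen.append(value)
    except ZeroDivisionError:
        seen.append("error")
```

Once the exception propagates, the generator is finalized and its finally clause cancels whatever futures have not started, so the result for the third input is never delivered.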
gopy notes
- Future._condition maps to a Go sync.Cond. sync.Cond itself has no timed wait, so result(timeout) needs a port-side helper (the cond.WaitWithDeadline named here) or a done channel combined with select and a timer.
- The five string states could be an iota enum in Go; the one-direction constraint is enforced by the state transition checks in set_result/cancel.
- _invoke_callbacks fires in the thread that resolves the future. In a Go port this should be the goroutine that calls future.resolve(), matching the CPython behaviour.
- Executor.map reverses the list and pops from the end to get O(1) ordered iteration without copying; a Go port can use a channel or index cursor instead.
CPython 3.14 changes
- BrokenExecutor has been defined here since 3.7 as the common base class of BrokenThreadPool and BrokenProcessPool; no structural change in 3.14.
- Future.cancel() still takes no arguments in 3.14. The msg parameter belongs to asyncio.Future.cancel() (added in 3.9), not to concurrent.futures.Future.
- as_completed() raises the builtin TimeoutError on timeout; concurrent.futures.TimeoutError has been an alias of the builtin since 3.11, and the behaviour is unchanged in 3.14.
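The TimeoutError aliasing can be checked directly. This sketch catches the exception under its concurrent.futures name so that it also runs on interpreters older than 3.11, where the two classes are distinct.

```python
import sys
import concurrent.futures
from concurrent.futures import Future, as_completed

# True on 3.11 and later, where the module attribute is the builtin itself.
same_class = concurrent.futures.TimeoutError is TimeoutError

never_done = Future()  # nothing will ever resolve this future
caught = None
try:
    for _ in as_completed([never_done], timeout=0.01):
        pass
except concurrent.futures.TimeoutError as exc:
    caught = exc

# On 3.11+ the same exception is also catchable as the builtin TimeoutError.
catchable_as_builtin = isinstance(caught, TimeoutError)
```

Code that must support both old and new interpreters is safest catching concurrent.futures.TimeoutError, as above.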