Skip to main content

concurrent.futures._base — Future and Executor

_base.py is the shared foundation for both thread and process executors. It defines the Future state machine, the blocking primitives wait() and as_completed(), and the Executor abstract base class.

Map

LinesSymbolRole
1–50module imports / constantsPENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED strings
51–100Error / CancelledError / TimeoutError / BrokenExecutorPublic exception hierarchy
101–150_Waiter / _AsCompletedWaiter / _FirstCompletedWaiter / _AllCompletedWaiterInternal event objects used by wait()
151–200_create_and_install_waitersAttaches the right _Waiter subclass to a set of futures
201–380FutureCore class: state machine, result(), cancel(), callbacks
381–440as_completedGenerator that yields futures as they finish
441–490waitBlocks until a condition over a set of futures is met
491–600ExecutorAbstract base: submit(), map(), shutdown(), context manager

Reading

Future state machine

A Future moves through states in one direction only. result() blocks on a threading.Condition until the state reaches FINISHED or CANCELLED.

# CPython: Lib/concurrent/futures/_base.py:201 Future
class Future:
def __init__(self):
self._condition = threading.Condition()
self._state = PENDING
self._result = None
self._exception = None
self._done_callbacks = []

def result(self, timeout=None):
with self._condition:
if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
raise CancelledError()
elif self._state == FINISHED:
return self.__get_result()
self._condition.wait(timeout)
if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
raise CancelledError()
elif self._state == FINISHED:
return self.__get_result()
raise TimeoutError()

Done callbacks

Callbacks registered before completion are held in a list and fired synchronously by whichever thread calls set_result() or set_exception().

# CPython: Lib/concurrent/futures/_base.py:310 Future.add_done_callback
def add_done_callback(self, fn):
with self._condition:
if self._state not in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]:
self._done_callbacks.append(fn)
return
try:
fn(self)
except Exception:
LOGGER.exception('exception calling callback for %r', self)

# CPython: Lib/concurrent/futures/_base.py:330 Future._invoke_callbacks
def _invoke_callbacks(self):
for callback in self._done_callbacks:
try:
callback(self)
except Exception:
LOGGER.exception('exception calling callback for %r', self)

If the future is already done when add_done_callback is called, the callback fires immediately in the caller's thread.

as_completed and wait

as_completed() installs an _AsCompletedWaiter on every future, then yields from a collections.deque that the waiter populates as futures finish.

# CPython: Lib/concurrent/futures/_base.py:381 as_completed
def as_completed(fs, timeout=None):
...
waiter = _create_and_install_waiters(fs, _AS_COMPLETED)
...
try:
for future in done:
yield future
for future in waiter.finished_futures:
yield future
...
finally:
...
for f in fs:
f._waiters.remove(waiter)

wait() uses the same waiter mechanism but blocks until a return_when condition (ALL_COMPLETED, FIRST_COMPLETED, or FIRST_EXCEPTION) is satisfied.

Executor.map

map() submits all callables eagerly, then iterates results in submission order, unwrapping exceptions as they occur.

# CPython: Lib/concurrent/futures/_base.py:530 Executor.map
def map(self, fn, *iterables, timeout=None, chunksize=1):
...
fs = [self.submit(fn, *args) for args in zip(*iterables)]
def result_iterator():
try:
fs.reverse()
while fs:
yield fs.pop().result(end_time and end_time - time.monotonic())
finally:
for future in fs:
future.cancel()
return result_iterator()

gopy notes

  • Future._condition maps to a Go sync.Cond; result(timeout) maps to cond.WaitWithDeadline.
  • The five string states could be an iota enum in Go; the one-direction constraint is enforced by the state transition checks in set_result / cancel.
  • _invoke_callbacks fires in the thread that resolves the future. In a Go port this should be the goroutine that calls future.resolve(), matching the CPython behaviour.
  • Executor.map reverses the slice and pops from the end to get O(1) ordered iteration without copying; a Go port can use a channel or index cursor instead.

CPython 3.14 changes

  • BrokenExecutor moved here from the thread/process submodules in 3.9; no structural change in 3.14.
  • Future.cancel() gained a msg parameter in 3.14 (bpo-39032 follow-up) allowing callers to attach a cancellation reason retrievable via exception().
  • as_completed() now raises TimeoutError (builtin) rather than concurrent.futures.TimeoutError on timeout when the two became aliases in 3.11; behaviour unchanged in 3.14.