Skip to main content

Lib/concurrent/futures (part 3)

Source:

cpython 3.14 @ ab2d84fe1023/Lib/concurrent/futures/_base.py

This annotation covers the Future lifecycle and executor submit. See lib_concurrent2_detail for ThreadPoolExecutor.__init__, the work queue, and thread management.

Map

LinesSymbolRole
1-80Executor.submitWrap a callable in a Future and enqueue
81-160Future.resultBlock until done, return result or raise exception
161-240Future.add_done_callbackRegister a callback for completion
241-360as_completedYield futures as they finish
361-500waitBlock until a set of futures reaches a condition

Reading

Executor.submit

# CPython: Lib/concurrent/futures/thread.py:180 ThreadPoolExecutor.submit
def submit(self, fn, /, *args, **kwargs):
with self._shutdown_lock:
if self._shutdown:
raise RuntimeError('cannot schedule new futures after shutdown')
f = _base.Future()
w = _WorkItem(f, fn, args, kwargs)
self._work_queue.put(w)
self._adjust_thread_count()
return f

submit creates a Future immediately and enqueues the work. The caller gets the Future back before the work starts. _adjust_thread_count starts a new thread if the pool is below max_workers and all existing threads are busy.

Future.result

# CPython: Lib/concurrent/futures/_base.py:440 Future.result
def result(self, timeout=None):
with self._condition:
if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
raise CancelledError()
elif self._state == FINISHED:
return self.__get_result()
self._condition.wait(timeout)
if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
raise CancelledError()
elif self._state == FINISHED:
return self.__get_result()
else:
raise TimeoutError()

def __get_result(self):
if self._exception:
raise self._exception
return self._result

future.result() blocks on a threading.Condition until the future is done. If the callable raised, result() re-raises the same exception. TimeoutError is raised if timeout expires while still waiting.

Future.add_done_callback

# CPython: Lib/concurrent/futures/_base.py:480 add_done_callback
def add_done_callback(self, fn):
with self._condition:
if self._state not in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]:
self._done_callbacks.append(fn)
return
fn(self) # already done: call immediately

Callbacks registered before completion are called when the future finishes. Callbacks registered after completion are called synchronously in add_done_callback. All callbacks run in the thread that completed the future.

as_completed

# CPython: Lib/concurrent/futures/_base.py:220 as_completed
def as_completed(fs, timeout=None):
future_set = set(fs)
waiter = _create_and_install_waiters(future_set, _AS_COMPLETED)
try:
for future in done:
yield future
end_time = timeout and (time.monotonic() + timeout)
while pending:
wait_timeout = end_time and (end_time - time.monotonic())
if not waiter.event.wait(wait_timeout):
raise TimeoutError(...)
for future in waiter.finished_futures:
yield future
waiter.finished_futures = []
waiter.event.clear()
finally:
for f in future_set:
f._waiters.remove(waiter)

as_completed installs a waiter object into each future's _waiters list. When a future completes, it notifies the event. The generator yields completed futures in completion order, not submission order.

wait

# CPython: Lib/concurrent/futures/_base.py:180 wait
def wait(fs, timeout=None, return_when=ALL_COMPLETED):
done = set()
not_done = set()
waiter = _create_and_install_waiters(fs, return_when)
...
waiter.event.wait(timeout)
...
return DoneAndNotDoneFutures(done, not_done)

wait(futures, return_when=FIRST_COMPLETED) returns as soon as any future finishes. ALL_COMPLETED (default) waits for all. FIRST_EXCEPTION returns when any future raises.

gopy notes

ThreadPoolExecutor.submit is module/concurrent.Submit in module/concurrent/module.go. It uses sync.WaitGroup and a buffered channel as the work queue. Future.result calls future.Wait() which blocks on a sync.WaitGroup. as_completed uses a chan *Future.