Skip to main content

Lib/concurrent/ (part 2)

Source:

cpython 3.14 @ ab2d84fe1023/Lib/concurrent/futures/thread.py

This annotation covers the concurrent.futures execution model. See lib_concurrent_detail for Executor.__init__, Future.__init__, Future.add_done_callback, and executor shutdown.

Map

LinesSymbolRole
1-100ThreadPoolExecutor.submitSchedule a callable; return a Future
101-220ProcessPoolExecutorSame API but uses processes; pickle-based IPC
221-360Future.resultBlock until done; re-raise exception if any
361-480as_completedYield futures as they complete
481-600waitBlock until any/all futures finish

Reading

ThreadPoolExecutor.submit

# CPython: Lib/concurrent/futures/thread.py:220 ThreadPoolExecutor.submit
def submit(self, fn, /, *args, **kwargs):
with self._shutdown_lock:
if self._shutdown:
raise RuntimeError('cannot schedule new futures after shutdown')
f = Future()
w = _WorkItem(f, fn, args, kwargs)
self._work_queue.put(w)
self._adjust_thread_count()
return f

executor.submit(fn, arg) wraps fn(arg) in a _WorkItem and puts it on a queue.SimpleQueue. Worker threads pull from this queue. _adjust_thread_count starts new threads if len(threads) < max_workers.

Future.result

# CPython: Lib/concurrent/futures/_base.py:440 Future.result
def result(self, timeout=None):
"""Return the result; block if not yet done."""
with self._condition:
if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
raise CancelledError()
elif self._state == FINISHED:
return self.__get_result()
self._condition.wait(timeout)
if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
raise CancelledError()
elif self._state == FINISHED:
return self.__get_result()
raise TimeoutError()

def __get_result(self):
if self._exception:
try:
raise self._exception
finally:
self = None # Break reference cycle
return self._result

future.result() blocks on a threading.Condition until the future is done. If the callable raised, the exception is re-raised here. The self = None breaks the reference cycle between the exception traceback and the future.

as_completed

# CPython: Lib/concurrent/futures/_base.py:222 as_completed
def as_completed(fs, timeout=None):
"""Yield futures as they complete."""
future_set = set(fs)
waiter = _create_and_install_waiters(future_set, _AS_COMPLETED)
try:
for _ in range(len(future_set)):
yield waiter.finished_futures.get(timeout=...)
finally:
for f in future_set:
f.remove_done_callback(waiter.add_result)

for f in as_completed(futures): yields each future as it finishes, regardless of submission order. The waiter installs a callback on each future that puts it in a queue.SimpleQueue when done.

gopy notes

ThreadPoolExecutor.submit is module/concurrent/futures.ThreadPoolExecutor.Submit in module/concurrent/futures/module.go. Work items are goroutines. Future.result uses a sync.Cond. as_completed uses a Go channel instead of SimpleQueue. ProcessPoolExecutor spawns real processes via os/exec and uses gob encoding for IPC.