Lib/concurrent/ (part 2)
Source:
cpython 3.14 @ ab2d84fe1023/Lib/concurrent/futures/thread.py
This annotation covers the concurrent.futures execution model. See lib_concurrent_detail for Executor.__init__, Future.__init__, Future.add_done_callback, and executor shutdown.
Map
| Lines | Symbol | Role |
|---|---|---|
| 1-100 | ThreadPoolExecutor.submit | Schedule a callable; return a Future |
| 101-220 | ProcessPoolExecutor | Same API but uses processes; pickle-based IPC |
| 221-360 | Future.result | Block until done; re-raise exception if any |
| 361-480 | as_completed | Yield futures as they complete |
| 481-600 | wait | Block until any/all futures finish |
Reading
ThreadPoolExecutor.submit
# CPython: Lib/concurrent/futures/thread.py:220 ThreadPoolExecutor.submit
def submit(self, fn, /, *args, **kwargs):
with self._shutdown_lock:
if self._shutdown:
raise RuntimeError('cannot schedule new futures after shutdown')
f = Future()
w = _WorkItem(f, fn, args, kwargs)
self._work_queue.put(w)
self._adjust_thread_count()
return f
executor.submit(fn, arg) wraps fn(arg) in a _WorkItem and puts it on a queue.SimpleQueue. Worker threads pull from this queue. _adjust_thread_count starts new threads if len(threads) < max_workers.
Future.result
# CPython: Lib/concurrent/futures/_base.py:440 Future.result
def result(self, timeout=None):
"""Return the result; block if not yet done."""
with self._condition:
if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
raise CancelledError()
elif self._state == FINISHED:
return self.__get_result()
self._condition.wait(timeout)
if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
raise CancelledError()
elif self._state == FINISHED:
return self.__get_result()
raise TimeoutError()
def __get_result(self):
if self._exception:
try:
raise self._exception
finally:
self = None # Break reference cycle
return self._result
future.result() blocks on a threading.Condition until the future is done. If the callable raised, the exception is re-raised here. The self = None breaks the reference cycle between the exception traceback and the future.
as_completed
# CPython: Lib/concurrent/futures/_base.py:222 as_completed
def as_completed(fs, timeout=None):
"""Yield futures as they complete."""
future_set = set(fs)
waiter = _create_and_install_waiters(future_set, _AS_COMPLETED)
try:
for _ in range(len(future_set)):
yield waiter.finished_futures.get(timeout=...)
finally:
for f in future_set:
f.remove_done_callback(waiter.add_result)
for f in as_completed(futures): yields each future as it finishes, regardless of submission order. The waiter installs a callback on each future that puts it in a queue.SimpleQueue when done.
gopy notes
ThreadPoolExecutor.submit is module/concurrent/futures.ThreadPoolExecutor.Submit in module/concurrent/futures/module.go. Work items are goroutines. Future.result uses a sync.Cond. as_completed uses a Go channel instead of SimpleQueue. ProcessPoolExecutor spawns real processes via os/exec and uses gob encoding for IPC.