Lib/concurrent/futures/__init__.py
cpython 3.14 @ ab2d84fe1023/Lib/concurrent/futures/__init__.py
concurrent.futures is a package under Lib/concurrent/futures/. The
public API lives across four files:
- _base.py -- Future, Executor, as_completed, wait, the
  FIRST_COMPLETED/FIRST_EXCEPTION/ALL_COMPLETED constants, and the
  internal _Waiter classes.
- thread.py -- ThreadPoolExecutor and _WorkItem.
- process.py -- ProcessPoolExecutor, _CallItem, _ResultItem, and the
  worker process loop.
- __init__.py -- re-exports everything from the above.
The design deliberately mirrors java.util.concurrent and
asyncio.Future in structure. concurrent.futures.Future is a distinct
class from asyncio.Future; asyncio.wrap_future bridges the two.
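The bridge is ordinary public API; as a quick usage sketch (not a stdlib excerpt), asyncio.wrap_future turns a pool-produced concurrent.futures.Future into an awaitable asyncio.Future:

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor

async def main():
    with ThreadPoolExecutor() as pool:
        cf = pool.submit(sum, range(10))  # concurrent.futures.Future
        af = asyncio.wrap_future(cf)      # awaitable asyncio.Future tied to the running loop
        return await af

result = asyncio.run(main())
print(result)  # 45
```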
Map
| Lines | Symbol | Role | gopy |
|---|---|---|---|
| _base.py 1-300 | Future, PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED | Five-state result container; result() blocks until the state reaches FINISHED (or the future is cancelled); add_done_callback fires immediately if the future is already done. | (stdlib pending) |
| _base.py 300-500 | Executor, Executor.map, Executor.shutdown | Abstract base; map wraps submit in a lazy iterator that calls result() in submission order; shutdown flushes the pool. | (stdlib pending) |
| _base.py 500-700 | wait, as_completed, _Waiter, _AsCompletedWaiter, _FirstCompletedWaiter, _AllCompletedWaiter | wait blocks until one of the three completion conditions is met; as_completed yields futures as they finish using an _AsCompletedWaiter. | (stdlib pending) |
| thread.py 1-300 | _WorkItem, _worker, BrokenThreadPool, ThreadPoolExecutor | _WorkItem holds the callable and its Future; _worker is the function run in each pool thread; ThreadPoolExecutor.submit enqueues a _WorkItem and spawns threads up to max_workers. | (stdlib pending) |
| process.py 1-400 | _CallItem, _ResultItem, _SafeQueue, _process_worker, ProcessPoolExecutor | _process_worker runs in each worker process, reading _CallItem objects from a multiprocessing.Queue and writing _ResultItem objects back. | (stdlib pending) |
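The Executor.map behavior described in the table, results delivered in submission order even when later submissions finish first, can be demonstrated with a short usage sketch (not stdlib code; slow_square is an invented helper):

```python
import time
from concurrent.futures import ThreadPoolExecutor

def slow_square(x):
    # Later inputs sleep less, so they complete first...
    time.sleep(0.05 * (4 - x))
    return x * x

with ThreadPoolExecutor(max_workers=5) as pool:
    # ...but map still yields results in submission order.
    results = list(pool.map(slow_square, range(5)))

print(results)  # [0, 1, 4, 9, 16]
```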
Reading
Future state machine (_base.py lines 1 to 300)
cpython 3.14 @ ab2d84fe1023/Lib/concurrent/futures/_base.py#L1-300
class Future:
    def __init__(self):
        self._condition = threading.Condition()
        self._state = PENDING
        self._result = None
        self._exception = None
        self._waiters = []
        self._done_callbacks = []

    def result(self, timeout=None):
        try:
            with self._condition:
                if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                    raise CancelledError()
                elif self._state == FINISHED:
                    return self.__get_result()

                self._condition.wait(timeout)

                if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                    raise CancelledError()
                elif self._state == FINISHED:
                    return self.__get_result()
                else:
                    raise TimeoutError()
        finally:
            # Break a reference cycle with the exception in self._exception
            self = None

    def set_result(self, result):
        with self._condition:
            if self._state in {CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED}:
                raise InvalidStateError(...)
            self._result = result
            self._state = FINISHED
            for waiter in self._waiters:
                waiter.add_result(self)
            self._condition.notify_all()
        self._invoke_callbacks()
concurrent.futures.Future is thread-safe: all state transitions happen
under self._condition, a threading.Condition. result() calls
self._condition.wait(timeout) to block the calling thread. When
set_result fires it calls notify_all(), waking all blocked result()
calls. The same _condition serializes cancel(), set_exception(), and
set_result().
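The Condition-based handoff is observable from two plain threads (a usage sketch with an invented producer function, not stdlib code): one thread blocks inside result() until the other completes the future.

```python
import threading
import time
from concurrent.futures import Future

f = Future()

def producer():
    time.sleep(0.05)
    f.set_running_or_notify_cancel()  # PENDING -> RUNNING
    f.set_result('done')              # notify_all() wakes the blocked result()

threading.Thread(target=producer).start()
value = f.result(timeout=5)  # blocks in _condition.wait until notified
print(value)  # done
```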
The five states form a linear progression with one branch:
PENDING -> RUNNING -> FINISHED
\-> CANCELLED -> CANCELLED_AND_NOTIFIED
cancel() succeeds only from PENDING: once the state is RUNNING or
FINISHED it returns False. When a worker dequeues the work item it
calls set_running_or_notify_cancel, which either moves PENDING ->
RUNNING before invoking the callable or, if cancel() won the race,
moves CANCELLED -> CANCELLED_AND_NOTIFIED and skips the callable.
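These cancellation rules can be checked against the public API directly (usage sketch):

```python
from concurrent.futures import Future

pending = Future()
assert pending.cancel()      # PENDING -> CANCELLED: succeeds
assert pending.cancelled()

running = Future()
running.set_running_or_notify_cancel()  # PENDING -> RUNNING
assert not running.cancel()  # RUNNING futures cannot be cancelled
```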
_invoke_callbacks is called outside the condition lock to avoid
deadlocks when a callback acquires a lock of its own. Done callbacks
receive the Future object as their only argument.
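Both callback behaviors, firing on completion and firing immediately for an already-finished future, are observable from the public API (usage sketch):

```python
from concurrent.futures import Future

seen = []
f = Future()
f.add_done_callback(lambda fut: seen.append(('early', fut.result())))
f.set_running_or_notify_cancel()
f.set_result(42)  # fires the callback registered above
# A callback added after completion fires immediately:
f.add_done_callback(lambda fut: seen.append(('late', fut.result())))
print(seen)  # [('early', 42), ('late', 42)]
```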
ThreadPoolExecutor worker loop (thread.py lines 1 to 300)
cpython 3.14 @ ab2d84fe1023/Lib/concurrent/futures/thread.py#L1-300
def _worker(executor_reference, work_queue, initializer, initargs):
    if initializer is not None:
        try:
            initializer(*initargs)
        except BaseException:
            _base.LOGGER.critical('Exception in initializer:', exc_info=True)
            executor = executor_reference()
            if executor is not None:
                executor._initializer_failed()
            return
    try:
        while True:
            work_item = work_queue.get(block=True)
            if work_item is not None:
                work_item.run()
                del work_item

                executor = executor_reference()
                if executor is not None:
                    executor._work_queue.task_done()
                del executor
                continue

            executor = executor_reference()
            if executor is None or executor._shutdown:
                work_queue.put(None)
                return
            del executor
    except BaseException:
        _base.LOGGER.critical('Exception in worker', exc_info=True)
Each pool thread runs _worker. The sentinel None item signals
shutdown: when a thread receives None it puts None back on the queue
(so the next thread also wakes up and exits) and returns. This is a
broadcast-by-relay pattern that avoids needing one sentinel per thread.
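The relay pattern is easy to reproduce with a plain queue.Queue (an illustrative sketch, not the stdlib code): a single None sentinel shuts down every worker because each exiting worker re-enqueues it.

```python
import queue
import threading

q = queue.Queue()
stopped = []

def worker(name):
    while True:
        item = q.get()
        if item is None:
            q.put(None)  # relay the sentinel so the next worker exits too
            stopped.append(name)
            return
        # process item here

threads = [threading.Thread(target=worker, args=(i,)) for i in range(3)]
for t in threads:
    t.start()
q.put(None)  # one sentinel reaches all three workers
for t in threads:
    t.join()
print(sorted(stopped))  # [0, 1, 2]
```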
_WorkItem.run calls the user callable and sets the result or exception
on the Future:
class _WorkItem:
    def run(self):
        if not self.future.set_running_or_notify_cancel():
            return
        try:
            result = self.fn(*self.args, **self.kwargs)
        except BaseException as exc:
            self.future.set_exception(exc)
            self = None
        else:
            self.future.set_result(result)
            self = None
set_running_or_notify_cancel atomically transitions the future from
PENDING to RUNNING or, if it was already cancelled, moves it to
CANCELLED_AND_NOTIFIED, notifies the installed waiters via
add_cancelled, and returns False (the done callbacks already ran when
cancel() succeeded). Setting self = None at the end of both branches
breaks the reference cycle between the _WorkItem, the callable, and
the Future.
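The cancelled branch of set_running_or_notify_cancel can be exercised directly, without an executor (usage sketch):

```python
from concurrent.futures import Future, CancelledError

f = Future()
f.cancel()  # PENDING -> CANCELLED
assert f.set_running_or_notify_cancel() is False  # CANCELLED -> CANCELLED_AND_NOTIFIED
try:
    f.result(timeout=0)
except CancelledError:
    print('cancelled')  # result() raises instead of blocking
```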
as_completed iterator (_base.py lines 500 to 700)
cpython 3.14 @ ab2d84fe1023/Lib/concurrent/futures/_base.py#L500-700
def as_completed(fs, timeout=None):
    if timeout is not None:
        end_time = timeout + time.monotonic()

    fs = set(fs)
    with _AcquireFutures(fs):
        done = {f for f in fs
                if f._state in [CANCELLED_AND_NOTIFIED, FINISHED]}
        waiter = _create_and_install_waiters(fs, _AS_COMPLETED)
    try:
        yield from done
        fs -= done
        while fs:
            if timeout is None:
                wait_timeout = None
            else:
                wait_timeout = end_time - time.monotonic()
                if wait_timeout < 0:
                    raise TimeoutError(...)

            waiter.event.wait(wait_timeout)
            with waiter.lock:
                finished_futures = waiter.finished_futures
                waiter.finished_futures = []
                waiter.event.clear()

            for future in finished_futures:
                yield future
                fs.discard(future)
    finally:
        for f in fs:
            with f._condition:
                f._waiters.remove(waiter)
as_completed installs an _AsCompletedWaiter on every future before
entering the yield loop. The waiter's add_result / add_exception /
add_cancelled callbacks append the finished future to
waiter.finished_futures and set waiter.event. The generator wakes up,
drains finished_futures, yields each one, and then calls
waiter.event.wait again until every future has been yielded.
The finally block removes the waiter from every still-pending future to
avoid a reference leak when the caller abandons the iterator early with
break.
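The completion-order behavior is easy to see with staggered delays (usage sketch with an invented job function; delays are spaced widely enough that the ordering is effectively deterministic):

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

def job(delay):
    time.sleep(delay)
    return delay

with ThreadPoolExecutor(max_workers=3) as pool:
    # All three start immediately; the shortest job finishes first.
    futures = [pool.submit(job, d) for d in (0.3, 0.1, 0.2)]
    order = [f.result() for f in as_completed(futures)]

print(order)  # completion order, not submission order: [0.1, 0.2, 0.3]
```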
gopy mirror
concurrent.futures is pending. ThreadPoolExecutor maps naturally to
goroutines with a bounded worker pool and a chan work queue. Future's
threading.Condition-based blocking must be replaced with Go channel
semantics. ProcessPoolExecutor requires os/exec process management and
is lower priority than the thread-based executor.