Skip to main content

Lib/concurrent/ (part 3)

Source:

cpython 3.14 @ ab2d84fe1023/Lib/concurrent/futures/process.py

This annotation covers ProcessPoolExecutor and the Future completion API. See modules_concurrent2_detail for ThreadPoolExecutor, Executor.map, and Future basics.

Map

| Lines | Symbol | Role |
|---|---|---|
| 1-80 | `ProcessPoolExecutor.__init__` | Fork worker processes; set up queues |
| 81-200 | `_process_worker` | Worker loop: fetch tasks, call, return results |
| 201-300 | `Future.result` | Block until the future completes; return value or raise |
| 301-420 | `wait` | Block until given futures are done or timeout |
| 421-600 | `as_completed` | Iterator yielding futures as they complete |

Reading

ProcessPoolExecutor.__init__

# CPython: Lib/concurrent/futures/process.py:680 ProcessPoolExecutor.__init__
class ProcessPoolExecutor(_base.Executor):
    def __init__(self, max_workers=None, mp_context=None,
                 initializer=None, initargs=(), max_tasks_per_child=None):
        """Set up the executor's queues and bookkeeping; workers start lazily.

        max_workers defaults to the CPU count. initializer/initargs run once
        in each worker process. max_tasks_per_child bounds how many tasks a
        worker handles before being replaced.
        """
        # Default to one worker per CPU core; cpu_count() can return None,
        # in which case fall back to a single worker.
        if max_workers is None:
            max_workers = os.cpu_count() or 1
        # Cap at 61 on Windows — NOTE(review): presumably due to the Win32
        # WaitForMultipleObjects 64-handle limit; confirm against CPython docs.
        if sys.platform == 'win32':
            max_workers = min(61, max_workers)
        # The multiprocessing context selects the start method (spawn/fork/...).
        self._mp_context = mp_context or multiprocessing.get_context()
        # Cross-process channels: tasks flow out on _call_queue, results
        # come back on _result_queue.
        self._call_queue = self._mp_context.SimpleQueue()
        self._result_queue = self._mp_context.SimpleQueue()
        # In-process queue of pending work ids (plain queue, not mp).
        self._work_ids = queue.SimpleQueue()
        # Live worker processes; populated lazily on first submit.
        self._processes = set()
        ...

ProcessPoolExecutor uses multiprocessing.SimpleQueue for task and result delivery. Workers are spawned lazily on first submit. max_tasks_per_child (Python 3.11+) limits the number of tasks each worker handles before it is replaced, preventing memory leaks from long-running tasks.

_process_worker

# CPython: Lib/concurrent/futures/process.py:220 _process_worker
def _process_worker(call_queue, result_queue, initializer, initargs,
                    max_tasks=None):
    """Worker-process loop: pull tasks off call_queue, run them, and push
    each outcome onto result_queue until shut down or max_tasks is reached."""
    if initializer is not None:
        initializer(*initargs)
    tasks_done = 0
    while True:
        task = call_queue.get(block=True)
        # A None sentinel is the shutdown request; echo our pid back so
        # the parent can tell which worker exited cleanly.
        if task is None:
            result_queue.put(os.getpid())
            return
        try:
            value = task.fn(*task.args, **task.kwargs)
        except BaseException as exc:
            result_queue.put(_ExceptionResult(task.work_id, exc))
        else:
            result_queue.put(_ResultItem(task.work_id, value))
        # Drop the last reference to the task (and its argument objects)
        # before blocking on the next get, so they can be freed promptly.
        del task
        tasks_done += 1
        if max_tasks is not None and tasks_done >= max_tasks:
            return

The worker loop is simple. del call_item is important: it releases any reference to the task's arguments before the next call_queue.get, preventing memory accumulation.

Future.result

# CPython: Lib/concurrent/futures/_base.py:458 Future.result
def result(self, timeout=None):
    """Return the result of the call, blocking up to *timeout* seconds.

    Raises CancelledError if the future was cancelled, TimeoutError if it
    does not finish within *timeout*, or re-raises the call's exception.
    """
    with self._condition:
        # Fast path: the future may already be settled.
        if self._state in (CANCELLED, CANCELLED_AND_NOTIFIED):
            raise CancelledError()
        if self._state == FINISHED:
            # __get_result returns self._result or re-raises self._exception.
            return self.__get_result()
        # Not settled yet — sleep until set_result/set_exception/cancel
        # notifies the condition, or the timeout lapses.
        self._condition.wait(timeout)
        # Re-check after waking: a timeout-triggered wakeup leaves the
        # state unchanged, in which case we report TimeoutError.
        if self._state in (CANCELLED, CANCELLED_AND_NOTIFIED):
            raise CancelledError()
        if self._state == FINISHED:
            return self.__get_result()
        raise TimeoutError()

future.result(timeout=5) blocks for up to 5 seconds. __get_result either returns self._result or re-raises self._exception. The _condition (threading.Condition) is notified when the future is set.

as_completed

# CPython: Lib/concurrent/futures/_base.py:218 as_completed
def as_completed(fs, timeout=None):
    """Yield the given futures one at a time, in completion order."""
    pending = set(fs)
    # Attach a waiter to every future; completed futures will enqueue
    # themselves onto waiter.finished_queue.
    waiter = _create_and_install_waiters(pending, _AS_COMPLETED)
    try:
        # Hand out anything that finished before we got here...
        for fut in _yield_finished_futures(pending, waiter,
                                           ref_collect=(pending,)):
            yield fut
        # ...then the remainder as the waiter reports them (honoring timeout).
        yield from _yield_from_waiter_queue(waiter.finished_queue, timeout)
    finally:
        # Detach our waiter so abandoned iteration doesn't leak references.
        for fut in pending:
            fut._waiters.discard(waiter)

as_completed installs an _AsCompletedWaiter on each future. When a future completes, it calls waiter.add_result/add_exception, which puts it in finished_queue. Already-finished futures are yielded first; the loop over the waiter's finished_queue then drains it as further items arrive.

gopy notes

ProcessPoolExecutor is module/concurrent_futures.ProcessPoolExecutor in module/concurrent/futures/module.go. Worker processes are Go goroutines using os/exec for the actual process isolation. Future.result uses a Go channel. as_completed uses select on a channel of completed futures.