Lib/concurrent/ (part 3)
Source:
cpython 3.14 @ ab2d84fe1023/Lib/concurrent/futures/process.py (plus Lib/concurrent/futures/_base.py for Future.result, wait, and as_completed)
This annotation covers ProcessPoolExecutor and the Future completion API. See modules_concurrent2_detail for ThreadPoolExecutor, Executor.map, and Future basics.
Map
| Location | Symbol | Role |
|---|---|---|
| process.py:680 | ProcessPoolExecutor.__init__ | Set up queues and mp context; workers spawn lazily |
| process.py:220 | _process_worker | Worker loop: fetch tasks, call, send back results |
| _base.py:458 | Future.result | Block until the future completes; return value or raise |
| _base.py | wait | Block until given futures are done or the timeout expires |
| _base.py:218 | as_completed | Iterator yielding futures as they complete |
Reading
ProcessPoolExecutor.__init__
# CPython: Lib/concurrent/futures/process.py:680 ProcessPoolExecutor.__init__
class ProcessPoolExecutor(_base.Executor):
    def __init__(self, max_workers=None, mp_context=None,
                 initializer=None, initargs=(), *, max_tasks_per_child=None):
        if max_workers is None:
            self._max_workers = os.process_cpu_count() or 1
            if sys.platform == 'win32':
                self._max_workers = min(_MAX_WINDOWS_WORKERS,
                                        self._max_workers)
        else:
            self._max_workers = max_workers
        self._mp_context = mp_context or multiprocessing.get_context()
        ...
        # Slightly larger than the worker count so workers never idle,
        # but small: calls already in the queue cannot be cancelled.
        self._call_queue = _SafeQueue(
            max_size=self._max_workers + EXTRA_QUEUED_CALLS,
            ctx=self._mp_context)        # bookkeeping args elided
        self._result_queue = self._mp_context.SimpleQueue()
        self._work_ids = queue.Queue()
        self._processes = {}             # pid -> Process
        ...
The call queue is not a plain SimpleQueue: it is a bounded multiprocessing queue (_SafeQueue) sized max_workers + EXTRA_QUEUED_CALLS, large enough that workers never idle but small enough that most pending work stays on the executor side, where it can still be cancelled. Results come back over a SimpleQueue. Workers are spawned lazily on first submit. max_tasks_per_child (Python 3.11+, keyword-only) retires a worker after that many tasks and replaces it with a fresh process, bounding memory growth from leaky task code; it is incompatible with the 'fork' start method.
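A usage sketch, not CPython source: render_frame is a hypothetical stand-in for leak-prone task code; each worker retires after ten tasks and is replaced by a fresh process.

# Sketch (not CPython source): recycling workers with max_tasks_per_child.
# render_frame is hypothetical; any picklable top-level callable works.
import concurrent.futures

def render_frame(i):
    return i * i   # stand-in for real, leak-prone work

if __name__ == '__main__':   # required under spawn/forkserver start methods
    with concurrent.futures.ProcessPoolExecutor(
            max_workers=4, max_tasks_per_child=10) as ex:
        results = list(ex.map(render_frame, range(100)))
    print(len(results), results[:3])   # 100 [0, 1, 4]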
_process_worker
# CPython: Lib/concurrent/futures/process.py:220 _process_worker
def _process_worker(call_queue, result_queue, initializer, initargs,
                    max_tasks=None):
    if initializer is not None:
        initializer(*initargs)          # real code logs and exits on failure
    num_tasks = 0
    exit_pid = None
    while True:
        call_item = call_queue.get(block=True)
        if call_item is None:
            result_queue.put(os.getpid())   # signal graceful exit
            return
        if max_tasks is not None:
            num_tasks += 1
            if num_tasks >= max_tasks:
                exit_pid = os.getpid()      # this is the worker's last task
        try:
            r = call_item.fn(*call_item.args, **call_item.kwargs)
        except BaseException as e:
            exc = _ExceptionWithTraceback(e, e.__traceback__)
            _sendback_result(result_queue, call_item.work_id,
                             exception=exc, exit_pid=exit_pid)
        else:
            _sendback_result(result_queue, call_item.work_id,
                             result=r, exit_pid=exit_pid)
            del r
        # Drop the reference to fn/args/kwargs before the next blocking get
        del call_item
        if exit_pid is not None:
            return
The worker loop is deliberately simple. Exceptions are wrapped in _ExceptionWithTraceback so the formatted traceback survives the pickle trip back to the parent. del call_item matters: it drops the worker's last reference to the task's function and arguments before blocking on the next call_queue.get, so large argument objects do not stay alive between tasks.
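The visible consequence, as a runnable sketch (not CPython source): an exception raised in a worker is pickled back and re-raised by result() in the parent.

# Sketch (not CPython source): a worker exception crosses the process
# boundary and is re-raised by result() in the parent.
import concurrent.futures

def invert(x):
    return 1 / x

if __name__ == '__main__':
    with concurrent.futures.ProcessPoolExecutor(max_workers=1) as ex:
        fut = ex.submit(invert, 0)
        try:
            fut.result()
        except ZeroDivisionError as exc:
            print('re-raised in parent:', exc)   # division by zero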
Future.result
# CPython: Lib/concurrent/futures/_base.py:458 Future.result
def result(self, timeout=None):
    """Return the result of the call. Block if necessary."""
    with self._condition:
        if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
            raise CancelledError()
        elif self._state == FINISHED:
            return self.__get_result()

        self._condition.wait(timeout)

        if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
            raise CancelledError()
        elif self._state == FINISHED:
            return self.__get_result()
        else:
            raise TimeoutError()
future.result(timeout=5) blocks for at most 5 seconds. __get_result either returns self._result or re-raises self._exception. The _condition (a threading.Condition) is notified when the future is resolved by set_result, set_exception, or a cancellation notification. Note the check-wait-recheck shape: the state is tested before waiting (the fast path for already-finished futures) and again after, because Condition.wait can also return on timeout.
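A hedged usage sketch: a TimeoutError from result means only that this wait expired; the task keeps running, and the future can be waited on again.

# Sketch (not CPython source): a timed-out result() leaves the future
# running; a second call can wait for it again.
import concurrent.futures
import time

if __name__ == '__main__':
    with concurrent.futures.ProcessPoolExecutor(max_workers=1) as ex:
        fut = ex.submit(time.sleep, 2)
        try:
            fut.result(timeout=0.5)
        except TimeoutError:   # concurrent.futures.TimeoutError is an alias since 3.11
            print('not done yet:', fut.done())        # False
        print('second wait returns:', fut.result())   # blocks, then None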
as_completed
# CPython: Lib/concurrent/futures/_base.py:218 as_completed
def as_completed(fs, timeout=None):
    fs = set(fs)
    with _AcquireFutures(fs):
        finished = set(f for f in fs
                       if f._state in [CANCELLED_AND_NOTIFIED, FINISHED])
        pending = fs - finished
        waiter = _create_and_install_waiters(fs, _AS_COMPLETED)
    try:
        yield from _yield_finished_futures(list(finished), waiter, ref_collect=(fs,))
        while pending:
            waiter.event.wait(...)   # remaining timeout (computation elided)
            with waiter.lock:
                finished = waiter.finished_futures
                waiter.finished_futures = []
                waiter.event.clear()
            finished.reverse()       # keep finishing order
            yield from _yield_finished_futures(finished, waiter, ref_collect=(fs, pending))
    finally:
        for f in fs:
            with f._condition:
                f._waiters.remove(waiter)
as_completed installs an _AsCompletedWaiter on every pending future. When a future resolves, its add_result/add_exception/add_cancelled hook appends it to waiter.finished_futures and sets waiter.event. The loop waits on that event, swaps the finished list out under waiter.lock, and yields the futures; the finally block detaches the waiter from every future, so abandoning the iterator does not leak waiters.
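Typical consumption, as a sketch (nap is a hypothetical task): futures are yielded in completion order, not submission order.

# Sketch (not CPython source): as_completed yields in completion order.
import concurrent.futures
import time

def nap(t):
    time.sleep(t)
    return t

if __name__ == '__main__':
    with concurrent.futures.ProcessPoolExecutor() as ex:
        futs = [ex.submit(nap, t) for t in (0.3, 0.1, 0.2)]
        for fut in concurrent.futures.as_completed(futs, timeout=5):
            print('finished:', fut.result())   # 0.1, then 0.2, then 0.3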
gopy notes
ProcessPoolExecutor is module/concurrent_futures.ProcessPoolExecutor in module/concurrent/futures/module.go. Workers are goroutines, with os/exec providing the actual process isolation. Future.result blocks on a Go channel, and as_completed selects over a channel of completed futures.