concurrent.futures.process — ProcessPoolExecutor
process.py implements ProcessPoolExecutor, the process-based backend for the concurrent.futures API. Child processes are spawned once and reused; work items are ferried over multiprocessing.Queue.
Map
| Lines | Symbol | Role |
|---|---|---|
| 1–60 | module imports / globals | multiprocessing, queue, threading imports; sentinel constants |
| 61–90 | _CallItem | Namedtuple carrying work_id, callable, args, kwargs sent to workers |
| 91–110 | _ResultItem | Namedtuple carrying work_id, result or exception sent back to executor |
| 111–160 | _process_worker | Top-level function executed inside each child process |
| 161–230 | _result_handler | Thread function that drains the result queue and resolves futures |
| 231–310 | _queue_management_worker | Thread that balances work queue, result queue, and process lifecycle |
| 311–420 | ProcessPoolExecutor.__init__ | Sets up queues, process table, and thread handles |
| 421–500 | ProcessPoolExecutor.submit | Wraps fn in a _WorkItem, queues its work id, returns a Future |
| 501–560 | ProcessPoolExecutor._start_queue_management_thread | Lazily spawns the management thread and worker processes |
| 561–620 | ProcessPoolExecutor.shutdown | Drains pending work and joins worker processes |
| 621–700 | _python_exit / atexit | Ensures executors are shut down on interpreter exit |
Reading
Work items and the child entry point
submit() wraps the user callable into a _WorkItem; the management thread repackages it as a _CallItem and puts it on _call_queue. Inside each child process, _process_worker loops pulling items off that queue until it receives a shutdown sentinel.
```python
# CPython: Lib/concurrent/futures/process.py:111 _process_worker
def _process_worker(call_queue, result_queue, initializer, initargs, max_tasks=None):
    if initializer is not None:
        initializer(*initargs)
    num_tasks = 0
    while True:
        call_item = call_queue.get(block=True)
        if call_item is None:  # shutdown sentinel
            result_queue.put(os.getpid())
            return
        try:
            r = call_item.fn(*call_item.args, **call_item.kwargs)
        except BaseException as e:
            result_queue.put(_ResultItem(call_item.work_id, exception=e))
        else:
            result_queue.put(_ResultItem(call_item.work_id, result=r))
        num_tasks += 1
        if max_tasks is not None and num_tasks >= max_tasks:
            result_queue.put(os.getpid())
            return
```
The child never imports user code at startup; the callable and its arguments are pickled in the parent when they are fed onto the call queue, so only picklable callables (and arguments) work.
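The same loop can be sketched with in-process threads and `queue.Queue`, which makes the sentinel protocol easy to exercise. The `CallItem`/`ResultItem`/`worker` names below are ours, mirroring the roles of `_CallItem`, `_ResultItem`, and `_process_worker`:

```python
import queue
import threading
from collections import namedtuple

# stand-ins for _CallItem and _ResultItem
CallItem = namedtuple("CallItem", "work_id fn args kwargs")
ResultItem = namedtuple("ResultItem", "work_id result exception")

def worker(call_q, result_q):
    # same shape as _process_worker: pull, run, report, stop on sentinel
    while True:
        item = call_q.get()
        if item is None:  # shutdown sentinel
            return
        try:
            r = item.fn(*item.args, **item.kwargs)
        except BaseException as e:
            result_q.put(ResultItem(item.work_id, None, e))
        else:
            result_q.put(ResultItem(item.work_id, r, None))

call_q, result_q = queue.Queue(), queue.Queue()
t = threading.Thread(target=worker, args=(call_q, result_q))
t.start()
call_q.put(CallItem(0, pow, (2, 10), {}))
call_q.put(None)  # sentinel: the worker drains the item, then exits
t.join()
print(result_q.get().result)  # 1024
```

The real code differs mainly in transport: `multiprocessing` queues pickle every item across the process boundary, which is where the picklability requirement comes from.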
The result handler thread
A dedicated thread in the parent process reads _ResultItem objects and fulfils the matching Future.
```python
# CPython: Lib/concurrent/futures/process.py:161 _result_handler
def _result_handler(result_queue, is_broken, futures, work_ids_queue, thread_wakeup):
    while True:
        try:
            result_item = result_queue.get(timeout=_GLOBAL_SHUTDOWN_LOCK_TIMEOUT)
        except queue.Empty:
            if is_broken():
                return
            continue
        if isinstance(result_item, int):  # PID: a worker exited
            ...
            continue
        future = futures.pop(result_item.work_id, None)
        if future is not None:
            if result_item.exception:
                future.set_exception(result_item.exception)
            else:
                future.set_result(result_item.result)
```
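The pop-and-resolve step against the futures table can be exercised directly with `concurrent.futures.Future`; the `futures` dict below is ours, playing the role of the executor's pending-work table:

```python
from concurrent.futures import Future

futures = {0: Future(), 1: Future()}

# success path: pop by work_id, fulfil the future
fut = futures.pop(0, None)
if fut is not None:
    fut.set_result(42)

# error path: the exception is stored on the future, not raised here;
# it re-raises later, in whichever thread calls .result()
fut = futures.pop(1, None)
if fut is not None:
    fut.set_exception(ValueError("boom"))

print(futures)  # {} : both futures were removed from the table
```

Popping (rather than reading) the entry is what guarantees each work id is resolved exactly once, even if a duplicate result ever arrived.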
Submitting work
submit() is the only public entry point for adding work; it must be called from the parent process.
```python
# CPython: Lib/concurrent/futures/process.py:421 ProcessPoolExecutor.submit
def submit(self, fn, /, *args, **kwargs):
    with self._shutdown_lock:
        ...
        f = _base.Future()
        w = _WorkItem(f, fn, args, kwargs)
        self._pending_work_items[self._queue_count] = w
        self._work_ids.put(self._queue_count)
        self._queue_count += 1
        self._queue_management_thread_wakeup.wakeup()
        self._start_queue_management_thread()
        return f
```
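Exceptions raised in the child ride the same path back: `_process_worker` catches them, the result handler calls `set_exception`, and `Future.result()` re-raises in the parent. A small demonstration (the `fail` helper is ours):

```python
from concurrent.futures import ProcessPoolExecutor

def fail():
    # raised in the child; pickled back as a _ResultItem exception
    raise ValueError("boom")

if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=1) as executor:
        future = executor.submit(fail)
        try:
            future.result()  # re-raises the child's exception here
        except ValueError as exc:
            print("caught:", exc)  # caught: boom
```

Note the exception itself must be picklable for this round trip; an unpicklable exception surfaces as a pickling error instead.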
Safe shutdown
shutdown() flips the shutdown flag and wakes the management thread; that thread places one None sentinel on the call queue per live worker, then joins each process.
```python
# CPython: Lib/concurrent/futures/process.py:561 ProcessPoolExecutor.shutdown
def shutdown(self, wait=True, *, cancel_futures=False):
    with self._shutdown_lock:
        self._shutdown = True  # shutdown is not the "broken" path
        if cancel_futures:
            ...
        self._queue_management_thread_wakeup.wakeup()
    if wait and self._queue_management_thread:
        self._queue_management_thread.join()
```
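In practice shutdown is usually reached via the context manager; calling it explicitly with `cancel_futures=True` (available since 3.9) additionally drops queued-but-unstarted work. A sketch, with our own `slow` helper; exactly which futures end up cancelled depends on timing:

```python
import time
from concurrent.futures import ProcessPoolExecutor

def slow(x):
    time.sleep(0.2)
    return x

if __name__ == "__main__":
    executor = ProcessPoolExecutor(max_workers=1)
    futures = [executor.submit(slow, i) for i in range(4)]
    # returns once running work finishes; pending items are cancelled
    executor.shutdown(wait=True, cancel_futures=True)
    print([f.cancelled() for f in futures])
```

Without `cancel_futures`, shutdown waits for every queued item to run before the sentinels are delivered.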
gopy notes
- `_process_worker` is a plain function, not a method, because it must be picklable for the `spawn` start method. gopy does not yet model `multiprocessing`, so this file is not ported.
- The wakeup pipe (`_ThreadWakeup`) is a self-pipe trick: writing one byte unblocks a `select` inside the management thread. gopy would use a Go channel for the same purpose.
- `max_tasks_per_child` (added in 3.11) limits worker reuse to bound memory growth. It maps naturally to a goroutine restart policy.
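The self-pipe trick fits in a few lines on POSIX. The `waiter`/`woken` names below are ours; CPython wraps the pipe in `_ThreadWakeup` with `wakeup()`/`clear()` methods:

```python
import os
import select
import threading

r, w = os.pipe()  # the "self-pipe": the reader end is watched by select()
woken = []

def waiter():
    # blocks until at least one byte is written to the pipe
    select.select([r], [], [])
    woken.append(True)

t = threading.Thread(target=waiter)
t.start()
os.write(w, b"\x00")  # wakeup(): a single byte is enough to unblock select
t.join()
print(woken)  # [True]
```

The byte's value is irrelevant; readiness of the pipe is the signal, which is why a Go channel (or `chan struct{}`) is the natural translation.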
CPython 3.14 changes
- `max_tasks_per_child` parameter stabilised; the sentinel path via `result_queue.put(pid)` is the same since 3.11.
- Exception pickling across process boundaries now uses `__reduce_ex__` with protocol 5 by default on 3.14, improving throughput for large exception messages.
- `ProcessPoolExecutor` gained an `mp_context` parameter in 3.7; no further changes in 3.14.