
concurrent.futures.process — ProcessPoolExecutor

process.py implements ProcessPoolExecutor, the process-based backend for the concurrent.futures API. Child processes are spawned once and reused; work items are ferried over multiprocessing.Queue.

Map

Lines    Symbol                                                Role
1–60     module imports / globals                              multiprocessing, queue, threading imports; sentinel constants
61–90    _CallItem                                             Namedtuple carrying work_id, callable, args, kwargs sent to workers
91–110   _ResultItem                                           Namedtuple carrying work_id, result or exception sent back to executor
111–160  _process_worker                                       Top-level function executed inside each child process
161–230  _result_handler                                       Thread function that drains the result queue and resolves futures
231–310  _queue_management_worker                              Thread that balances work queue, result queue, and process lifecycle
311–420  ProcessPoolExecutor.__init__                          Sets up queues, process table, and thread handles
421–500  ProcessPoolExecutor.submit                            Pickles work, enqueues _CallItem, returns Future
501–560  ProcessPoolExecutor._start_queue_management_thread    Lazily spawns the management thread and worker processes
561–620  ProcessPoolExecutor.shutdown                          Drains pending work and joins worker processes
621–700  _python_exit / atexit                                 Ensures executors are shut down on interpreter exit

Reading

Work items and the child entry point

submit() wraps the user callable into a _WorkItem; the queue-management thread later repackages it as a _CallItem and puts it on _call_queue. Inside each child process, _process_worker loops forever, pulling items off that queue.

# CPython: Lib/concurrent/futures/process.py:111 _process_worker
def _process_worker(call_queue, result_queue, initializer, initargs, max_tasks=None):
    if initializer is not None:
        initializer(*initargs)
    num_tasks = 0
    while True:
        call_item = call_queue.get(block=True)
        if call_item is None:  # shutdown sentinel
            result_queue.put(os.getpid())
            return
        try:
            r = call_item.fn(*call_item.args, **call_item.kwargs)
        except BaseException as e:
            result_queue.put(_ResultItem(call_item.work_id, exception=e))
        else:
            result_queue.put(_ResultItem(call_item.work_id, result=r))
        num_tasks += 1
        if max_tasks is not None and num_tasks >= max_tasks:
            result_queue.put(os.getpid())
            return

Call items are pickled in the parent when they are handed to the call queue, so only picklable callables and arguments can be submitted; under the spawn start method the callable must additionally be importable from the child, i.e. a module-level function rather than a lambda or closure.
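
The constraint is easy to demonstrate with pickle alone, independent of the executor (a small sketch; add_one and the lambda are throwaway examples):

```python
import pickle

def add_one(x):
    # Module-level functions pickle by qualified name and import cleanly.
    return x + 1

clone = pickle.loads(pickle.dumps(add_one))
ok = clone(41)  # round-trips: 42

try:
    # Lambdas have no importable name, so pickling them fails.
    pickle.dumps(lambda x: x + 1)
    lambda_pickled = True
except (pickle.PicklingError, AttributeError):
    lambda_pickled = False
```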

The result handler thread

A dedicated thread in the parent process reads _ResultItem objects and fulfils the matching Future.

# CPython: Lib/concurrent/futures/process.py:161 _result_handler
def _result_handler(result_queue, is_broken, futures, work_ids_queue, thread_wakeup):
    while True:
        try:
            result_item = result_queue.get(timeout=_GLOBAL_SHUTDOWN_LOCK_TIMEOUT)
        except queue.Empty:
            if is_broken():
                return
            continue
        if isinstance(result_item, int):  # PID: a worker exited
            ...
            continue
        future = futures.pop(result_item.work_id, None)
        if future is not None:
            if result_item.exception:
                future.set_exception(result_item.exception)
            else:
                future.set_result(result_item.result)
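
The resolution step can be modelled without any processes (a sketch using real Future objects; pending, results, and resolve_one are illustrative names, not CPython's): pop the future by work_id and call set_result or set_exception.

```python
import queue
from concurrent.futures import Future

pending = {}             # work_id -> Future, like _pending_work_items
results = queue.Queue()  # stand-in for the multiprocessing result queue

def resolve_one(pending, results):
    """Drain one (work_id, result, exception) tuple and fulfil the future."""
    work_id, result, exc = results.get()
    future = pending.pop(work_id, None)
    if future is not None:
        if exc is not None:
            future.set_exception(exc)
        else:
            future.set_result(result)

f1, f2 = Future(), Future()
pending[0], pending[1] = f1, f2
results.put((0, 42, None))
results.put((1, None, ValueError("boom")))
resolve_one(pending, results)
resolve_one(pending, results)
```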

Submitting work

submit() is the only public entry point for adding work; it must be called from the parent process.

# CPython: Lib/concurrent/futures/process.py:421 ProcessPoolExecutor.submit
def submit(self, fn, /, *args, **kwargs):
    with self._shutdown_lock:
        ...
        f = _base.Future()
        w = _WorkItem(f, fn, args, kwargs)
        self._pending_work_items[self._queue_count] = w
        self._work_ids.put(self._queue_count)
        self._queue_count += 1
        self._queue_management_thread_wakeup.wakeup()
        self._start_queue_management_thread()
        return f
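
The parent-side bookkeeping can be mimicked in-process (MiniSubmitter is a hypothetical toy, not CPython code; no workers are spawned): each call allocates a monotonically increasing work id, records the future, and queues the id for the management thread.

```python
import queue
from concurrent.futures import Future

class MiniSubmitter:
    """Parent-side bookkeeping only: ids, pending table, work-id queue."""
    def __init__(self):
        self._queue_count = 0
        self._pending_work_items = {}
        self._work_ids = queue.Queue()

    def submit(self, fn, *args, **kwargs):
        f = Future()
        self._pending_work_items[self._queue_count] = (f, fn, args, kwargs)
        self._work_ids.put(self._queue_count)
        self._queue_count += 1
        return f

s = MiniSubmitter()
fut = s.submit(pow, 2, 10)
```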

Safe shutdown

shutdown() places None sentinels on the call queue — one per live worker — then joins each process.

# CPython: Lib/concurrent/futures/process.py:561 ProcessPoolExecutor.shutdown
def shutdown(self, wait=True, *, cancel_futures=False):
    with self._shutdown_lock:
        self._shutdown_thread = True
        if cancel_futures:
            ...
    self._queue_management_thread_wakeup.wakeup()
    if wait and self._queue_management_thread:
        self._queue_management_thread.join()
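
The sentinel protocol generalises beyond processes; here is a thread-based sketch (queue.Queue standing in for the multiprocessing call queue): each worker exits when it pulls None, so putting one None per worker drains the pool.

```python
import queue
import threading

calls = queue.Queue()
done = []  # list.append is thread-safe under the GIL

def worker():
    while True:
        item = calls.get()
        if item is None:  # shutdown sentinel, one per worker
            return
        done.append(item * 2)

threads = [threading.Thread(target=worker) for _ in range(3)]
for t in threads:
    t.start()
for n in range(5):
    calls.put(n)
for _ in threads:
    calls.put(None)  # one sentinel per live worker
for t in threads:
    t.join()
```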

gopy notes

  • _process_worker is a plain function, not a method, because it must be picklable under the spawn start method. gopy does not yet model multiprocessing, so this file is not ported.
  • The wakeup pipe (_ThreadWakeup) is a self-pipe trick: writing one byte unblocks a select inside the management thread. gopy would use a Go channel for the same purpose.
  • max_tasks_per_child (added in 3.11) limits worker reuse to bound memory growth. It maps naturally to a goroutine restart policy.
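
The self-pipe trick itself fits in a few lines (a POSIX sketch mirroring what _ThreadWakeup does, not its actual code): writing one byte makes select() return, and draining the pipe re-arms it.

```python
import os
import select

r, w = os.pipe()                           # the "wakeup" pipe

os.write(w, b"\x00")                       # wakeup(): one byte unblocks select
ready, _, _ = select.select([r], [], [], 1.0)
woke = r in ready
os.read(r, 64)                             # clear(): drain pending bytes
os.close(r)
os.close(w)
```

In the real module the read end is watched alongside the result queue's file descriptor, so the management thread can be nudged without a result arriving.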

CPython 3.14 changes

  • max_tasks_per_child parameter stabilised; the sentinel path via result_queue.put(pid) has been the same since 3.11.
  • Exception pickling across process boundaries now uses __reduce_ex__ with protocol 5 by default on 3.14, improving throughput for large exception messages.
  • ProcessPoolExecutor gained a mp_context parameter in 3.7; no further changes in 3.14.