Skip to main content

Lib/concurrent/futures/process.py

cpython 3.14 @ ab2d84fe1023/Lib/concurrent/futures/process.py

The file implements ProcessPoolExecutor. Submitted callables are pickled into _CallItem objects, shipped over a multiprocessing.Queue (wrapped as _SafeQueue) to worker processes, and results are returned through a shared result_queue. A dedicated _ExecutorManagerThread in the parent process reads results, resolves futures, detects broken workers, and manages the shutdown sequence. A _ThreadWakeup pipe lets the main thread interrupt the manager thread without going through the result queue.

Map

LinesSymbolRole
59-86_ThreadWakeupPipe-based one-way wakeup channel from main thread to manager thread
157-162_WorkItemHolds (future, fn, args, kwargs) before pickling; never crosses process boundary
164-169_ResultItemCarries (work_id, exception, result, exit_pid) back from workers
171-197_CallItemHolds (work_id, fn, args, kwargs); pickled and sent to workers
179-197_SafeQueuemultiprocessing.Queue subclass that routes feeder errors to the future
224-274_process_workerTop-level function; subprocess target; runs the initializer then the task loop
250-274_process_worker task loopReads _CallItem from call queue, executes, sends _ResultItem to result queue
609-619BrokenProcessPoolRaised on all pending futures when a worker exits unexpectedly
620-725ProcessPoolExecutor.__init__Allocates queues, wakeup pipe, process map, manager thread reference
784-806submitEnqueues _WorkItem, wakes manager thread, optionally spawns a new process
890-909shutdownSets shutdown flag, wakes manager thread, joins it, releases resources

Reading

_ThreadWakeup: pipe-based interrupt

_ThreadWakeup wraps a multiprocessing.Pipe (duplex=False) so the main thread can unblock the manager thread's select loop without racing on the result queue. wakeup() writes an empty byte string; clear() drains all pending bytes; close() is called once from the manager thread during final shutdown.

# CPython: Lib/concurrent/futures/process.py:59 _ThreadWakeup
class _ThreadWakeup:
def __init__(self):
self._closed = False
self._lock = threading.Lock()
self._reader, self._writer = mp.Pipe(duplex=False)

def wakeup(self):
with self._lock:
if not self._closed:
self._writer.send_bytes(b"")

def clear(self):
if self._closed:
raise RuntimeError('operation on closed _ThreadWakeup')
while self._reader.poll():
self._reader.recv_bytes()

Exchange objects: _WorkItem, _CallItem, _ResultItem

_WorkItem lives only in the parent process and is never pickled. Before sending work to a subprocess, the manager converts it to a _CallItem (which must be picklable). Results come back as _ResultItem, carrying the work_id needed to look up the original _WorkItem and settle its future.

# CPython: Lib/concurrent/futures/process.py:157 _WorkItem
class _WorkItem(object):
def __init__(self, future, fn, args, kwargs):
self.future = future
self.fn = fn
self.args = args
self.kwargs = kwargs

# CPython: Lib/concurrent/futures/process.py:164 _ResultItem
class _ResultItem(object):
def __init__(self, work_id, exception=None, result=None, exit_pid=None):
self.work_id = work_id
self.exception = exception
self.result = result
self.exit_pid = exit_pid

# CPython: Lib/concurrent/futures/process.py:171 _CallItem
class _CallItem(object):
def __init__(self, work_id, fn, args, kwargs):
self.work_id = work_id
self.fn = fn
self.args = args
self.kwargs = kwargs

_process_worker: subprocess target

_process_worker is the target= argument passed to every multiprocessing.Process. It runs the optional initializer first. If initialization raises, it logs and exits so the parent detects the broken pool. The task loop blocks on call_queue.get(). A None item is the shutdown sentinel: the worker sends back its PID so the manager knows the exit was orderly, then returns. The max_tasks parameter supports the max_tasks_per_child feature (3.11+).

# CPython: Lib/concurrent/futures/process.py:224 _process_worker
def _process_worker(call_queue, result_queue, initializer, initargs,
max_tasks=None):
if initializer is not None:
try:
initializer(*initargs)
except BaseException:
_base.LOGGER.critical('Exception in initializer:', exc_info=True)
return # parent will detect broken pool via exitcode

num_tasks = 0
exit_pid = None
while True:
call_item = call_queue.get(block=True)
if call_item is None:
result_queue.put(os.getpid()) # orderly exit signal
return
if max_tasks is not None:
num_tasks += 1
if num_tasks >= max_tasks:
exit_pid = os.getpid()
try:
r = call_item.fn(*call_item.args, **call_item.kwargs)
except BaseException as e:
exc = _ExceptionWithTraceback(e, e.__traceback__)
_sendback_result(result_queue, call_item.work_id,
exception=exc, exit_pid=exit_pid)
else:
_sendback_result(result_queue, call_item.work_id,
result=r, exit_pid=exit_pid)
del r
del call_item
if exit_pid is not None:
return

ProcessPoolExecutor.init and submit

The constructor allocates the _SafeQueue call queue, the SimpleQueue result queue, the _ThreadWakeup pipe, and the pending work items dict. The _safe_to_dynamically_spawn_children flag is False for fork-start-method pools (a safety guard for multi-threaded parents). submit() adds the work item to _pending_work_items, enqueues its ID onto _work_ids, wakes the manager thread, and optionally spawns a new process.

# CPython: Lib/concurrent/futures/process.py:784 submit
def submit(self, fn, /, *args, **kwargs):
with self._shutdown_lock:
if self._broken:
raise BrokenProcessPool(self._broken)
if self._shutdown_thread:
raise RuntimeError('cannot schedule new futures after shutdown')
if _global_shutdown:
raise RuntimeError('cannot schedule new futures after '
'interpreter shutdown')
f = _base.Future()
w = _WorkItem(f, fn, args, kwargs)
self._pending_work_items[self._queue_count] = w
self._work_ids.put(self._queue_count)
self._queue_count += 1
self._executor_manager_thread_wakeup.wakeup()
if self._safe_to_dynamically_spawn_children:
self._adjust_process_count()
self._start_executor_manager_thread()
return f

gopy notes

  • The pickling contract for _CallItem means the callable and all arguments must be picklable. gopy does not use subprocesses for this executor but if it did, the objects/pickle layer would need to support cross-process object transfer.
  • _ThreadWakeup maps naturally to a Go chan struct{} of capacity 1: a non-blocking send wakes the manager goroutine without blocking the caller.
  • The _ExecutorManagerThread becomes a goroutine in gopy. It selects over the result channel and the wakeup channel.
  • _SafeQueue._on_queue_feeder_error routes pickle errors back to the future of the failed work item. gopy avoids this complexity by using shared memory or gob encoding instead of pickle.
  • _safe_to_dynamically_spawn_children guards against spawning new processes from a forked child. gopy uses os/exec.Cmd for process launch, so the fork-safety concern does not apply directly.
  • BrokenProcessPool should be raised on all pending futures when a worker goroutine exits with an error. gopy tracks this with a single atomic error field on the executor, checked on every result delivery.