Lib/concurrent/futures/process.py
cpython 3.14 @ ab2d84fe1023/Lib/concurrent/futures/process.py
The file implements ProcessPoolExecutor. Submitted callables are pickled into
_CallItem objects, shipped over a multiprocessing.Queue (wrapped as
_SafeQueue) to worker processes, and results are returned through a shared
result_queue. A dedicated _ExecutorManagerThread in the parent process reads
results, resolves futures, detects broken workers, and manages the shutdown
sequence. A _ThreadWakeup pipe lets the main thread interrupt the manager
thread without going through the result queue.
Map
| Lines | Symbol | Role |
|---|---|---|
| 59-86 | _ThreadWakeup | Pipe-based one-way wakeup channel from main thread to manager thread |
| 157-162 | _WorkItem | Holds (future, fn, args, kwargs) before pickling; never crosses process boundary |
| 164-169 | _ResultItem | Carries (work_id, exception, result, exit_pid) back from workers |
| 171-177 | _CallItem | Holds (work_id, fn, args, kwargs); pickled and sent to workers |
| 179-197 | _SafeQueue | multiprocessing.Queue subclass that routes feeder errors to the future |
| 224-274 | _process_worker | Top-level function; subprocess target; runs the initializer then the task loop |
| 250-274 | _process_worker task loop | Reads _CallItem from call queue, executes, sends _ResultItem to result queue |
| 609-619 | BrokenProcessPool | Raised on all pending futures when a worker exits unexpectedly |
| 620-725 | ProcessPoolExecutor.__init__ | Allocates queues, wakeup pipe, process map, manager thread reference |
| 784-806 | submit | Enqueues _WorkItem, wakes manager thread, optionally spawns a new process |
| 890-909 | shutdown | Sets shutdown flag, wakes manager thread, joins it, releases resources |
Reading
_ThreadWakeup: pipe-based interrupt
_ThreadWakeup wraps a multiprocessing.Pipe (duplex=False) so the main
thread can unblock the manager thread's select loop without racing on the
result queue. wakeup() writes an empty byte string; clear() drains all
pending bytes; close() is called once from the manager thread during final
shutdown.
# CPython: Lib/concurrent/futures/process.py:59 _ThreadWakeup
class _ThreadWakeup:
    def __init__(self):
        self._closed = False
        self._lock = threading.Lock()
        self._reader, self._writer = mp.Pipe(duplex=False)

    def wakeup(self):
        with self._lock:
            if not self._closed:
                self._writer.send_bytes(b"")

    def clear(self):
        if self._closed:
            raise RuntimeError('operation on closed _ThreadWakeup')
        while self._reader.poll():
            self._reader.recv_bytes()
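The interrupt pattern can be sketched with the same stdlib pieces. This is not CPython's code; the names `wait_for_wakeup` and `woken` are illustrative:

```python
# Sketch: a wakeup pipe unblocking a thread that waits on Connection
# objects, the way the manager thread's wait loop is interrupted.
import multiprocessing as mp
import multiprocessing.connection
import threading

reader, writer = mp.Pipe(duplex=False)
woken = []

def wait_for_wakeup():
    # The real manager thread passes the result queue's reader and this
    # wakeup reader to multiprocessing.connection.wait() together.
    ready = mp.connection.wait([reader], timeout=5)
    if reader in ready:
        reader.recv_bytes()      # drain, like _ThreadWakeup.clear()
        woken.append(True)

t = threading.Thread(target=wait_for_wakeup)
t.start()
writer.send_bytes(b"")           # what _ThreadWakeup.wakeup() does
t.join()
print(woken)                     # [True]
```

Because `wait()` accepts a list of connections, one empty-byte send is enough to wake the manager thread no matter which queue it was watching.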
Exchange objects: _WorkItem, _CallItem, _ResultItem
_WorkItem lives only in the parent process and is never pickled. Before
sending work to a subprocess, the manager converts it to a _CallItem (which
must be picklable). Results come back as _ResultItem, carrying the work_id
needed to look up the original _WorkItem and settle its future.
# CPython: Lib/concurrent/futures/process.py:157 _WorkItem
class _WorkItem(object):
    def __init__(self, future, fn, args, kwargs):
        self.future = future
        self.fn = fn
        self.args = args
        self.kwargs = kwargs
# CPython: Lib/concurrent/futures/process.py:164 _ResultItem
class _ResultItem(object):
    def __init__(self, work_id, exception=None, result=None, exit_pid=None):
        self.work_id = work_id
        self.exception = exception
        self.result = result
        self.exit_pid = exit_pid
# CPython: Lib/concurrent/futures/process.py:171 _CallItem
class _CallItem(object):
    def __init__(self, work_id, fn, args, kwargs):
        self.work_id = work_id
        self.fn = fn
        self.args = args
        self.kwargs = kwargs
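The round trip between the three classes can be shown directly. This sketch imports the real private classes from the module under discussion; the surrounding bookkeeping (the `pending` dict standing in for `_pending_work_items`) is simplified:

```python
# How the manager pairs a returned _ResultItem with its parent-side
# _WorkItem via work_id.
from concurrent.futures import Future
from concurrent.futures.process import _CallItem, _ResultItem, _WorkItem

pending = {}                                  # mirrors _pending_work_items
w = _WorkItem(Future(), pow, (2, 10), {})
pending[0] = w

# Parent -> worker: only the picklable pieces travel.
call = _CallItem(0, w.fn, w.args, w.kwargs)

# Worker -> parent: the result comes back tagged with the same work_id.
res = _ResultItem(call.work_id, result=call.fn(*call.args, **call.kwargs))

# Manager thread: look up the original work item and settle its future.
item = pending.pop(res.work_id)
item.future.set_result(res.result)
print(item.future.result())                   # 1024
```

The future itself never leaves the parent; only the integer `work_id` crosses the process boundary in both directions.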
_process_worker: subprocess target
_process_worker is the target= argument passed to every
multiprocessing.Process. It runs the optional initializer first. If
initialization raises, it logs and exits so the parent detects the broken pool.
The task loop blocks on call_queue.get(). A None item is the shutdown
sentinel: the worker sends back its PID so the manager knows the exit was
orderly, then returns. The max_tasks parameter supports the
max_tasks_per_child feature (3.11+).
# CPython: Lib/concurrent/futures/process.py:224 _process_worker
def _process_worker(call_queue, result_queue, initializer, initargs,
                    max_tasks=None):
    if initializer is not None:
        try:
            initializer(*initargs)
        except BaseException:
            _base.LOGGER.critical('Exception in initializer:', exc_info=True)
            return  # parent will detect broken pool via exitcode
    num_tasks = 0
    exit_pid = None
    while True:
        call_item = call_queue.get(block=True)
        if call_item is None:
            result_queue.put(os.getpid())  # orderly exit signal
            return
        if max_tasks is not None:
            num_tasks += 1
            if num_tasks >= max_tasks:
                exit_pid = os.getpid()
        try:
            r = call_item.fn(*call_item.args, **call_item.kwargs)
        except BaseException as e:
            exc = _ExceptionWithTraceback(e, e.__traceback__)
            _sendback_result(result_queue, call_item.work_id,
                             exception=exc, exit_pid=exit_pid)
        else:
            _sendback_result(result_queue, call_item.work_id,
                             result=r, exit_pid=exit_pid)
            del r
        del call_item
        if exit_pid is not None:
            return
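The sentinel protocol can be exercised in-process with plain `queue.Queue` objects. This is a deliberately simplified stand-in (`toy_worker`, tuple call items instead of `_CallItem`), not the real worker:

```python
# In-process sketch of the worker loop's shutdown-sentinel protocol.
import os
import queue

call_q, result_q = queue.Queue(), queue.Queue()

def toy_worker():
    while True:
        call_item = call_q.get(block=True)
        if call_item is None:                 # shutdown sentinel
            result_q.put(os.getpid())         # orderly-exit signal
            return
        work_id, fn, args = call_item
        result_q.put((work_id, fn(*args)))

call_q.put((0, pow, (2, 5)))
call_q.put(None)
toy_worker()                                  # run the loop inline
first, second = result_q.get(), result_q.get()
print(first)                                  # (0, 32)
print(second == os.getpid())                  # True
```

The bare PID (instead of a `_ResultItem`) is how the manager distinguishes an orderly exit from a crash: a worker that dies without draining the sentinel never puts its PID on the result queue.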
ProcessPoolExecutor.__init__ and submit
The constructor allocates the _SafeQueue call queue, the SimpleQueue
result queue, the _ThreadWakeup pipe, and the pending work items dict. The
_safe_to_dynamically_spawn_children flag is False for fork-start-method
pools (a safety guard for multi-threaded parents). submit() adds the work
item to _pending_work_items, enqueues its ID onto _work_ids, wakes the
manager thread, and optionally spawns a new process.
# CPython: Lib/concurrent/futures/process.py:784 submit
def submit(self, fn, /, *args, **kwargs):
    with self._shutdown_lock:
        if self._broken:
            raise BrokenProcessPool(self._broken)
        if self._shutdown_thread:
            raise RuntimeError('cannot schedule new futures after shutdown')
        if _global_shutdown:
            raise RuntimeError('cannot schedule new futures after '
                               'interpreter shutdown')

        f = _base.Future()
        w = _WorkItem(f, fn, args, kwargs)

        self._pending_work_items[self._queue_count] = w
        self._work_ids.put(self._queue_count)
        self._queue_count += 1
        self._executor_manager_thread_wakeup.wakeup()

        if self._safe_to_dynamically_spawn_children:
            self._adjust_process_count()
        self._start_executor_manager_thread()
        return f
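The parent-side bookkeeping is just a dict plus a FIFO of IDs. This sketch uses hypothetical stand-ins (`toy_submit`, module-level variables) for the executor's fields; no processes are involved:

```python
# Sketch of submit()'s bookkeeping: pending map, work-id FIFO, counter.
import queue
from concurrent.futures import Future

pending_work_items = {}          # mirrors self._pending_work_items
work_ids = queue.Queue()         # mirrors self._work_ids
queue_count = 0                  # mirrors self._queue_count

def toy_submit(fn, *args, **kwargs):
    global queue_count
    f = Future()
    pending_work_items[queue_count] = (f, fn, args, kwargs)
    work_ids.put(queue_count)    # manager thread drains IDs in FIFO order
    queue_count += 1
    # real code: wake the manager thread, maybe spawn a worker process
    return f

f1 = toy_submit(pow, 2, 3)
f2 = toy_submit(len, "abcd")
first_id, second_id = work_ids.get(), work_ids.get()
print(first_id, second_id)       # 0 1
print(len(pending_work_items))   # 2
```

Separating the ID queue from the pending map lets the manager thread pull work in submission order while still being able to cancel or fail any entry by key.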
gopy notes
- The pickling contract for `_CallItem` means the callable and all arguments
  must be picklable. gopy does not use subprocesses for this executor, but if
  it did, the `objects/pickle` layer would need to support cross-process
  object transfer.
- `_ThreadWakeup` maps naturally to a Go `chan struct{}` of capacity 1: a
  non-blocking send wakes the manager goroutine without blocking the caller.
- The `_ExecutorManagerThread` becomes a goroutine in gopy. It selects over
  the result channel and the wakeup channel.
- `_SafeQueue._on_queue_feeder_error` routes pickle errors back to the future
  of the failed work item. gopy avoids this complexity by using shared memory
  or gob encoding instead of pickle.
- `_safe_to_dynamically_spawn_children` guards against spawning new processes
  from a forked child. gopy uses `os/exec.Cmd` for process launch, so the
  fork-safety concern does not apply directly.
- `BrokenProcessPool` should be raised on all pending futures when a worker
  goroutine exits with an error. gopy tracks this with a single atomic error
  field on the executor, checked on every result delivery.
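The fail-all-pending-futures behavior referenced above can be sketched on the Python side. `mark_broken` is a hypothetical helper, not the module's actual code path, but it uses the real `BrokenProcessPool` class:

```python
# Sketch: when a worker dies, every pending future is failed with
# BrokenProcessPool and the pending map is emptied.
from concurrent.futures import Future
from concurrent.futures.process import BrokenProcessPool

def mark_broken(pending, cause):
    exc = BrokenProcessPool(
        "A process in the process pool was terminated abruptly: " + cause)
    for work_id, fut in list(pending.items()):
        fut.set_exception(exc)   # wakes every waiter with the same error
        del pending[work_id]

f0, f1 = Future(), Future()
pending = {0: f0, 1: f1}
mark_broken(pending, "exitcode 1")
print(type(f0.exception()).__name__)   # BrokenProcessPool
print(pending)                          # {}
```

Sharing one exception instance across all futures matches the Python behavior: every caller blocked in `result()` sees the same `BrokenProcessPool`.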