multiprocessing.pool
cpython 3.14 @ ab2d84fe1023/Lib/multiprocessing/pool.py
A managed pool of worker processes. Pool(processes=N) starts N workers (default: os.process_cpu_count() as of 3.13), each running a task loop that reads from a shared SimpleQueue. The public API offers the map family (map, starmap, imap, imap_unordered) alongside apply, plus *_async variants that return AsyncResult objects whose .get() blocks until the result is ready. Two background threads inside the parent process handle result collection and crashed-worker repopulation.
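A minimal usage sketch of the blocking path (the squaring function and pool size here are illustrative, not from the source):

```python
import multiprocessing as mp

def square(x):
    # Runs in a worker process, so it must be picklable (module-level).
    return x * x

def demo_map():
    # map blocks until every result is back, and returns them in input order.
    with mp.Pool(processes=2) as pool:
        return pool.map(square, range(5))

if __name__ == "__main__":
    print(demo_map())  # [0, 1, 4, 9, 16]
```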
Reading
Pool initialization and the worker loop
Pool.__init__ creates two SimpleQueue objects: _inqueue carries (job_id, index, func, args, kwargs) tuples to workers, and _outqueue carries (job_id, index, result) tuples back. It then spawns N processes, each running the module-level worker() function.
# Lib/multiprocessing/pool.py (CPython 3.14, simplified)
def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None):
    if initializer is not None:
        initializer(*initargs)
    completed = 0
    while maxtasks is None or completed < maxtasks:
        try:
            task = inqueue.get()
        except (EOFError, OSError):
            break
        if task is None:  # sentinel: clean shutdown
            break
        job, idx, func, args, kwds = task
        try:
            result = (True, func(*args, **kwds))
        except Exception as e:
            result = (False, e)
        outqueue.put((job, idx, result))
        completed += 1
The maxtasks parameter (surfaced to callers as Pool's maxtasksperchild argument) limits how many tasks a single worker process handles before it exits and is replaced, which bounds memory growth from tasks that leak.
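The recycling behaviour can be observed directly (pool size and task count below are illustrative): with maxtasksperchild=1 and a single worker, every task runs in a freshly forked process, so consecutive tasks report different PIDs.

```python
import os
import multiprocessing as mp

def report_pid(_):
    return os.getpid()

def demo_recycling():
    # One worker that exits after each task; the pool's background
    # _handle_workers thread starts a replacement every time.
    with mp.Pool(processes=1, maxtasksperchild=1) as pool:
        return pool.map(report_pid, range(3))

if __name__ == "__main__":
    pids = demo_recycling()
    print(len(set(pids)))  # 3: each task ran in a new process
```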
apply_async and AsyncResult
apply_async(func, args, kwds, callback, error_callback) assigns the next job ID, wraps the call as a single-element task, puts it on _inqueue, and returns an AsyncResult. The result object owns a threading.Event and a slot for the value or exception.
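A sketch of that round trip (the add function is illustrative): the callback fires in the parent's result-handler thread before the event is set, so it has always run by the time get() returns.

```python
import multiprocessing as mp

def add(a, b):
    return a + b

def demo_async():
    results = []
    with mp.Pool(processes=1) as pool:
        # The callback runs in the parent's _handle_results thread,
        # so it should be short and must not block.
        r = pool.apply_async(add, (2, 3), callback=results.append)
        value = r.get(timeout=10)  # blocks until the worker replies
    return value, results

if __name__ == "__main__":
    print(demo_async())  # (5, [5])
```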
class AsyncResult:
    def __init__(self, cache, callback, error_callback):
        self._event = threading.Event()
        self._job = next(job_counter)   # module-level itertools.count()
        self._success = None
        self._value = None
        self._cache = cache             # shared dict: job_id -> AsyncResult
        self._cache[self._job] = self
        self._callback = callback
        self._error_callback = error_callback

    def get(self, timeout=None):
        self._event.wait(timeout)
        if not self._event.is_set():
            raise TimeoutError
        if self._success:
            return self._value
        raise self._value  # re-raise the worker exception

    def _set(self, i, obj):
        self._success, self._value = obj
        if self._callback and self._success:
            self._callback(self._value)
        if self._error_callback and not self._success:
            self._error_callback(self._value)
        self._event.set()
        del self._cache[self._job]
_handle_results runs in a background thread, reads from _outqueue, looks up the right AsyncResult by job ID in the _cache dict, and calls _set.
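The (False, exception) tuple produced by the worker loop is what lets get() re-raise in the parent; a quick sketch (the failing function is illustrative):

```python
import multiprocessing as mp

def fail(x):
    raise ValueError(f"bad input: {x}")

def demo_error():
    with mp.Pool(processes=1) as pool:
        r = pool.apply_async(fail, (42,))
        try:
            r.get(timeout=10)
        except ValueError as e:
            # The worker caught the exception, shipped it back over
            # _outqueue, and get() re-raised it in the parent.
            return str(e)

if __name__ == "__main__":
    print(demo_error())  # bad input: 42
```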
_repopulate_pool: restarting crashed workers
A second background thread, _handle_workers, calls _repopulate_pool whenever it detects that a worker process has exited. It checks each Process object's exitcode; any that have terminated are removed from _pool, and enough new workers are forked to bring the count back to _processes.
def _repopulate_pool(self):
    for _ in range(self._processes - len(self._pool)):
        w = self.Process(
            target=worker,
            args=(self._inqueue, self._outqueue,
                  self._initializer, self._initargs,
                  self._maxtasksperchild),
        )
        w.daemon = True
        w.start()
        self._pool.append(w)
If a task kills its worker process (segfault, os.kill, OOM), the pool silently replaces it. The AsyncResult for the in-flight task will never be resolved unless the caller sets a timeout; CPython documents this edge case but does not automatically raise an error.
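The hazard is easy to reproduce (a sketch; os._exit stands in for a hard crash, and the timeout value is arbitrary). The replacement worker never re-runs the dead worker's in-flight task, so get() can only time out:

```python
import os
import multiprocessing as mp

def crash():
    # Simulate a hard crash: exit without ever putting a
    # result tuple on _outqueue (no cleanup, no exception).
    os._exit(1)

def demo_lost_task():
    with mp.Pool(processes=1) as pool:
        r = pool.apply_async(crash)
        try:
            r.get(timeout=5)
        except mp.TimeoutError:
            # The pool replaced the dead worker, but this
            # AsyncResult was never resolved.
            return "task lost"

if __name__ == "__main__":
    print(demo_lost_task())  # task lost
```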
gopy mirror
Not yet ported. A Go port would need goroutine-per-worker semantics or actual os.Fork/exec subprocesses, a channel pair replacing SimpleQueue, and background goroutines mirroring _handle_results and _handle_workers. The imap and imap_unordered iterators also require ordered vs. unordered result reassembly logic that maps cleanly onto buffered channels.
CPython 3.14 changes
Pool now accepts a context keyword that replaces the global mp.get_context() call, making it safe to construct pools with different start methods in the same program. imap chunk delivery was refactored to avoid a lock-contention hot spot visible under high core counts (bpo-94440). _repopulate_pool gained a guard against re-entrancy when terminate() races with _handle_workers.