
multiprocessing.pool

cpython 3.14 @ ab2d84fe1023/Lib/multiprocessing/pool.py

A managed pool of worker processes. Pool(processes=N) starts N workers via the configured start method (default count: os.process_cpu_count() as of 3.13), each running a task loop that reads from a shared SimpleQueue. The public API mirrors the built-in map family (map, starmap, imap, imap_unordered) plus the single-call apply and apply_async. The async variants return AsyncResult objects whose .get() blocks until the result is ready. Two background threads inside the parent process handle result collection and crashed-worker repopulation.

Reading

Pool initialization and the worker loop

Pool.__init__ creates two SimpleQueue objects: _inqueue carries (job_id, index, func, args, kwargs) tuples to workers, and _outqueue carries (job_id, index, result) tuples back. It then spawns N processes, each running the module-level worker() function.

# Lib/multiprocessing/pool.py (CPython 3.14, simplified)
def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None):
    if initializer is not None:
        initializer(*initargs)
    completed = 0
    while maxtasks is None or completed < maxtasks:
        try:
            task = inqueue.get()
        except (EOFError, OSError):
            break
        if task is None:  # sentinel: clean shutdown
            break
        job, idx, func, args, kwds = task
        try:
            result = (True, func(*args, **kwds))
        except Exception as e:
            result = (False, e)
        outqueue.put((job, idx, result))
        completed += 1

The maxtasks parameter limits how many tasks a single worker process handles before it exits and is replaced, which bounds memory growth from tasks that leak.

apply_async and AsyncResult

apply_async(func, args, kwds, callback, error_callback) assigns the next job ID, wraps the call as a single-element task, puts it on _inqueue, and returns an AsyncResult. The result object owns a threading.Event and a slot for the value or exception.

class AsyncResult:
    def __init__(self, job, cache, callback, error_callback):
        self._job = job
        self._event = threading.Event()
        self._success = None
        self._value = None
        self._cache = cache            # shared dict: job_id -> AsyncResult
        self._cache[job] = self
        self._callback = callback
        self._error_callback = error_callback

    def get(self, timeout=None):
        self._event.wait(timeout)
        if not self._event.is_set():
            raise TimeoutError
        if self._success:
            return self._value
        raise self._value              # re-raise the worker exception

    def _set(self, i, obj):
        self._success, self._value = obj
        if self._callback and self._success:
            self._callback(self._value)
        if self._error_callback and not self._success:
            self._error_callback(self._value)
        self._event.set()
        del self._cache[self._job]

_handle_results runs in a background thread, reads from _outqueue, looks up the right AsyncResult by job ID in the _cache dict, and calls _set.

_repopulate_pool: restarting crashed workers

A second background thread, _handle_workers, calls _repopulate_pool whenever it detects that a worker process has exited. It checks each Process object's exitcode; any that have terminated are removed from _pool, and enough new workers are forked to bring the count back to _processes.

def _repopulate_pool(self):
    for _ in range(self._processes - len(self._pool)):
        w = self.Process(
            target=worker,
            args=(self._inqueue, self._outqueue,
                  self._initializer, self._initargs,
                  self._maxtasksperchild),
        )
        self._pool.append(w)
        w.daemon = True
        w.start()

If a task kills its worker process (segfault, os.kill, OOM), the pool silently replaces it. The AsyncResult for the in-flight task will never be resolved unless the caller sets a timeout; CPython documents this edge case but does not automatically raise an error.

gopy mirror

Not yet ported. A Go port would need goroutine-per-worker semantics or actual os.Fork/exec subprocesses, a channel pair replacing SimpleQueue, and background goroutines mirroring _handle_results and _handle_workers. The imap and imap_unordered iterators also require ordered vs. unordered result reassembly logic that maps cleanly onto buffered channels.

CPython 3.14 changes

  • Pool now accepts a context keyword that replaces the global mp.get_context() call, making it safe to construct pools with different start methods in the same program.
  • imap chunk delivery was refactored to avoid a lock contention hot-spot visible under high core counts (gh-94440; bpo numbering ended well before this issue).
  • _repopulate_pool gained a guard against re-entrancy when terminate() races with _handle_workers.