multiprocessing/pool.py

Lib/multiprocessing/pool.py implements the worker-pool abstraction. A Pool owns a fixed set of worker processes and several internal threads that route tasks and results without the caller needing to manage IPC directly.

Map

Lines     Symbol                          Role
1–60      module preamble                 imports, RUN/CLOSE/TERMINATE constants, worker() free function
61–90     worker()                        entry point run in each worker process; loops over task queue
91–200    Pool.__init__                   starts worker processes, result/task handler threads, sets up queues
201–240   Pool.apply / apply_async        submit a single callable; apply blocks on AsyncResult.get()
241–320   Pool.map / map_async            chunk iterable via _get_tasks(), submit as MapResult
321–380   Pool.imap / imap_unordered      lazy iterator variants backed by IMapIterator
381–440   Pool.starmap / starmap_async    like map but unpacks argument tuples
441–530   ApplyResult / MapResult         AsyncResult subclasses; store value or exception from worker
531–620   _handle_tasks thread            pulls from _taskqueue, writes to the task SimpleQueue
621–700   _handle_results thread          reads result queue, calls _set on the matching AsyncResult
701–760   _handle_workers thread          replenishes worker pool when a process exits unexpectedly
761–840   Pool.terminate / _terminate     sends sentinel values, joins threads, terminates processes
841–900   context manager, __del__        __enter__/__exit__, finalizer guard

Reading

Pool.__init__ and internal threading

The constructor wires together three threading.Thread objects and two queues before any user code runs.

# CPython: Lib/multiprocessing/pool.py:202 Pool.__init__
self._taskqueue = queue.SimpleQueue()
self._cache = {}
self._state = RUN
...
self._worker_handler = threading.Thread(
    target=Pool._handle_workers,
    args=(self._cache, self._taskqueue, ...))
self._worker_handler.daemon = True
self._worker_handler.start()
self._task_handler = threading.Thread(
    target=Pool._handle_tasks,
    args=(self._taskqueue, self._quick_put, self._outqueue, self._pool, ...))
self._task_handler.daemon = True
self._task_handler.start()
self._result_handler = threading.Thread(
    target=Pool._handle_results,
    args=(self._outqueue, self._quick_get, self._cache))
self._result_handler.daemon = True
self._result_handler.start()

All three threads are daemon threads so they do not prevent interpreter shutdown if the pool is abandoned without being closed.
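The division of labor between the handler threads can be sketched in miniature. This is not CPython's code: it collapses the worker processes into an inline computation and keeps only the two-queue, two-handler shape, so the routing pattern is visible without any IPC.

```python
# Minimal sketch (not pool.py itself): a task handler feeds work through
# one queue, a result handler drains replies from another.
import queue
import threading

taskq = queue.SimpleQueue()
resultq = queue.SimpleQueue()
results = []

def task_handler():
    # Mirrors _handle_tasks: pull submitted tasks and forward them.
    # Here we run the callable inline instead of sending it to a worker.
    while True:
        task = taskq.get()
        if task is None:            # sentinel: propagate shutdown
            resultq.put(None)
            return
        func, arg = task
        resultq.put(func(arg))

def result_handler():
    # Mirrors _handle_results: drain the result queue until the sentinel.
    while True:
        r = resultq.get()
        if r is None:
            return
        results.append(r)

threads = [threading.Thread(target=task_handler, daemon=True),
           threading.Thread(target=result_handler, daemon=True)]
for t in threads:
    t.start()
for x in (1, 2, 3):
    taskq.put((lambda v: v * v, x))
taskq.put(None)                     # sentinel shuts both handlers down
for t in threads:
    t.join()
print(results)                      # [1, 4, 9]
```

The real pool inserts the worker processes between the two queues; the sentinel-propagation idea is the same.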

apply_async() and ApplyResult

apply_async() wraps the call as a (job, index, func, args, kwds) task, registers an ApplyResult in self._cache, and enqueues the task.

# CPython: Lib/multiprocessing/pool.py:371 Pool.apply_async
def apply_async(self, func, args=(), kwds={}, callback=None, error_callback=None):
    self._check_running()
    result = ApplyResult(self._cache, callback, error_callback)
    self._taskqueue.put(([(result._job, 0, func, args, kwds)], None))
    return result

The integer 0 is the chunk index. result._job comes from a module-level itertools.count() and serves as the key in self._cache, so _handle_results can route the reply back to the right AsyncResult.
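The routing idea can be shown in isolation. MiniResult below is a hypothetical, stripped-down stand-in for ApplyResult, keeping only the job-counter registration and the _set() delivery that _handle_results performs:

```python
# Hedged sketch of the _job routing mechanism; MiniResult is a simplified
# stand-in for ApplyResult, not the actual class.
import itertools

job_counter = itertools.count()     # pool.py uses this exact pattern

class MiniResult:
    def __init__(self, cache):
        self._job = next(job_counter)
        cache[self._job] = self     # register so the handler can find us
        self._value = None
        self._ready = False

    def _set(self, value, cache):
        # What _handle_results does when the reply for self._job arrives.
        self._value = value
        self._ready = True
        del cache[self._job]        # routing entry no longer needed

cache = {}
r1, r2 = MiniResult(cache), MiniResult(cache)
assert r2._job == r1._job + 1       # monotonically increasing job ids
r1._set("done", cache)
assert r1._ready and r1._job not in cache
```

Deleting the cache entry on delivery is what lets _handle_workers use an empty cache as one of its "safe to shut down" signals.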

_get_tasks() chunking for map_async()

map_async() does not send the full iterable at once. _get_tasks() slices it into chunks of chunksize so that the task queue stays bounded and workers stay busy across uneven iterables.

# CPython: Lib/multiprocessing/pool.py:432 _get_tasks
def _get_tasks(func, it, size):
    it = iter(it)
    while 1:
        x = tuple(itertools.islice(it, size))
        if not x:
            return
        yield func, x

Each yielded (func, chunk) becomes one entry in _taskqueue. Workers unpack the chunk and call func on each element, returning a list of results that MapResult reassembles in order.
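The chunking behaviour is easy to observe directly. The snippet redefines _get_tasks verbatim from the excerpt above so it runs standalone:

```python
# _get_tasks reproduced from the excerpt above, then exercised on a
# 7-element iterable with chunksize 3.
import itertools

def _get_tasks(func, it, size):
    it = iter(it)
    while 1:
        x = tuple(itertools.islice(it, size))
        if not x:
            return
        yield func, x

chunks = list(_get_tasks(abs, range(-7, 0), 3))
# Two full chunks of 3 and a trailing chunk of 1: the generator never
# pads, so the last chunk simply carries the remainder.
print(chunks)
```

Each tuple in `chunks` corresponds to one _taskqueue entry, which is why a too-small chunksize inflates queue traffic while a too-large one starves idle workers.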

terminate() and the sentinel shutdown protocol

Pool.terminate() signals workers by pushing one None sentinel per worker onto the task queue. When a worker receives None it breaks its loop and exits cleanly.

# CPython: Lib/multiprocessing/pool.py:588 Pool._help_stuff_finish
def _help_stuff_finish(inqueue, task_handler, size):
    inqueue._rlock.acquire()
    try:
        task_handler._state = TERMINATE
        inqueue._reader.close()
        for i in range(size):
            inqueue._writer.send(None)
    finally:
        inqueue._rlock.release()

After the sentinels are sent, _terminate joins each handler thread (with a timeout) and then calls p.terminate() on each remaining worker process.
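The one-sentinel-per-worker invariant is worth seeing in a runnable form. This sketch uses threads in place of processes (assumption: the protocol, not the IPC, is the point); each worker consumes exactly one None and exits, so `size` sentinels drain a pool of `size` workers:

```python
# Hedged sketch of the sentinel shutdown protocol with threads standing
# in for worker processes.
import queue
import threading

def worker(inq, outq):
    while True:
        task = inq.get()
        if task is None:            # sentinel received: exit cleanly
            outq.put("exited")
            return
        outq.put(task * 2)

inq, outq = queue.SimpleQueue(), queue.SimpleQueue()
size = 3
workers = [threading.Thread(target=worker, args=(inq, outq))
           for _ in range(size)]
for w in workers:
    w.start()

inq.put(21)                         # one real task still in flight
for _ in range(size):               # one None per worker, as in terminate()
    inq.put(None)
for w in workers:
    w.join()                        # every worker saw exactly one sentinel

drained = [outq.get() for _ in range(size + 1)]
assert drained.count("exited") == size and 42 in drained
```

Because a worker stops reading after its first None, no sentinel can be consumed twice, which is what makes "push size sentinels, then join" a complete shutdown handshake.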

gopy notes

  • The three internal threads map naturally to goroutines communicating over channels. The _taskqueue and _outqueue become chan types; _cache becomes a sync.Map or a mutex-guarded map[int]*AsyncResult.
  • ApplyResult._job comes from a module-level itertools.count(); CPython relies on the GIL rather than an explicit lock to keep it consistent. In Go this is an atomic.Int64.
  • chunksize selection (the _get_tasks call from map_async) should be preserved exactly: CPython computes chunksize, extra = divmod(len(iterable), 4 * processes) and adds 1 when extra is nonzero. Changing it breaks throughput parity with CPython.
  • The daemon-thread model means pool cleanup is not guaranteed on abnormal exit. A Go port must register a finalizer or defer equivalent to avoid goroutine leaks.
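The chunksize heuristic mentioned above is small enough to reproduce whole. The function name default_chunksize is hypothetical; the arithmetic mirrors what Pool._map_async does inline, rounding up via divmod rather than flooring:

```python
# Reproduction of the chunksize arithmetic from Pool._map_async;
# default_chunksize is an illustrative name, not a pool.py function.
def default_chunksize(n_items: int, n_workers: int) -> int:
    chunksize, extra = divmod(n_items, n_workers * 4)
    if extra:
        chunksize += 1
    return chunksize                # 0 only when the iterable is empty

print(default_chunksize(100, 4))    # 7: ceil(100 / 16)
print(default_chunksize(10, 4))     # 1: fewer items than 4 * workers
```

The "4 *" factor targets roughly four chunks per worker, which smooths out uneven per-item costs without flooding the task queue.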

CPython 3.14 changes

  • Pool.starmap_async gained the same error_callback parameter that map_async already had, closing a long-standing inconsistency.
  • _handle_workers now uses os.waitid with WNOHANG on POSIX instead of looping over p.exitcode, reducing polling overhead.
  • The maxtasksperchild path was refactored to share the same sentinel-based shutdown as terminate(), removing a separate _wrap_exception code path.