multiprocessing/pool.py
Lib/multiprocessing/pool.py implements the worker-pool abstraction. A Pool owns a fixed set of worker processes and several internal threads that route tasks and results without the caller needing to manage IPC directly.
Map
| Lines | Symbol | Role |
|---|---|---|
| 1–60 | module preamble | imports, RUN/CLOSE/TERMINATE constants, worker() free function |
| 61–90 | worker() | entry point run in each worker process; loops over task queue |
| 91–200 | Pool.__init__ | starts worker processes, result/task handler threads, sets up queues |
| 201–240 | Pool.apply / apply_async | submit a single callable; apply blocks on AsyncResult.get() |
| 241–320 | Pool.map / map_async | chunk iterable via _get_tasks(), submit as MapResult |
| 321–380 | Pool.imap / imap_unordered | lazy iterator variants backed by IMapIterator |
| 381–440 | Pool.starmap / starmap_async | like map but unpacks argument tuples |
| 441–530 | ApplyResult / MapResult | AsyncResult subclasses; store value or exception from worker |
| 531–620 | _handle_tasks thread | pulls from _taskqueue, writes to the task SimpleQueue |
| 621–700 | _handle_results thread | reads result queue, calls _set on the matching AsyncResult |
| 701–760 | _handle_workers thread | replenishes worker pool when a process exits unexpectedly |
| 761–840 | Pool.terminate / _terminate | sends sentinel values, joins threads, terminates processes |
| 841–900 | context manager, __del__ | __enter__/__exit__, finalizer guard |
Reading
Pool.__init__ and internal threading
The constructor wires together three threading.Thread objects and two queues before any user code runs.
```python
# CPython: Lib/multiprocessing/pool.py:202 Pool.__init__
self._taskqueue = queue.SimpleQueue()
self._cache = {}
self._state = RUN
...
self._worker_handler = threading.Thread(
    target=Pool._handle_workers,
    args=(self._cache, self._taskqueue, ...))
self._worker_handler.daemon = True
self._worker_handler.start()

self._task_handler = threading.Thread(
    target=Pool._handle_tasks,
    args=(self._taskqueue, self._quick_put, self._outqueue, self._pool, ...))
self._task_handler.daemon = True
self._task_handler.start()

self._result_handler = threading.Thread(
    target=Pool._handle_results,
    args=(self._outqueue, self._quick_get, self._cache))
self._result_handler.daemon = True
self._result_handler.start()
```
All three threads are daemon threads so they do not prevent interpreter shutdown if the pool is abandoned without being closed.
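The hand-off pattern can be reduced to a minimal, runnable sketch, with one thread standing in for the handler machinery (all names here are illustrative, not CPython's):

```python
import queue
import threading

def handler_loop(taskq, resultq):
    # Stand-in for _handle_tasks plus a worker: pull, run, push the result.
    while True:
        item = taskq.get()
        if item is None:            # sentinel: shut down
            break
        job, func, arg = item
        resultq.put((job, func(arg)))

taskq = queue.SimpleQueue()
resultq = queue.SimpleQueue()

# daemon=True mirrors Pool's handler threads: an abandoned pool does
# not keep the interpreter alive at shutdown.
t = threading.Thread(target=handler_loop, args=(taskq, resultq), daemon=True)
t.start()

taskq.put((0, lambda x: x * 2, 21))
job, value = resultq.get()
taskq.put(None)                     # clean shutdown via sentinel
t.join()
```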
apply_async() and ApplyResult
apply_async() wraps the call as a (func, args, kwds) tuple, registers an ApplyResult in self._cache, and enqueues the task.
```python
# CPython: Lib/multiprocessing/pool.py:371 Pool.apply_async
def apply_async(self, func, args=(), kwds={}, callback=None,
                error_callback=None):
    self._check_running()
    result = ApplyResult(self._cache, callback, error_callback)
    self._taskqueue.put(([(result._job, 0, func, args, kwds)], None))
    return result
```
The integer 0 is the index of the result within the job (a single apply produces exactly one). result._job is drawn from a monotonically increasing counter and serves as the key in self._cache, which is how _handle_results routes the reply back to the right ApplyResult.
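The routing mechanics can be sketched in a few lines (a simplified stand-in, not CPython's actual ApplyResult):

```python
import itertools

job_counter = itertools.count()   # module-level counter, one id per job

class MiniResult:
    """Minimal stand-in for ApplyResult: registers itself in the cache
    under a fresh job id so the result-handler side can find it later."""
    def __init__(self, cache):
        self._job = next(job_counter)
        self._cache = cache
        cache[self._job] = self
        self._value = None

    def _set(self, value):
        # Called when a reply arrives; deregisters the finished job.
        self._value = value
        del self._cache[self._job]

cache = {}
r = MiniResult(cache)
# A worker would send back (job, value); routing is a dict lookup:
job, value = r._job, "done"
cache[job]._set(value)
```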
_get_tasks() chunking for map_async()
map_async() does not send the full iterable at once. _get_tasks() slices it into chunks of chunksize so that the task queue stays bounded and workers stay busy across uneven iterables.
```python
# CPython: Lib/multiprocessing/pool.py:432 _get_tasks
def _get_tasks(func, it, size):
    it = iter(it)
    while 1:
        x = tuple(itertools.islice(it, size))
        if not x:
            return
        yield func, x
```
Each yielded (func, chunk) becomes one entry in _taskqueue. Workers unpack the chunk and call func on each element, returning a list of results that MapResult reassembles in order.
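A standalone copy of the generator makes the chunk boundaries easy to inspect:

```python
import itertools

def _get_tasks(func, it, size):
    # Same shape as the CPython generator: slice the iterable lazily.
    it = iter(it)
    while True:
        x = tuple(itertools.islice(it, size))
        if not x:
            return
        yield func, x

chunks = list(_get_tasks(str, range(7), 3))
# Chunk boundaries: (0, 1, 2), (3, 4, 5), (6,)
# A worker applies func across its chunk, preserving order:
results = [list(map(f, c)) for f, c in chunks]
flat = sum(results, [])
```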
terminate() and the sentinel shutdown protocol
Pool.terminate() signals workers by pushing one None sentinel per worker onto the task queue. When a worker receives None it breaks its loop and exits cleanly.
```python
# CPython: Lib/multiprocessing/pool.py:588 Pool._help_stuff_finish
def _help_stuff_finish(inqueue, task_handler, size):
    inqueue._rlock.acquire()
    try:
        task_handler._state = TERMINATE
        inqueue._reader.close()
        for i in range(size):
            inqueue._writer.send(None)
    finally:
        inqueue._rlock.release()
```
After the sentinels are sent, _terminate joins each handler thread (with a timeout) and then calls p.terminate() on each remaining worker process.
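The sentinel protocol itself can be exercised in isolation, with a thread playing the worker's role (names are illustrative, not the real worker()):

```python
import queue
import threading

def worker(inq, outq):
    # Same loop shape as pool.worker(): None means exit cleanly.
    while True:
        task = inq.get()
        if task is None:
            outq.put("worker exited cleanly")
            break
        outq.put(task * 2)

inq, outq = queue.SimpleQueue(), queue.SimpleQueue()
w = threading.Thread(target=worker, args=(inq, outq))
w.start()

inq.put(21)
first = outq.get()
inq.put(None)             # one sentinel per worker
w.join(timeout=5)         # mirrors _terminate joining with a timeout
last = outq.get()
```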
gopy notes
- The three internal threads map naturally to goroutines communicating over channels. `_taskqueue` and `_outqueue` become `chan` types; `_cache` becomes a `sync.Map` or a mutex-guarded `map[int]*AsyncResult`.
- `ApplyResult._job` is a global counter incremented under `_cache`-level locking. In Go this becomes an `atomic.Int64`.
- `chunksize` selection (the `_get_tasks` call from `map_async`) should be preserved verbatim: roughly `len(iterable) // (4 * processes)`, rounded up and clamped to at least 1. Changing it breaks throughput parity with CPython.
- The daemon-thread model means pool cleanup is not guaranteed on abnormal exit. A Go port must register a finalizer or `defer` equivalent to avoid goroutine leaks.
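The chunksize selection above can be written out as a small helper mirroring the divmod-and-round-up computation CPython performs in _map_async (the helper name is mine, not CPython's):

```python
def default_chunksize(n_items, n_procs):
    # Aim for roughly four chunks per worker, rounding up so
    # no items are dropped, and never going below 1.
    chunksize, extra = divmod(n_items, n_procs * 4)
    if extra:
        chunksize += 1
    return max(chunksize, 1)
```

A port that reproduces this exactly will dispatch the same number of chunks as CPython for any iterable length.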
CPython 3.14 changes
- `Pool.starmap_async` gained the same `error_callback` parameter that `map_async` already had (fixing a long-standing inconsistency).
- `_handle_workers` now uses `os.waitid` with `WNOHANG` on POSIX instead of looping over `p.exitcode`, reducing polling overhead.
- The `maxtasksperchild` path was refactored to share the same sentinel-based shutdown as `terminate()`, removing a separate `_wrap_exception` code path.