Lib/multiprocessing/ (part 3)

Source:

cpython 3.14 @ ab2d84fe1023/Lib/multiprocessing/pool.py

This annotation covers the Pool task distribution. See modules_multiprocessing2_detail for Pool.__init__, Process, and the SimpleQueue / Queue primitives.

Map

Lines     Symbol                    Role
1-80      Pool.map                  Blocking map: split iterable, dispatch, collect results
81-180    Pool.map_async            Non-blocking variant returning AsyncResult
181-280   Pool.starmap              Like map but unpacks arguments (*args)
281-380   worker function           Per-process function: read task queue, call function, write result
381-600   AsyncResult / MapResult   Future-like objects for async pool operations

Reading

Pool.map

# CPython: Lib/multiprocessing/pool.py:366 Pool.map
def map(self, func, iterable, chunksize=None):
    """Apply func to each element, collecting results in a list."""
    return self._map_async(func, iterable, mapstar, chunksize).get()

Pool.map is synchronous: it blocks until every result is available. Internally it calls _map_async and immediately calls .get() on the returned MapResult. chunksize sets how many items are bundled into each task sent to a worker; larger chunks reduce IPC overhead when func is cheap.
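A minimal usage sketch of the blocking call and the chunksize parameter. It assumes multiprocessing.pool.ThreadPool, which exposes the same map interface as Pool, so the example runs without spawning processes; square and squares_with_pool are hypothetical names introduced only for illustration.

```python
from multiprocessing.pool import ThreadPool


def square(x):
    """Cheap function, the kind that benefits from larger chunks."""
    return x * x


def squares_with_pool(values, workers=4, chunksize=None):
    """Run a blocking map and return the ordered list of results.

    ThreadPool is used here only to keep the sketch portable; a
    process-based Pool takes the same arguments.
    """
    with ThreadPool(workers) as pool:
        # map() returns only after every result has arrived.
        return pool.map(square, values, chunksize=chunksize)
```

Results come back in input order regardless of chunksize, so the parameter affects throughput only, not the output.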

Pool._map_async

# CPython: Lib/multiprocessing/pool.py:380 Pool._map_async
def _map_async(self, func, iterable, mapper, chunksize=None, callback=None,
               error_callback=None):
    if self._state != RUN:
        raise ValueError("Pool not running")
    if not hasattr(iterable, '__len__'):
        iterable = list(iterable)
    if chunksize is None:
        chunksize, extra = divmod(len(iterable), len(self._pool) * 4)
        if extra:
            chunksize += 1
    task_batches = Pool._get_tasks(func, iterable, chunksize)
    result = MapResult(self._cache, chunksize, len(iterable), callback,
                       error_callback=error_callback)
    self._taskqueue.put((((result._job, i, mapper, (x,), {})
                          for i, x in enumerate(task_batches)), None))
    return result

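When chunksize is not given, the heuristic targets roughly four chunks per worker: the item count is divided by len(self._pool) * 4 and rounded up. Tasks are batched to amortize pickling and IPC overhead. mapper is mapstar for map, as the Pool.map call above shows, and starmapstar for starmap, which unpacks each item as positional arguments.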
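The heuristic and the batching step can be sketched in isolation as follows. default_chunksize and get_tasks are hypothetical stand-ins written for this note (get_tasks plays the role of Pool._get_tasks); mapstar and starmapstar are written out here as an assumption about how the two mappers consume a (func, chunk) pair.

```python
import itertools


def default_chunksize(n_items, n_workers):
    """About four chunks per worker, rounded up (same divmod as above)."""
    chunksize, extra = divmod(n_items, n_workers * 4)
    if extra:
        chunksize += 1
    return chunksize


def get_tasks(func, iterable, size):
    """Yield (func, chunk) pairs, each chunk holding at most `size` items."""
    it = iter(iterable)
    while True:
        chunk = tuple(itertools.islice(it, size))
        if not chunk:
            return
        yield (func, chunk)


def mapstar(args):
    """args is one (func, chunk) pair: apply func to each item."""
    return list(map(*args))


def starmapstar(args):
    """Like mapstar, but each item is unpacked as positional arguments."""
    return list(itertools.starmap(args[0], args[1]))
```

With 100 items and 4 workers, divmod(100, 16) gives (6, 4), so the chunk size is 7 and the iterable is split into 15 batches, the last holding 2 items.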

worker function

# CPython: Lib/multiprocessing/pool.py:124 worker
def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None,
           wrap_exception=False):
    """Worker process loop."""
    if initializer is not None:
        initializer(*initargs)
    completed = 0
    while maxtasks is None or (maxtasks and completed < maxtasks):
        try:
            task = inqueue.get()
        except (EOFError, OSError):
            break
        if task is None:
            break
        job, i, func, args, kwds = task
        try:
            result = (True, func(*args, **kwds))
        except Exception as e:
            if wrap_exception and func is not _helper_reraises_exception:
                e = ExceptionWithTraceback(e, e.__traceback__)
            result = (False, e)
        try:
            outqueue.put((job, i, result))
        except Exception as e:
            ...
        completed += 1

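The worker loop is simple: get a task, call the function, put the result. A None task is the shutdown sentinel, and an EOFError or OSError on the input queue also ends the loop. maxtasks caps how many tasks a worker handles before it exits and is replaced, which bounds memory growth in long-running workers that leak across many tasks. With wrap_exception set, the exception is wrapped in ExceptionWithTraceback so that the traceback can be delivered to the parent process along with it.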
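The task and result tuples are easier to see in a single-process sketch. The version below assumes plain queue.Queue objects in place of the pool's pipe-backed queues and drops the initializer and traceback wrapping; run_worker is a hypothetical name, not the CPython function.

```python
import queue


def run_worker(inqueue, outqueue, maxtasks=None):
    """Drain tasks until a None sentinel or until maxtasks are completed.

    Each task is (job, i, func, args, kwds); each result is
    (job, i, (success, value_or_exception)).
    """
    completed = 0
    while maxtasks is None or completed < maxtasks:
        task = inqueue.get()
        if task is None:  # shutdown sentinel
            break
        job, i, func, args, kwds = task
        try:
            result = (True, func(*args, **kwds))
        except Exception as exc:
            # Failures travel back as data instead of killing the worker.
            result = (False, exc)
        outqueue.put((job, i, result))
        completed += 1
    return completed


def make_queue(items):
    """Helper for the sketch: build a queue preloaded with `items`."""
    q = queue.Queue()
    for item in items:
        q.put(item)
    return q
```

Because a failing task is reported as (False, exception), one bad item does not stop the worker from processing the rest of the queue.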

AsyncResult

# CPython: Lib/multiprocessing/pool.py:760 AsyncResult
class AsyncResult:
    def __init__(self, cache, callback, error_callback):
        self._event = threading.Event()
        self._job = next(job_counter)
        self._cache = cache
        self._callback = callback
        self._error_callback = error_callback
        cache[self._job] = self

    def get(self, timeout=None):
        self._event.wait(timeout)
        if not self._event.is_set():  # timed out before _set() ran
            raise TimeoutError
        if self._success:
            return self._value
        raise self._value  # the exception raised in the worker

    def _set(self, i, obj):
        self._success, self._value = obj
        self._event.set()
        del self._cache[self._job]
        if self._callback and self._success:
            self._callback(self._value)
        if self._error_callback and not self._success:
            self._error_callback(self._value)

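AsyncResult is a future backed by a threading.Event. The result-handling thread calls _set when the worker's output arrives; get() blocks on the event until then and raises TimeoutError if the timeout expires first. The cache dict maps job IDs to pending results so the handler thread can find the right object; the entry is deleted in _set once the result arrives.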
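The event-backed future pattern can be exercised on its own. The sketch below is a simplified stand-in, not the CPython class: MiniResult is a hypothetical name, the job ID is passed in rather than drawn from a counter, and only the success callback is modeled.

```python
import threading


class MiniResult:
    """Future-like object: get() waits on an Event that _set() fires."""

    def __init__(self, cache, job, callback=None):
        self._event = threading.Event()
        self._cache = cache
        self._job = job
        self._callback = callback
        cache[job] = self  # lets a handler thread look the result up by job ID

    def get(self, timeout=None):
        # Event.wait returns False when the timeout expires first.
        if not self._event.wait(timeout):
            raise TimeoutError
        if self._success:
            return self._value
        raise self._value  # re-raise the stored exception

    def _set(self, obj):
        self._success, self._value = obj
        self._event.set()
        del self._cache[self._job]  # the job is no longer pending
        if self._callback and self._success:
            self._callback(self._value)
```

In a real pool, _set would be called from the result-handling thread; calling it directly, or from a threading.Timer, is enough to observe both the blocking and the timeout paths.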

gopy notes

Pool.map is module/multiprocessing.PoolMap in module/multiprocessing/module.go. Workers are Go goroutines communicating via channels instead of OS processes. AsyncResult is module/multiprocessing.AsyncResult backed by a Go channel. worker serializes tasks using encoding/gob or objects.Marshal.