Lib/multiprocessing/ (part 3)
Source:
cpython 3.14 @ ab2d84fe1023/Lib/multiprocessing/pool.py
This annotation covers the Pool task distribution. See modules_multiprocessing2_detail for Pool.__init__, Process, and the SimpleQueue / Queue primitives.
Map
| Lines | Symbol | Role |
|---|---|---|
| 1-80 | Pool.map | Blocking map: split iterable, dispatch, collect results |
| 81-180 | Pool.map_async | Non-blocking variant returning AsyncResult |
| 181-280 | Pool.starmap | Like map but unpacks arguments (*args) |
| 281-380 | worker function | Per-process function: read task queue, call function, write result |
| 381-600 | AsyncResult / MapResult | Future-like objects for async pool operations |
Reading
Pool.map
```python
# CPython: Lib/multiprocessing/pool.py:366 Pool.map
def map(self, func, iterable, chunksize=None):
    """Apply func to each element, collecting results in a list."""
    return self._map_async(func, iterable, mapstar, chunksize).get()
```
Pool.map is synchronous: it blocks until all results have been collected. Internally it calls _map_async and immediately calls .get() on the returned MapResult. chunksize controls how many items are sent to each worker per task; larger chunks reduce IPC overhead for cheap functions.
Pool._map_async
```python
# CPython: Lib/multiprocessing/pool.py:380 Pool._map_async
def _map_async(self, func, iterable, mapper, chunksize=None, callback=None,
               error_callback=None):
    if not self._state == RUN:
        raise ValueError("Pool not running")
    if not hasattr(iterable, '__len__'):
        iterable = list(iterable)
    if chunksize is None:
        chunksize, extra = divmod(len(iterable), len(self._pool) * 4)
        if extra:
            chunksize += 1
    task_batches = Pool._get_tasks(func, iterable, chunksize)
    result = MapResult(self._cache, chunksize, len(iterable), callback,
                       error_callback=error_callback)
    self._taskqueue.put((((result._job, i, mapper, (x,), {})
                          for i, x in enumerate(task_batches)), None))
    return result
```
The chunksize heuristic targets roughly 4 chunks per worker. Tasks are batched to amortize pickling and IPC overhead. mapper is mapstar for regular map (applies func to each item in a batch) or starmapstar for starmap (additionally unpacks each item as *args).
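The heuristic can be restated as a standalone sketch (hypothetical helper name):

```python
def default_chunksize(n_items, n_workers):
    # Mirror of the divmod heuristic in Pool._map_async:
    # aim for roughly 4 chunks per worker, rounding up on a remainder.
    chunksize, extra = divmod(n_items, n_workers * 4)
    if extra:
        chunksize += 1
    return chunksize
```

With 100 items and 4 workers this gives divmod(100, 16) = (6, 4), so chunksize 7: about 15 batches instead of 100 single-item tasks.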
worker function
```python
# CPython: Lib/multiprocessing/pool.py:124 worker
def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None,
           wrap_exception=False):
    """Worker process loop."""
    if initializer is not None:
        initializer(*initargs)
    completed = 0
    while maxtasks is None or (maxtasks and completed < maxtasks):
        try:
            task = inqueue.get()
        except (EOFError, OSError):
            break
        if task is None:
            break
        job, i, func, args, kwds = task
        try:
            result = (True, func(*args, **kwds))
        except Exception as e:
            if wrap_exception and func is not _helper_reraises_exception:
                e = ExceptionWithTraceback(e, e.__traceback__)
            result = (False, e)
        try:
            outqueue.put((job, i, result))
        except Exception as e:
            ...
        completed += 1
```
completed += 1
The worker loop is simple: get a task, call the function, put the result. maxtasks (exposed to users as Pool's maxtasksperchild argument) limits how many tasks a worker handles before it exits and is replaced, which bounds memory growth in long-lived workers. wrap_exception wraps the exception together with its traceback so it can be pickled and delivered to the parent process.
AsyncResult
```python
# CPython: Lib/multiprocessing/pool.py:760 AsyncResult
class AsyncResult:
    def __init__(self, cache, callback, error_callback):
        self._event = threading.Event()
        self._job = next(job_counter)
        self._cache = cache
        self._callback = callback
        self._error_callback = error_callback
        cache[self._job] = self

    def get(self, timeout=None):
        self._event.wait(timeout)
        if not self._event.is_set():
            raise TimeoutError
        if self._success:
            return self._value
        raise self._value  # exception

    def _set(self, i, obj):
        self._success, self._value = obj
        self._event.set()
        del self._cache[self._job]
        if self._callback and self._success:
            self._callback(self._value)
```
AsyncResult is a future backed by a threading.Event. The result-handling thread calls _set when the worker's output arrives; get() blocks on the event. The cache dict keeps the result alive until retrieved.
gopy notes
Pool.map is module/multiprocessing.PoolMap in module/multiprocessing/module.go. Workers are Go goroutines communicating via channels instead of OS processes. AsyncResult is module/multiprocessing.AsyncResult backed by a Go channel. worker serializes tasks using encoding/gob or objects.Marshal.