
Modules/_multiprocessing/ (part 2)

Source:

cpython 3.14 @ ab2d84fe1023/Lib/multiprocessing/queues.py

This annotation covers inter-process communication and process management. See modules_multiprocessing_detail for Process.__init__, BaseManager, SyncManager, and shared memory.

Map

Lines     Symbol                  Role
1-100     Queue.put / Queue.get   Send and receive objects between processes via a pipe
101-220   Pipe                    Create a pair of Connection objects over a socket or OS pipe
221-360   Pool.map                Distribute work across a pool of worker processes
361-500   Process.start           Fork or spawn a new process
501-700   Spawn context           Pickle-based process bootstrap for the spawn start method

Reading

Queue.put

# CPython: Lib/multiprocessing/queues.py:88 Queue.put
def put(self, obj, block=True, timeout=None):
    """Put obj into the queue. Pickles obj and sends via the feeder thread."""
    if self._closed:
        raise ValueError("Queue is closed")
    if not self._sem.acquire(block, timeout):
        raise Full
    with self._notempty:
        if self._thread is None:
            self._start_thread()
        self._buffer.append(obj)
        self._notempty.notify()

Queue.put appends to an in-process buffer; a background feeder thread pickles each object and writes it to the underlying Pipe, so the caller never blocks on I/O. The semaphore bounds the number of items in flight: Queue() is effectively unbounded, while Queue(maxsize) makes put block (or raise Full when block=False) once maxsize items are pending.
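
A minimal round trip using the public API (consumer is an illustrative function; everything else is the standard multiprocessing and queue interface):

import multiprocessing as mp
from queue import Full

def consumer(q):
    print(q.get())                    # blocks until the feeder thread has sent an item

if __name__ == '__main__':
    q = mp.Queue(maxsize=1)           # semaphore starts at 1
    q.put({'msg': 'hello'})           # acquires the semaphore, buffers, returns immediately
    try:
        q.put('extra', block=False)   # semaphore exhausted -> Full
    except Full:
        print('queue full, as expected')
    p = mp.Process(target=consumer, args=(q,))
    p.start()
    p.join()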

Pool.map

# CPython: Lib/multiprocessing/pool.py:367 Pool.map
def map(self, func, iterable, chunksize=None):
    """Block until all results are ready. Returns a list."""
    return self._map_async(func, iterable, mapstar, chunksize).get()

def _map_async(self, func, iterable, mapper, chunksize=None, callback=None):
    """Submit tasks in chunks to the worker queue."""
    task_batches = Pool._get_tasks(func, iterable, chunksize)
    result = MapResult(self._cache, chunksize, len(iterable), callback)
    self._taskqueue.put((
        # each task: (job_id, batch_index, mapper, args, kwargs)
        (result._job, i, mapper, (x,), {})
        for i, x in enumerate(task_batches)
    ))
    return result

chunksize controls how many items are sent to each worker at once. A large chunksize reduces IPC overhead; a small chunksize gives finer load balancing. The default chunksize is divmod(len(iterable), 4 * pool_size), plus one if there is a remainder, i.e. roughly len(iterable) / (4 * pool_size) rounded up.
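
A sketch of the trade-off with the public Pool API (square is an illustrative worker; the arithmetic in the comments mirrors the divmod above):

import multiprocessing as mp

def square(x):
    return x * x

if __name__ == '__main__':
    with mp.Pool(processes=4) as pool:
        # default: divmod(100, 4 * 4) -> chunksize 7, so ~15 task messages
        print(pool.map(square, range(100))[:5])               # [0, 1, 4, 9, 16]
        # chunksize=1: 100 task messages, finest load balancing, most IPC
        print(pool.map(square, range(100), chunksize=1)[:5])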

Process.start — spawn context

# CPython: Lib/multiprocessing/process.py:121 Process.start
def start(self):
    """Start the child process."""
    self._check_closed()
    self._popen = self._Popen(self)
    self._sentinel = self._popen.sentinel
    ...

# CPython: Lib/multiprocessing/context.py:220 SpawnProcess._Popen
class SpawnProcess(BaseProcess):
    _start_method = 'spawn'

    @staticmethod
    def _Popen(process_obj):
        from .popen_spawn_posix import Popen
        return Popen(process_obj)

The spawn context launches a fresh Python interpreter and sends the Process object to it via pickle; the child calls _bootstrap, which unpickles the target function and arguments and runs them. fork is faster (the child shares the parent's memory copy-on-write, so nothing is re-imported) but is unsafe in a process that has running threads. macOS and Windows default to spawn; as of 3.14, Linux defaults to forkserver.
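
Pinning the start method explicitly with get_context (standard API; work is an illustrative target, which must live at module top level so the spawned child can unpickle it by reference):

import multiprocessing as mp

def work(name):
    print('hello from', name)

if __name__ == '__main__':            # required: the spawned child re-imports this module
    ctx = mp.get_context('spawn')     # fresh interpreter, regardless of platform default
    p = ctx.Process(target=work, args=('demo',))
    p.start()
    p.join()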

Bootstrap sequence

# CPython: Lib/multiprocessing/process.py:314 _bootstrap
def _bootstrap(self, parent_sentinel=None):
    """Called in the child process. Runs self.run() and exits."""
    import threading
    threading._dangling.clear()
    ...
    try:
        self.run()
        exitcode = 0
    except Exception:
        exitcode = 1
    finally:
        ...
    sys.exit(exitcode)

_bootstrap sets up logging, clears thread state left over from the parent (in fork mode), then calls self.run(). The exitcode from _bootstrap becomes the OS exit code, visible as Process.exitcode in the parent.
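
Observing this from the parent (fail is an illustrative target; a negative exitcode means the child died from that signal number):

import multiprocessing as mp
import sys, time

def fail():
    sys.exit(3)                  # SystemExit is caught by _bootstrap; 3 becomes the exit code

if __name__ == '__main__':
    p = mp.Process(target=fail)
    p.start()
    p.join()
    print(p.exitcode)            # 3

    q = mp.Process(target=time.sleep, args=(30,))
    q.start()
    q.terminate()                # sends SIGTERM on POSIX
    q.join()
    print(q.exitcode)            # -15 on POSIX (killed by SIGTERM)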

gopy notes

multiprocessing is partially stubbed in gopy. Queue lives in module/multiprocessing/queue.go as module/multiprocessing.Queue, backed by Go channels. Process.start spawns a goroutine rather than a real OS process, which is enough for the in-process test harness, and Pool.map runs on a pool of worker goroutines. Full subprocess support requires CGo or os/exec.