Lib/multiprocessing/__init__.py

cpython 3.14 @ ab2d84fe1023/Lib/multiprocessing/__init__.py

The multiprocessing package sidesteps the GIL by running Python code in separate OS processes. Its public API is deliberately shaped after threading so that porting single-threaded or threaded code is straightforward: Process mirrors Thread, Queue mirrors queue.Queue, and the synchronisation classes (Lock, Event, Semaphore, Condition) mirror their threading counterparts.

The package is spread across several files. The key ones are:

  • process.py: BaseProcess and Process; _bootstrap entry point.
  • pool.py: Pool, ApplyResult, MapResult, worker loop.
  • queues.py: Queue, SimpleQueue, JoinableQueue.
  • managers.py: BaseManager, SyncManager, BaseProxy.
  • sharedctypes.py: Value, Array, shared memory wrappers.
  • connection.py: Connection, Pipe, authentication.
  • context.py: BaseContext, start-method dispatch, get_context.

The start method (fork, spawn, or forkserver) is chosen per-platform or explicitly via set_start_method. Each method uses a different mechanism for bootstrapping the child: fork clones the interpreter state directly; spawn launches a fresh interpreter and re-imports the main module; forkserver asks a long-lived server process to fork on demand.

Map

Lines | Symbol | Role | gopy
process.py 1-100 | BaseProcess.__init__, _identity, authkey | Core process state: target, args, kwargs, daemon flag, authentication key derived from parent. | (stdlib pending)
process.py 100-200 | BaseProcess.start, BaseProcess._bootstrap | start() calls the context's Process subclass _popen factory; _bootstrap is the child-side entry point that calls run(). | (stdlib pending)
process.py 200-300 | BaseProcess.join, terminate, kill, is_alive, exitcode | Lifecycle queries and control; delegate to the _popen handle returned by the start-method factory. | (stdlib pending)
pool.py 1-200 | Pool.__init__, worker, _handle_tasks | Pool initialization: spawns worker processes running worker(); task and result queues wired through SimpleQueue. | (stdlib pending)
pool.py 200-400 | Pool.apply_async, map, starmap, imap, imap_unordered | Public work-submission API; apply_async submits one task and returns an AsyncResult; imap yields results lazily. | (stdlib pending)
queues.py 1-100 | Queue.__init__, put, get | Queue backed by a Pipe pair plus a feeder thread; put serialises with pickle and sends over the write end. | (stdlib pending)
queues.py 100-200 | Queue._feed, JoinableQueue, SimpleQueue | _feed is the background thread that drains the internal buffer into the pipe; JoinableQueue adds task tracking via a semaphore. | (stdlib pending)
managers.py 1-150 | BaseManager, Server, convert_to_error | BaseManager starts a server process; Server runs in that process and dispatches method calls received over authenticated Connection objects. | (stdlib pending)
managers.py 150-300 | BaseProxy, SyncManager, register | BaseProxy wraps a remote object reference; SyncManager pre-registers Lock, Event, Semaphore, Condition, Queue, dict, list, etc. | (stdlib pending)
sharedctypes.py 1-150 | Value, Array, RawValue, RawArray | Allocate ctypes objects in shared memory; Value/Array wrap them in a Lock-guarded proxy; RawValue/RawArray expose them unguarded. | (stdlib pending)

Reading

Process._bootstrap (process.py lines 100 to 200)

cpython 3.14 @ ab2d84fe1023/Lib/multiprocessing/process.py#L100-200

def _bootstrap(self, parent_sentinel=None):
    from . import util, context
    global _current_process, _parent_process, _process_counter
    try:
        if self._start_method is not None:
            context._force_start_method(self._start_method)
        _process_counter = itertools.count(1)
        _current_process = self
        _parent_process = _ParentProcess(
            self._parent_name, self._parent_pid, parent_sentinel)
        if sys.stdin is not None:
            try:
                sys.stdin.close()
                sys.stdin = open(os.devnull)
            except (OSError, ValueError):
                pass
        self._bootstrap_inner()
    except Exception:
        pass
    finally:
        util._exit_function()

_bootstrap is the first Python function that runs inside the child process after the OS-level fork or exec. It resets the _current_process global so that current_process() returns this process, and installs a _ParentProcess handle that can detect parent death via parent_sentinel (a connection whose read end closes when the parent exits). Closing sys.stdin prevents child processes from accidentally reading from the terminal. _bootstrap_inner then calls self.run(); the surrounding try/finally in _bootstrap ensures util._exit_function() runs on the way out, flushing atexit handlers and closing any resources the child inherited.

Pool worker loop (pool.py lines 1 to 200)

cpython 3.14 @ ab2d84fe1023/Lib/multiprocessing/pool.py#L1-200

def worker(inqueue, outqueue, initializer=None, initargs=(),
           maxtasks=None, wrap_exception=False):
    if initializer is not None:
        initializer(*initargs)
    completed = 0
    while maxtasks is None or (maxtasks and completed < maxtasks):
        try:
            task = inqueue.get()
        except (EOFError, OSError):
            break
        if task is None:
            break
        job, i, func, args, kwds = task
        try:
            result = (True, func(*args, **kwds))
        except Exception as e:
            if wrap_exception and func is not _helper_reraises_exception:
                e = ExceptionWithTraceback(e, e.__traceback__)
            result = (False, e)
        try:
            outqueue.put((job, i, result))
        except Exception as e:
            outqueue.put((job, i, (False, e)))
        completed += 1
    util.debug('worker exiting after %d tasks', completed)

Each pool worker runs this loop in its own process. Tasks arrive as (job_id, task_index, func, args, kwds) tuples on inqueue. A sentinel None value or an EOFError (parent closed the queue) signals shutdown. maxtasks caps the number of tasks per worker process; when the limit is reached the worker exits so a fresh process can replace it, preventing unbounded memory growth from accumulating Python objects. If func raises, the exception is wrapped in ExceptionWithTraceback (which pickles the traceback text) before being sent back on outqueue.

Queue pipe-based transport (queues.py lines 1 to 200)

cpython 3.14 @ ab2d84fe1023/Lib/multiprocessing/queues.py#L1-200

class Queue(object):

    def __init__(self, maxsize=0, *, ctx):
        self._maxsize = maxsize
        self._reader, self._writer = connection.Pipe(duplex=False)
        self._rlock = ctx.Lock()
        self._wlock = ctx.Lock() if sys.platform != 'win32' else None
        self._sem = ctx.BoundedSemaphore(maxsize) if maxsize > 0 else None
        self._buffer = collections.deque()
        self._notempty = threading.Condition(threading.Lock())
        self._start_thread()

    def put(self, obj, block=True, timeout=None):
        if self._closed:
            raise ValueError(f"Queue {self!r} is closed")
        if self._sem is not None:
            if not self._sem.acquire(block, timeout):
                raise Full
        with self._notempty:
            if self._thread is None:
                self._start_thread()
            self._buffer.append(obj)
            self._notempty.notify()

    def get(self, block=True, timeout=None):
        with self._rlock:
            if block:
                if not self._poll(timeout):
                    raise Empty
            elif not self._poll():
                raise Empty
            res = self._recv_bytes()
        return _ForkingPickler.loads(res)

Queue uses a unidirectional Pipe pair (OS pipe or socket depending on platform) as the data channel. The send side writes serialised objects from a background feeder thread (_feed) rather than directly in put, so that put can return immediately even when the pipe's OS buffer is full. The feeder thread drains self._buffer and writes each pickled object to self._writer. On the receive side, get acquires _rlock (one reader at a time) and calls the connection's recv_bytes. The _sem (a BoundedSemaphore) enforces maxsize by blocking put callers when the queue is full.

gopy mirror

multiprocessing requires OS-level fork/exec, process management, OS pipes, and ctypes shared memory, none of which exist in the current gopy runtime. The package is not ported. When gopy grows process isolation and OS-primitive support, the natural implementation strategy is to treat each Process as a separate gopy interpreter instance communicating over OS pipes, with Pool and Manager layered on top.