Lib/multiprocessing/__init__.py
cpython 3.14 @ ab2d84fe1023/Lib/multiprocessing/__init__.py
The multiprocessing package sidesteps the GIL by running Python code in
separate OS processes. Its public API is deliberately shaped after
threading so that porting single-threaded or threaded code is
straightforward: Process mirrors Thread, Queue mirrors
queue.Queue, and the synchronisation classes (Lock, Event,
Semaphore, Condition) mirror their threading counterparts.
The package is spread across several files. The key ones are:
- process.py: BaseProcess and Process; _bootstrap entry point.
- pool.py: Pool, ApplyResult, MapResult, worker loop.
- queues.py: Queue, SimpleQueue, JoinableQueue.
- managers.py: BaseManager, SyncManager, BaseProxy.
- sharedctypes.py: Value, Array, shared memory wrappers.
- connection.py: Connection, Pipe, authentication.
- context.py: BaseContext, start-method dispatch, get_context.
The start method (fork, spawn, or forkserver) is chosen per-platform
or explicitly via set_start_method. Each method uses a different
mechanism for bootstrapping the child: fork clones the interpreter
state directly; spawn launches a fresh interpreter and re-imports the
main module; forkserver asks a long-lived server process to fork on
demand.
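As a quick illustration (not part of the module source), a minimal sketch of picking a start method explicitly via get_context; the "fork" method used here is POSIX-only, so substitute "spawn" on Windows:

```python
import multiprocessing as mp

def square(x, out):
    # Runs in the child process; the result travels back over the queue.
    out.put(x * x)

def run_child(start_method="fork"):
    # get_context binds one start method to this context without
    # changing the process-global default set by set_start_method.
    ctx = mp.get_context(start_method)
    q = ctx.Queue()
    p = ctx.Process(target=square, args=(7, q))
    p.start()
    result = q.get()   # drain before join to avoid feeder-thread deadlock
    p.join()
    return result
```

Because get_context returns an independent context object, libraries can choose a start method without interfering with the application's global choice.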
Map
| Lines | Symbol | Role | gopy |
|---|---|---|---|
| process.py 1-100 | BaseProcess.__init__, _identity, authkey | Core process state: target, args, kwargs, daemon flag, authentication key derived from parent. | (stdlib pending) |
| process.py 100-200 | BaseProcess.start, BaseProcess._bootstrap | start() calls the context's Process subclass _popen factory; _bootstrap is the child-side entry point that calls run(). | (stdlib pending) |
| process.py 200-300 | BaseProcess.join, terminate, kill, is_alive, exitcode | Lifecycle queries and control; delegate to the _popen handle returned by the start-method factory. | (stdlib pending) |
| pool.py 1-200 | Pool.__init__, worker, _handle_tasks | Pool initialization: spawns worker processes running worker(); task and result queues wired through SimpleQueue. | (stdlib pending) |
| pool.py 200-400 | Pool.apply_async, map, starmap, imap, imap_unordered | Public work-submission API; apply_async submits one task and returns an AsyncResult; imap yields results lazily. | (stdlib pending) |
| queues.py 1-100 | Queue.__init__, put, get | Queue backed by a Pipe pair plus a feeder thread; put serialises with pickle and sends over the write end. | (stdlib pending) |
| queues.py 100-200 | Queue._feed, JoinableQueue, SimpleQueue | _feed is the background thread that drains the internal buffer into the pipe; JoinableQueue adds task tracking via a semaphore. | (stdlib pending) |
| managers.py 1-150 | BaseManager, Server, convert_to_error | BaseManager starts a server process; Server runs in that process and dispatches method calls received over authenticated Connection objects. | (stdlib pending) |
| managers.py 150-300 | BaseProxy, SyncManager, register | BaseProxy wraps a remote object reference; SyncManager pre-registers Lock, Event, Semaphore, Condition, Queue, dict, list, etc. | (stdlib pending) |
| sharedctypes.py 1-150 | Value, Array, RawValue, RawArray | Allocate ctypes objects in shared memory; Value/Array wrap them in a Lock-guarded proxy; RawValue/RawArray expose them unguarded. | (stdlib pending) |
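The sharedctypes row can be illustrated with a small usage sketch (hypothetical example, not module code; a POSIX "fork" context is assumed):

```python
from multiprocessing import get_context

def bump(counter, arr):
    with counter.get_lock():    # Value carries its own Lock
        counter.value += 1
    with arr.get_lock():        # guard the read-modify-write on the array
        for i in range(len(arr)):
            arr[i] *= 2

def run():
    ctx = get_context("fork")
    counter = ctx.Value("i", 0)          # shared C int, lock-guarded
    arr = ctx.Array("d", [1.0, 2.0])     # shared C double array
    procs = [ctx.Process(target=bump, args=(counter, arr)) for _ in range(4)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    return counter.value, list(arr)
```

All four children mutate the same memory, so the counter ends at 4 and each array slot is doubled four times; RawValue/RawArray would behave the same but without the lock.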
Reading
Process._bootstrap (process.py lines 100 to 200)
cpython 3.14 @ ab2d84fe1023/Lib/multiprocessing/process.py#L100-200
def _bootstrap(self, parent_sentinel=None):
from . import util, context
global _current_process, _parent_process, _process_counter
try:
if self._start_method is not None:
context._force_start_method(self._start_method)
_process_counter = itertools.count(1)
_current_process = self
_parent_process = _ParentProcess(
            self._parent_name, self._parent_pid, parent_sentinel)
if sys.stdin is not None:
try:
sys.stdin.close()
sys.stdin = open(os.devnull)
except (OSError, ValueError):
pass
self._bootstrap_inner()
    except Exception:
        # condensed: the full code prints the traceback to stderr
        # and sets a nonzero exitcode instead of swallowing the error
        pass
finally:
util._exit_function()
_bootstrap is the first Python function that runs inside the child
process after the OS-level fork or exec. It resets the _current_process
global so that current_process() returns this process, and installs a
_ParentProcess sentinel that can detect parent death via parent_sentinel
(a connection whose read end closes when the parent exits). Closing
sys.stdin prevents child processes from accidentally reading from the
terminal. _bootstrap_inner then calls self.run(), and the surrounding
try/finally guarantees that util._exit_function() runs on the way out to
execute registered finalizers and close any resources the child
inherited.
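The effect of those global resets is visible from user code (a sketch, assuming a POSIX "fork" context):

```python
from multiprocessing import get_context, current_process, parent_process

def report(q):
    # By the time this runs, _bootstrap has rebound the globals:
    # current_process() names this child, parent_process() is non-None.
    q.put((current_process().name, parent_process() is not None))

def run():
    ctx = get_context("fork")
    q = ctx.Queue()
    p = ctx.Process(target=report, args=(q,), name="child-demo")
    p.start()
    name, has_parent = q.get()
    p.join()
    return name, has_parent
```

In the parent, parent_process() returns None; in the child it returns the _ParentProcess installed by _bootstrap, whose sentinel lets the child notice parent death.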
Pool worker loop (pool.py lines 1 to 200)
cpython 3.14 @ ab2d84fe1023/Lib/multiprocessing/pool.py#L1-200
def worker(inqueue, outqueue, initializer=None, initargs=(),
maxtasks=None, wrap_exception=False):
if initializer is not None:
initializer(*initargs)
completed = 0
while maxtasks is None or (maxtasks and completed < maxtasks):
try:
task = inqueue.get()
except (EOFError, OSError):
break
if task is None:
break
job, i, func, args, kwds = task
try:
result = (True, func(*args, **kwds))
except Exception as e:
if wrap_exception and func is not _helper_reraises_exception:
e = ExceptionWithTraceback(e, e.__traceback__)
result = (False, e)
try:
outqueue.put((job, i, result))
except Exception as e:
outqueue.put((job, i, (False, e)))
completed += 1
util.debug('worker exiting after %d tasks', completed)
Each pool worker runs this loop in its own process. Tasks arrive as
(job_id, task_index, func, args, kwds) tuples on inqueue. A sentinel
None value or an EOFError (parent closed the queue) signals shutdown.
maxtasks caps the number of tasks per worker process; when the limit is
reached the worker exits so a fresh process can replace it, preventing
unbounded memory growth from accumulating Python objects. If func raises,
the exception is wrapped in ExceptionWithTraceback (which captures the
formatted traceback as a string so it survives pickling) before being
sent back on outqueue.
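A short usage sketch of the loop's contract (illustrative, with a POSIX "fork" context assumed):

```python
from multiprocessing import get_context

def square(x):
    return x * x

def run():
    ctx = get_context("fork")
    # maxtasksperchild becomes the worker's maxtasks argument: each
    # worker exits after two tasks and the pool replaces it with a
    # fresh process.
    with ctx.Pool(processes=2, maxtasksperchild=2) as pool:
        return pool.map(square, range(6))
```

The caller never sees the recycling; map simply blocks until every (job, i, result) triple has come back over outqueue and been reassembled in order.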
Queue pipe-based transport (queues.py lines 1 to 200)
cpython 3.14 @ ab2d84fe1023/Lib/multiprocessing/queues.py#L1-200
class Queue(object):
def __init__(self, maxsize=0, *, ctx):
self._maxsize = maxsize
self._reader, self._writer = connection.Pipe(duplex=False)
self._rlock = ctx.Lock()
self._wlock = ctx.Lock() if sys.platform != 'win32' else None
self._sem = ctx.BoundedSemaphore(maxsize) if maxsize > 0 else None
        self._buffer = collections.deque()
        self._notempty = threading.Condition(threading.Lock())
        self._closed = False
        self._thread = None   # feeder thread; started lazily by put
def put(self, obj, block=True, timeout=None):
if self._closed:
raise ValueError(f"Queue {self!r} is closed")
if self._sem is not None:
if not self._sem.acquire(block, timeout):
raise Full
with self._notempty:
if self._thread is None:
self._start_thread()
self._buffer.append(obj)
self._notempty.notify()
    def get(self, block=True, timeout=None):
        if self._closed:
            raise ValueError(f"Queue {self!r} is closed")
        if block and timeout is not None:
            deadline = time.monotonic() + timeout
        with self._rlock:
            if block:
                remaining = None if timeout is None else deadline - time.monotonic()
                if not self._poll(remaining):
                    raise Empty
            elif not self._poll():
                raise Empty
            res = self._recv_bytes()
        if self._sem is not None:
            self._sem.release()
        return _ForkingPickler.loads(res)
Queue uses a unidirectional Pipe pair (OS pipe or socket depending on
platform) as the data channel. The send side writes serialised objects from
a background feeder thread (_feed) rather than directly in put, so
that put can return immediately even when the pipe's OS buffer is full.
The feeder thread drains self._buffer and writes each pickled object to
self._writer. On the receive side, get acquires _rlock (one reader at
a time) and calls the connection's recv_bytes. The _sem (a
BoundedSemaphore) enforces maxsize: put acquires it and get releases it
after each receive, so put blocks while the queue is full.
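The pipe-plus-feeder design is easiest to see in a producer/consumer sketch (hypothetical example, "fork" context assumed):

```python
from multiprocessing import get_context

def produce(q):
    for i in range(3):
        q.put(i)      # buffered locally, then pickled by the feeder thread
    q.put(None)       # sentinel: tells the consumer to stop

def run():
    ctx = get_context("fork")
    q = ctx.Queue(maxsize=8)   # maxsize enforced by the BoundedSemaphore
    p = ctx.Process(target=produce, args=(q,))
    p.start()
    items = []
    while True:
        item = q.get()         # unpickles bytes received over the pipe
        if item is None:
            break
        items.append(item)
    p.join()
    return items
```

Each put returns as soon as the object lands in the in-process buffer; the feeder thread does the pickling and pipe writes in the background.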
gopy mirror
multiprocessing requires OS-level fork/exec, process management, OS
pipes, and ctypes shared memory, none of which exist in the current gopy
runtime. The package is not ported. When gopy grows process isolation and
OS-primitive support, the natural implementation strategy is to treat each
Process as a separate gopy interpreter instance communicating over OS
pipes, with Pool and Manager layered on top.
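That strategy can be sketched in Python itself: launch a fresh interpreter per "process" and exchange pickled data over OS pipes, much like the spawn start method. All names here are hypothetical, and func must be importable by reference in the child:

```python
import pickle
import subprocess
import sys

# Child-side program: read a pickled (func, args) pair from stdin,
# run it, and write the pickled result to stdout.
_CHILD = """
import pickle, sys
func, args = pickle.load(sys.stdin.buffer)
pickle.dump(func(*args), sys.stdout.buffer)
"""

def run_in_fresh_interpreter(func, args=()):
    # A brand-new interpreter per task; the task and its result cross
    # the process boundary as pickled bytes over stdin/stdout pipes.
    proc = subprocess.run(
        [sys.executable, "-c", _CHILD],
        input=pickle.dumps((func, args)),
        capture_output=True,
        check=True,
    )
    return pickle.loads(proc.stdout)
```

For example, run_in_fresh_interpreter(pow, (2, 10)) evaluates pow(2, 10) in a separate interpreter; a gopy mirror would replace subprocess with its own interpreter-instance machinery while keeping the same pickle-over-pipe framing.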