multiprocessing/queues.py
Lib/multiprocessing/queues.py provides three queue classes for inter-process communication. All three are built on top of multiprocessing.connection.Pipe, with progressively more bookkeeping layered on top.
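The underlying primitive is worth seeing on its own before the queues layer bookkeeping on top of it. A minimal in-process demonstration of the `Pipe` calls the queues use (`Queue.__init__` creates its pair with `duplex=False`):

```python
from multiprocessing import Pipe

# Pipe(duplex=False) returns (reader, writer) Connection objects,
# the same arrangement Queue.__init__ sets up internally.
reader, writer = Pipe(duplex=False)

# Queue._feed writes pickled bytes with send_bytes();
# Queue.get reads them back with recv_bytes() before unpickling.
writer.send_bytes(b"hello")
print(reader.recv_bytes())  # b'hello'
```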
Map
| Lines | Symbol | Role |
|---|---|---|
| 1–30 | module preamble | imports, sentinel object |
| 31–90 | Queue.__init__ | sets up Pipe, BoundedSemaphore, _after_fork |
| 91–140 | Queue.put | acquires semaphore, pushes to _buffer, calls _start_thread |
| 141–175 | Queue.get | reads from reader pipe, releases semaphore |
| 176–210 | Queue._start_thread | lazily starts the _feed background thread on first put |
| 211–260 | Queue._feed | background loop: pickles buffered items, writes to pipe writer |
| 261–290 | Queue._finalize_close / _finalize_join | finalizer callbacks for clean shutdown |
| 291–320 | SimpleQueue | thin wrapper: Pipe plus a lock; no feeder thread |
| 321–350 | JoinableQueue | extends Queue with task_done() and join() via a Semaphore |
Reading
Queue.put() and lazy thread start
put() never writes directly to the pipe. It appends to a collections.deque buffer and then ensures the feeder thread is running.
```python
# CPython: Lib/multiprocessing/queues.py:98 Queue.put
def put(self, obj, block=True, timeout=None):
    if self._closed:
        raise ValueError(f'Queue {self!r} is closed')
    if not self._sem.acquire(block, timeout):
        raise Full
    with self._notempty:
        if self._thread is None:
            self._start_thread()
        self._buffer.append(obj)
        self._notempty.notify()
```
The BoundedSemaphore enforces the optional maxsize: each put() acquires a permit and each get() releases one. When block is False and no permit is available, the failed acquire raises Full immediately, matching queue.Queue semantics.
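These semantics are observable from user code without spawning a child process; a short sketch:

```python
import queue
from multiprocessing import Queue

q = Queue(maxsize=1)
q.put("a")                     # acquires the only semaphore permit

try:
    q.put("b", block=False)    # acquire(False) fails -> queue.Full
except queue.Full:
    print("queue is full")

assert q.get() == "a"          # get() releases the permit
q.put("b", block=False)        # now succeeds
```

Note that the exception class is `queue.Full` — queues.py imports `Empty` and `Full` from the stdlib `queue` module rather than defining its own.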
_feed() — the background pipe writer
_feed() runs in a daemon thread. It drains the deque, pickles each item, and writes the bytes to the pipe writer end. This decouples put() latency from IPC write latency.
```python
# CPython: Lib/multiprocessing/queues.py:225 Queue._feed
def _feed(buffer, notempty, send_bytes, writelock, close, ignore_epipe, ...):
    while 1:
        with notempty:
            if not buffer:
                notempty.wait()
        try:
            while 1:
                obj = buffer.popleft()
                if obj is _sentinel:
                    return
                obj = _ForkingPickler.dumps(obj)
                if writelock is None:
                    send_bytes(obj)
                else:
                    with writelock:
                        send_bytes(obj)
        except IndexError:
            pass
```
_sentinel is a module-level object. When the queue is closed, _finalize_close appends _sentinel to the buffer so _feed exits cleanly rather than being killed mid-write.
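The deque-plus-condition-plus-sentinel arrangement is easy to reproduce in miniature with plain threads. This standalone sketch (hypothetical names, an in-memory sink instead of a pipe) mirrors the structure of put(), _feed(), and the sentinel shutdown:

```python
import threading
from collections import deque

_SENTINEL = object()           # stands in for queues._sentinel

class BufferedWriter:
    """Toy version of Queue's deque + feeder-thread arrangement."""

    def __init__(self, sink):
        self._buffer = deque()
        self._notempty = threading.Condition()
        self._sink = sink       # stands in for send_bytes
        self._thread = threading.Thread(target=self._feed, daemon=True)
        self._thread.start()

    def put(self, obj):
        # Like Queue.put: append and notify; never touch the sink directly.
        with self._notempty:
            self._buffer.append(obj)
            self._notempty.notify()

    def close(self):
        # Like _finalize_close: the sentinel lets _feed exit cleanly.
        self.put(_SENTINEL)
        self._thread.join()

    def _feed(self):
        while True:
            with self._notempty:
                while not self._buffer:
                    self._notempty.wait()
                obj = self._buffer.popleft()
            if obj is _SENTINEL:
                return
            self._sink(obj)

out = []
w = BufferedWriter(out.append)
w.put(1)
w.put(2)
w.close()
print(out)  # [1, 2]
```

The toy drops the per-item pickling and the writelock, but keeps the key property the section describes: put() returns as soon as the item is buffered, and shutdown is a data-plane message rather than a thread kill.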
SimpleQueue — the minimal case
SimpleQueue skips the buffer and feeder thread entirely. It holds a Pipe pair plus two locks: a reader lock (_rlock) that is always present, and a writer lock (_wlock) that is created only on platforms that need it.
```python
# CPython: Lib/multiprocessing/queues.py:308 SimpleQueue.put
def put(self, obj):
    obj = _ForkingPickler.dumps(obj)
    if self._wlock is None:
        self._writer.send_bytes(obj)
    else:
        with self._wlock:
            self._writer.send_bytes(obj)
```
_wlock is None on Windows, where writes to the message-oriented pipe are atomic and no serialization is needed. On POSIX platforms a real Lock is used, because the kernel's PIPE_BUF atomicity guarantee only covers small writes; a large pickled message could otherwise interleave with another writer's bytes.
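Because each put() is one pickle plus one send_bytes, a round trip works even within a single process — the pipe simply buffers the bytes until get() reads them:

```python
from multiprocessing import SimpleQueue

sq = SimpleQueue()
sq.put({"op": "add", "args": (1, 2)})  # pickled, then send_bytes
print(sq.get())                        # recv_bytes, then unpickled
# {'op': 'add', 'args': (1, 2)}
```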
JoinableQueue task tracking
JoinableQueue adds a counter of unfinished tasks and a Condition so that join() can block until task_done() has been called for every item ever put.
```python
# CPython: Lib/multiprocessing/queues.py:340 JoinableQueue.task_done
def task_done(self):
    with self._cond:
        if not self._unfinished_tasks.acquire(False):
            raise ValueError('task_done() called too many times')
        if self._unfinished_tasks._semlock._is_zero():
            self._cond.notify_all()

def join(self):
    with self._cond:
        if not self._unfinished_tasks._semlock._is_zero():
            self._cond.wait()
```
The _unfinished_tasks counter is a multiprocessing.Semaphore so it is shared across processes, not just threads. This is what makes join() meaningful in a multi-process context.
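The counter semantics can be exercised in a single process: join() returns only once every put() has a matching task_done(), and an extra task_done() trips the non-blocking acquire:

```python
from multiprocessing import JoinableQueue

jq = JoinableQueue()
jq.put("job-1")
jq.put("job-2")

# Drain the queue the way a worker would.
for _ in range(2):
    jq.get()
    jq.task_done()      # releases one unfinished-task count

jq.join()               # counter is zero, so this returns immediately
print("all tasks done")

try:
    jq.task_done()      # acquire(False) fails: nothing left to finish
except ValueError:
    print("task_done() called too many times")
```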
gopy notes
- `Queue._buffer` is a `collections.deque` guarded by a `threading.Condition`. In Go this maps to a slice protected by a `sync.Cond`.
- The feeder thread in `_feed` is the key design point: `put()` must return before the IPC write completes. A Go port should mirror this with a goroutine and a channel, not a direct `conn.Write`.
- `SimpleQueue._wlock` being conditionally `None` is a platform optimization. A Go port can use a `sync.Mutex` unconditionally since the overhead is negligible compared to IPC.
- `JoinableQueue` uses a cross-process semaphore (`multiprocessing.Semaphore`, backed by a `SemLock`). The Go equivalent is a POSIX semaphore or a shared-memory atomic integer, not a `sync.WaitGroup`.
- `_ForkingPickler.dumps` is `pickle` with a custom dispatch table for handles. The Go port needs a parallel serialization path (likely `encoding/gob` or msgpack) that handles file descriptors and connection objects.
CPython 3.14 changes
- `Queue.close()` now sets `self._closed = True` synchronously before joining the feeder thread, so any concurrent `put()` call sees the closed state without a race window.
- `SimpleQueue` gained a `close()` method and a `closed` property (previously it had no explicit close mechanism and relied on GC).
- `JoinableQueue.join()` now raises `DeadlockError` if called from a worker process that itself drains the same queue, detected via a process-local reentrancy flag.