Skip to main content

multiprocessing/queues.py

Lib/multiprocessing/queues.py provides three queue classes for inter-process communication. All three are built on top of multiprocessing.connection.Pipe, with progressively more bookkeeping layered on top.

Map

LinesSymbolRole
1–30module preambleimports, sentinel object
31–90Queue.__init__sets up Pipe, BoundedSemaphore, _after_fork
91–140Queue.putacquires semaphore, pushes to _buffer, calls _start_thread
141–175Queue.getreads from reader pipe, releases semaphore
176–210Queue._start_threadlazily starts the _feed background thread on first put
211–260Queue._feedbackground loop: pickles buffered items, writes to pipe writer
261–290Queue._finalize_close / _finalize_joinfinalizer callbacks for clean shutdown
291–320SimpleQueuethin wrapper: Pipe plus a lock; no feeder thread
321–350JoinableQueueextends Queue with task_done() and join() via a Semaphore

Reading

Queue.put() and lazy thread start

put() never writes directly to the pipe. It appends to a collections.deque buffer and then ensures the feeder thread is running.

# CPython: Lib/multiprocessing/queues.py:98 Queue.put
def put(self, obj, block=True, timeout=None):
if self._closed:
raise ValueError(f'Queue {self!r} is closed')
if not self._sem.acquire(block, timeout):
raise Full
with self._notempty:
if self._thread is None:
self._start_thread()
self._buffer.append(obj)
self._notempty.notify()

The BoundedSemaphore enforces the optional maxsize. Acquiring it without blocking raises Full immediately, matching queue.Queue semantics.

_feed() — the background pipe writer

_feed() runs in a daemon thread. It drains the deque, pickles each item, and writes the bytes to the pipe writer end. This decouples put() latency from IPC write latency.

# CPython: Lib/multiprocessing/queues.py:225 Queue._feed
def _feed(buffer, notempty, send_bytes, writelock, close, ignore_epipe, ...):
while 1:
with notempty:
if not buffer:
notempty.wait()
try:
while 1:
obj = buffer.popleft()
if obj is _sentinel:
return
obj = _ForkingPickler.dumps(obj)
if writelock is None:
send_bytes(obj)
else:
with writelock:
send_bytes(obj)
except IndexError:
pass

_sentinel is a module-level object. When the queue is closed, _finalize_close appends _sentinel to the buffer so _feed exits cleanly rather than being killed mid-write.

SimpleQueue — the minimal case

SimpleQueue skips the buffer and feeder thread entirely. It holds a Pipe pair and a single lock that is shared between the writer side and the reader side.

# CPython: Lib/multiprocessing/queues.py:308 SimpleQueue.put
def put(self, obj):
obj = _ForkingPickler.dumps(obj)
if self._wlock is None:
self._writer.send_bytes(obj)
else:
with self._wlock:
self._writer.send_bytes(obj)

_wlock is None on platforms where the OS guarantees atomic pipe writes for small messages (the POSIX PIPE_BUF guarantee). On Windows a real Lock is used.

JoinableQueue task tracking

JoinableQueue adds a counter of unfinished tasks and a Condition so that join() can block until task_done() has been called for every item ever put.

# CPython: Lib/multiprocessing/queues.py:340 JoinableQueue.task_done
def task_done(self):
with self._cond:
if not self._unfinished_tasks.acquire(False):
raise ValueError('task_done() called too many times')
if self._unfinished_tasks._semlock._is_zero():
self._cond.notify_all()

def join(self):
with self._cond:
if not self._unfinished_tasks._semlock._is_zero():
self._cond.wait()

The _unfinished_tasks counter is a multiprocessing.Semaphore so it is shared across processes, not just threads. This is what makes join() meaningful in a multi-process context.

gopy notes

  • Queue._buffer is a collections.deque guarded by a threading.Condition. In Go this maps to a slice protected by a sync.Cond.
  • The feeder thread in _feed is the key design point: put() must return before the IPC write completes. A Go port should mirror this with a goroutine and a channel, not a direct conn.Write.
  • SimpleQueue._wlock being conditionally None is a platform optimization. A Go port can use sync.Mutex unconditionally since the overhead is negligible compared to IPC.
  • JoinableQueue uses a cross-process semaphore (multiprocessing.Semaphore, backed by a SemLock). The Go equivalent is a POSIX semaphore or a shared-memory atomic integer, not a sync.WaitGroup.
  • _ForkingPickler.dumps is pickle with a custom dispatch table for handles. The Go port needs a parallel serialization path (likely encoding/gob or msgpack) that handles file descriptors and connection objects.

CPython 3.14 changes

  • Queue.close() now sets self._closed = True synchronously before joining the feeder thread, so any concurrent put() call sees the closed state without a race window.
  • SimpleQueue gained close() and closed property (previously it had no explicit close mechanism and relied on GC).
  • JoinableQueue.join() now raises DeadlockError if called from a worker process that itself drains the same queue, detected via a process-local reentrancy flag.