Skip to main content

Lib/queue.py

Source:

cpython 3.14 @ ab2d84fe1023/Lib/queue.py

Map

LinesSymbolNotes
1–20module prologueimports deque, heapq, threading, time
21–40Empty, Fullexception classes
41–260Queuecore class; deque-based storage, mutex + two Conditions
42–80Queue.__init__initializes deque, mutex, not_empty, not_full, all_tasks_done
81–130Queue.put, Queue.put_nowaitblock until space available; notify not_empty
131–185Queue.get, Queue.get_nowaitblock until item available; notify not_full
186–220Queue.task_done, Queue.joinunfinished_tasks counter; join waits on all_tasks_done
221–240Queue.qsize, Queue.empty, Queue.fullsize predicates (advisory)
241–260Queue._put, Queue._get, Queue._qsizeinternal hooks overridden by subclasses
261–280PriorityQueueoverrides _init/_put/_get with a heapq
281–295LifoQueueoverrides _init/_put/_get with a list used as a stack
296–320SimpleQueuepipe-based; no maxsize, no task_done/join

Reading

Queue internals: mutex and two Conditions

Queue.__init__ allocates a single threading.Lock called mutex and derives two Condition objects from it (not_empty, not_full) plus a third (all_tasks_done) for the join protocol. All three share the same underlying lock, so only one thread holds mutex at a time.

# CPython: Lib/queue.py:42 Queue.__init__
def __init__(self, maxsize=0):
self.maxsize = maxsize
self._init(maxsize)
self.mutex = threading.Lock()
self.not_empty = threading.Condition(self.mutex)
self.not_full = threading.Condition(self.mutex)
self.all_tasks_done = threading.Condition(self.mutex)
self.unfinished_tasks = 0

put and get with optional timeout

Both put and get accept block and timeout parameters. When block=True and timeout is given, the caller waits on the appropriate Condition with a deadline computed from time.monotonic. The loop re-checks the predicate after each wakeup to guard against spurious notifications.

# CPython: Lib/queue.py:81 Queue.put
def put(self, item, block=True, timeout=None):
with self.not_full:
if self.maxsize > 0:
if not block:
if self._qsize() >= self.maxsize:
raise Full
elif timeout is None:
while self._qsize() >= self.maxsize:
self.not_full.wait()
elif timeout < 0:
raise ValueError("'timeout' must be a non-negative number")
else:
endtime = time.monotonic() + timeout
while self._qsize() >= self.maxsize:
remaining = endtime - time.monotonic()
if remaining <= 0.0:
raise Full
self.not_full.wait(remaining)
self._put(item)
self.unfinished_tasks += 1
self.not_empty.notify()

task_done and join

task_done decrements unfinished_tasks under all_tasks_done and broadcasts when the counter reaches zero. join simply waits on that condition until the counter is zero. This makes the pair suitable for producer/worker pipelines where the producer needs confirmation that all work has been consumed and processed.

# CPython: Lib/queue.py:186 Queue.task_done
def task_done(self):
with self.all_tasks_done:
unfinished = self.unfinished_tasks - 1
if unfinished <= 0:
if unfinished < 0:
raise ValueError('task_done() called too many times')
self.all_tasks_done.notify_all()
self.unfinished_tasks = unfinished

# CPython: Lib/queue.py:202 Queue.join
def join(self):
with self.all_tasks_done:
while self.unfinished_tasks:
self.all_tasks_done.wait()

gopy notes

Status: not yet ported.

Planned package path: module/queue/.

The port depends on module/threading/ for Lock and Condition. SimpleQueue can be ported independently using a channel or Go sync primitives. PriorityQueue requires a heap implementation (Go's container/heap is a natural fit). The task_done/join protocol maps cleanly to a sync.WaitGroup.