# Lib/queue.py
Source: cpython 3.14 @ ab2d84fe1023/Lib/queue.py
## Map
| Lines | Symbol | Notes |
|---|---|---|
| 1–20 | module prologue | imports deque, heapq, threading, time |
| 21–40 | Empty, Full | exception classes |
| 41–260 | Queue | core class; deque-based storage, mutex + three Conditions |
| 42–80 | Queue.__init__ | initializes deque, mutex, not_empty, not_full, all_tasks_done |
| 81–130 | Queue.put, Queue.put_nowait | block until space available; notify not_empty |
| 131–185 | Queue.get, Queue.get_nowait | block until item available; notify not_full |
| 186–220 | Queue.task_done, Queue.join | unfinished_tasks counter; join waits on all_tasks_done |
| 221–240 | Queue.qsize, Queue.empty, Queue.full | size predicates (advisory) |
| 241–260 | Queue._put, Queue._get, Queue._qsize | internal hooks overridden by subclasses |
| 261–280 | PriorityQueue | overrides _init/_put/_get with a heapq |
| 281–295 | LifoQueue | overrides _init/_put/_get with a list used as a stack |
| 296–320 | SimpleQueue | deque-based (C-accelerated _queue.SimpleQueue when available); no maxsize, no task_done/join |
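The subclasses in the map differ only in which internal hooks they override; a quick check of the resulting orderings:

```python
import queue

# FIFO: Queue's deque hooks preserve insertion order.
fifo = queue.Queue()
for x in (3, 1, 2):
    fifo.put(x)
fifo_order = [fifo.get() for _ in range(3)]   # [3, 1, 2]

# LIFO: the list-as-stack hooks reverse it.
lifo = queue.LifoQueue()
for x in (3, 1, 2):
    lifo.put(x)
lifo_order = [lifo.get() for _ in range(3)]   # [2, 1, 3]

# Priority: the heapq hooks pop the smallest item first.
prio = queue.PriorityQueue()
for x in (3, 1, 2):
    prio.put(x)
prio_order = [prio.get() for _ in range(3)]   # [1, 2, 3]
```

PriorityQueue yields ascending order because heapq maintains a min-heap over the backing list.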
## Reading
### Queue internals: mutex and three Conditions
Queue.__init__ allocates a single threading.Lock as mutex and builds three
Condition objects on top of it: not_empty, not_full, and all_tasks_done (the
last drives the join protocol). All three share that one underlying lock, so
every queue operation, and every wait on any of the conditions, serializes
through the same mutex.
```python
# CPython: Lib/queue.py:42 Queue.__init__
def __init__(self, maxsize=0):
    self.maxsize = maxsize
    self._init(maxsize)
    self.mutex = threading.Lock()
    self.not_empty = threading.Condition(self.mutex)
    self.not_full = threading.Condition(self.mutex)
    self.all_tasks_done = threading.Condition(self.mutex)
    self.unfinished_tasks = 0
```
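The same one-lock/many-conditions layout can be reproduced in miniature. The sketch below (the class name BoundedBuffer is illustrative, not from the source) shows why sharing the lock works: waiting on either condition releases the single mutex, so producers and consumers never hold it at the same time.

```python
import threading
from collections import deque

# Minimal bounded buffer mirroring Queue's layout: one Lock, two
# Conditions built on it.  Illustrative sketch, not CPython source.
class BoundedBuffer:
    def __init__(self, maxsize):
        self.maxsize = maxsize
        self.items = deque()
        self.mutex = threading.Lock()
        self.not_empty = threading.Condition(self.mutex)
        self.not_full = threading.Condition(self.mutex)

    def put(self, item):
        with self.not_full:                 # acquires self.mutex
            while len(self.items) >= self.maxsize:
                self.not_full.wait()        # releases mutex while waiting
            self.items.append(item)
            self.not_empty.notify()

    def get(self):
        with self.not_empty:                # same underlying mutex
            while not self.items:
                self.not_empty.wait()
            item = self.items.popleft()
            self.not_full.notify()
            return item
```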
### put and get with optional timeout
Both put and get accept block and timeout parameters. With block=True and a
timeout, the caller computes a deadline from time.monotonic and waits on the
appropriate Condition in a loop, re-checking the predicate after each wakeup
to guard against spurious wakeups.
```python
# CPython: Lib/queue.py:81 Queue.put
def put(self, item, block=True, timeout=None):
    with self.not_full:
        if self.maxsize > 0:
            if not block:
                if self._qsize() >= self.maxsize:
                    raise Full
            elif timeout is None:
                while self._qsize() >= self.maxsize:
                    self.not_full.wait()
            elif timeout < 0:
                raise ValueError("'timeout' must be a non-negative number")
            else:
                endtime = time.monotonic() + timeout
                while self._qsize() >= self.maxsize:
                    remaining = endtime - time.monotonic()
                    if remaining <= 0.0:
                        raise Full
                    self.not_full.wait(remaining)
        self._put(item)
        self.unfinished_tasks += 1
        self.not_empty.notify()
```
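A short demonstration of the timeout path: the second put finds the queue full, waits out the deadline, and raises Full.

```python
import queue
import time

q = queue.Queue(maxsize=1)
q.put("a")                        # fills the queue

start = time.monotonic()
try:
    q.put("b", timeout=0.2)       # no consumer: deadline expires
    raised = False
except queue.Full:
    raised = True
    elapsed = time.monotonic() - start   # >= 0.2 by the monotonic deadline

# put_nowait is shorthand for put(item, block=False): fails immediately.
try:
    q.put_nowait("b")
    raised_nowait = False
except queue.Full:
    raised_nowait = True
```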
### task_done and join
task_done decrements unfinished_tasks under all_tasks_done and broadcasts
when the counter reaches zero. join simply waits on that condition until the
counter is zero. This makes the pair suitable for producer/worker pipelines where
the producer needs confirmation that all work has been consumed and processed.
```python
# CPython: Lib/queue.py:186 Queue.task_done
def task_done(self):
    with self.all_tasks_done:
        unfinished = self.unfinished_tasks - 1
        if unfinished <= 0:
            if unfinished < 0:
                raise ValueError('task_done() called too many times')
            self.all_tasks_done.notify_all()
        self.unfinished_tasks = unfinished
```

```python
# CPython: Lib/queue.py:202 Queue.join
def join(self):
    with self.all_tasks_done:
        while self.unfinished_tasks:
            self.all_tasks_done.wait()
```
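The protocol in use: each worker calls task_done once per completed get, and join returns only after every enqueued item has been accounted for.

```python
import queue
import threading

q = queue.Queue()
processed = []

def worker():
    while True:
        item = q.get()
        if item is None:          # sentinel: shut the worker down
            q.task_done()
            return
        processed.append(item * 2)
        q.task_done()             # pairs with the get() above

threads = [threading.Thread(target=worker, daemon=True) for _ in range(2)]
for t in threads:
    t.start()

for x in range(5):
    q.put(x)
q.join()                          # blocks until unfinished_tasks hits zero

for _ in threads:                 # one sentinel per worker
    q.put(None)
for t in threads:
    t.join()
```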
## gopy notes
Status: not yet ported.
Planned package path: module/queue/.
The port depends on module/threading/ for Lock and Condition. SimpleQueue
can be ported independently using a channel or Go sync primitives.
PriorityQueue requires a heap implementation (Go's container/heap is a natural
fit). The task_done/join protocol maps cleanly to a sync.WaitGroup.
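For reference when porting SimpleQueue: CPython's pure-Python fallback in the same file (_PySimpleQueue) pairs a deque with a counting Semaphore. A condensed sketch (error handling trimmed; the class name SimpleQueueSketch is illustrative):

```python
import threading
from collections import deque
from queue import Empty

# Condensed from CPython's pure-Python fallback _PySimpleQueue in
# Lib/queue.py: a deque for storage, a Semaphore counting available items.
class SimpleQueueSketch:
    def __init__(self):
        self._queue = deque()                 # append/popleft are atomic
        self._count = threading.Semaphore(0)

    def put(self, item):
        self._queue.append(item)
        self._count.release()                 # wake one waiting getter

    def get(self, block=True, timeout=None):
        if timeout is not None and timeout < 0:
            raise ValueError("'timeout' must be a non-negative number")
        if not self._count.acquire(block, timeout):
            raise Empty
        return self._queue.popleft()
```

The semaphore replaces the mutex/condition machinery entirely, which is why a Go port could swap it for a buffered channel without changing the interface.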