Lib/queue.py (part 3)
Source: cpython 3.14 @ ab2d84fe1023/Lib/queue.py
This annotation covers the blocking put/get protocol and subclasses. See lib_queue2_detail for Queue.__init__, qsize, empty, full, and internal deque management.
Map
| Lines | Symbol | Role |
|---|---|---|
| 1-80 | Queue.put | Add an item; block if full |
| 81-160 | Queue.get | Remove and return an item; block if empty |
| 161-240 | Queue.join / task_done | Wait for all items to be processed |
| 241-360 | LifoQueue | Stack-ordered queue |
| 361-500 | PriorityQueue | Heap-ordered priority queue |
Reading
Queue.put
# CPython: Lib/queue.py:130 put
def put(self, item, block=True, timeout=None):
    with self.not_full:
        if self.maxsize > 0:
            if not block:
                if self._qsize() >= self.maxsize:
                    raise Full
            elif timeout is None:
                while self._qsize() >= self.maxsize:
                    self.not_full.wait()
            elif timeout < 0:
                raise ValueError("'timeout' must be a non-negative number")
            else:
                endtime = monotonic() + timeout
                while self._qsize() >= self.maxsize:
                    remaining = endtime - monotonic()
                    if remaining <= 0.0:
                        raise Full
                    self.not_full.wait(remaining)
        self._put(item)
        self.unfinished_tasks += 1
        self.not_empty.notify()
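Queue.put acquires not_full (a Condition) before checking capacity; each wait() call releases the underlying lock while the thread sleeps and reacquires it on wakeup. A maxsize of zero or less means the queue is unbounded, so the capacity branch is skipped entirely. After inserting, put increments unfinished_tasks and calls not_empty.notify() to wake one blocked get caller. With block=False, a full queue raises Full immediately instead of waiting. The excerpt leaves out the is_shutdown checks (Python 3.13+), which raise ShutDown.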
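The put modes described above can be exercised on a small bounded queue. This is a minimal sketch: the helper name `try_put` and the 0.05-second timeout are assumptions chosen for the demo and are not part of queue.py.

```python
import queue


def try_put(q, item, **kwargs):
    """Hypothetical helper: True if the item was enqueued, False on queue.Full."""
    try:
        q.put(item, **kwargs)
        return True
    except queue.Full:
        return False


q = queue.Queue(maxsize=1)

results = [
    try_put(q, "a"),                # room available: enqueued without waiting
    try_put(q, "b", block=False),   # full, non-blocking: Full raised at once
    try_put(q, "c", timeout=0.05),  # full, timed wait: Full after the deadline
]
print(results)  # [True, False, False]
```

Only the first item lands in the queue; the other two calls fail because no consumer ever frees a slot.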
Queue.get
# CPython: Lib/queue.py:180 get
def get(self, block=True, timeout=None):
    with self.not_empty:
        if not block:
            if not self._qsize():
                raise Empty
        elif timeout is None:
            while not self._qsize():
                self.not_empty.wait()
        elif timeout < 0:
            raise ValueError("'timeout' must be a non-negative number")
        else:
            endtime = monotonic() + timeout
            while not self._qsize():
                remaining = endtime - monotonic()
                if remaining <= 0.0:
                    raise Empty
                self.not_empty.wait(remaining)
        item = self._get()
        self.not_full.notify()
        return item
Mirror image of put: get holds not_empty, waits until an item is available, removes it with _get, then notifies not_full so that one blocked put caller can proceed. The three waiting modes (non-blocking, wait indefinitely, timed wait) are handled the same way in both put and get.
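The handoff between a waiting get and a later put can be seen with one consumer thread. The following is an illustrative sketch; the names `consumer`, `received`, and `timed_out` are made up for the example.

```python
import queue
import threading

q = queue.Queue()
received = []


def consumer():
    # If the queue is still empty when this runs, get() sleeps in
    # not_empty.wait() until the producer's put() calls notify().
    received.append(q.get())


t = threading.Thread(target=consumer)
t.start()
q.put("job")  # inserts the item, then wakes one waiting consumer
t.join()

# The queue is empty again, so a timed get() gives up with queue.Empty.
try:
    q.get(timeout=0.05)
    timed_out = False
except queue.Empty:
    timed_out = True

print(received, timed_out)  # ['job'] True
```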
Queue.join / task_done
# CPython: Lib/queue.py:84 task_done
def task_done(self):
    with self.all_tasks_done:
        unfinished = self.unfinished_tasks - 1
        if unfinished <= 0:
            if unfinished < 0:
                raise ValueError('task_done() called too many times')
            self.all_tasks_done.notify_all()
        self.unfinished_tasks = unfinished

# CPython: Lib/queue.py:108 join
def join(self):
    with self.all_tasks_done:
        while self.unfinished_tasks:
            self.all_tasks_done.wait()
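task_done decrements unfinished_tasks, the counter that put increments; when it reaches zero, every thread waiting on all_tasks_done is woken. Calling task_done more times than items were put raises ValueError. join blocks until unfinished_tasks is 0. Together they form the producer/consumer join pattern: put items, get and process each one, call task_done once per item, and join returns when every item has been accounted for.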
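A minimal sketch of that pattern with three worker threads follows. The `None` sentinel used to stop the workers and the squaring "work" are assumptions made for the demo; queue.py does not prescribe either.

```python
import queue
import threading

q = queue.Queue()
results = []
results_lock = threading.Lock()


def worker():
    while True:
        item = q.get()
        try:
            if item is None:  # hypothetical stop sentinel
                return
            with results_lock:
                results.append(item * item)
        finally:
            # Exactly one task_done() per get(), including the sentinel.
            q.task_done()


threads = [threading.Thread(target=worker) for _ in range(3)]
for t in threads:
    t.start()

for n in range(10):
    q.put(n)

q.join()  # returns once all ten items have been marked done

for _ in threads:
    q.put(None)  # one sentinel per worker
for t in threads:
    t.join()

print(sorted(results))  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```

Placing task_done in a finally block keeps the counter balanced even if processing raises, so join cannot hang on a failed item.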
LifoQueue
# CPython: Lib/queue.py:220 LifoQueue
class LifoQueue(Queue):
    def _init(self, maxsize):
        self.queue = []

    def _qsize(self):
        return len(self.queue)

    def _put(self, item):
        self.queue.append(item)

    def _get(self):
        return self.queue.pop()  # LIFO: pop from end
LifoQueue replaces the internal deque with a list and overrides the four internal hooks (_init, _qsize, _put, _get). It inherits all the blocking and timeout logic from Queue unchanged. list.pop() with no argument removes the last element in O(1).
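The effect of swapping the hooks can be checked by feeding the same items to both classes. This is an illustrative sketch; the variable names are arbitrary.

```python
import queue

fifo = queue.Queue()
lifo = queue.LifoQueue()
for item in ("a", "b", "c"):
    fifo.put(item)
    lifo.put(item)

fifo_order = [fifo.get() for _ in range(3)]
lifo_order = [lifo.get() for _ in range(3)]
print(fifo_order)  # ['a', 'b', 'c']
print(lifo_order)  # ['c', 'b', 'a']

# The capacity check is inherited from Queue.put, so a bounded
# LifoQueue still raises Full when it has no free slot.
bounded = queue.LifoQueue(maxsize=1)
bounded.put("x")
try:
    bounded.put("y", block=False)
    overflowed = False
except queue.Full:
    overflowed = True
print(overflowed)  # True
```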
PriorityQueue
# CPython: Lib/queue.py:250 PriorityQueue
class PriorityQueue(Queue):
    def _init(self, maxsize):
        self.queue = []

    def _qsize(self):
        return len(self.queue)

    def _put(self, item):
        heappush(self.queue, item)

    def _get(self):
        return heappop(self.queue)
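Items must be mutually comparable. heappush/heappop maintain the min-heap invariant, so _get always returns the smallest item. For custom priorities, use (priority, item) tuples. When two priorities tie, tuple comparison falls through to the items themselves, so either make the items orderable or add a tie-breaker such as an insertion counter.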
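One way to keep equal priorities from comparing the payloads is a three-element entry with an insertion counter in the middle. The sketch below assumes that layout; `put_task` and the task dictionaries are hypothetical names for the demo.

```python
import itertools
import queue

pq = queue.PriorityQueue()
counter = itertools.count()  # monotonically increasing tie-breaker


def put_task(priority, task):
    # (priority, sequence, task): the sequence number is unique, so the
    # comparison never reaches the task, which may not be orderable.
    pq.put((priority, next(counter), task))


put_task(2, {"name": "write report"})
put_task(1, {"name": "fix outage"})
put_task(2, {"name": "review patch"})

order = []
while not pq.empty():
    priority, _, task = pq.get()
    order.append(task["name"])

print(order)  # ['fix outage', 'write report', 'review patch']
```

Without the counter, the two priority-2 entries would force a comparison between dicts, which raises TypeError. The counter also keeps insertion order among entries of equal priority.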
gopy notes
Queue.put / Queue.get are in module/queue/module.go. The condition variables map to sync.Cond. LifoQueue._get calls objects.ListPop. PriorityQueue uses container/heap from Go's standard library.