Skip to main content

Lib/queue.py (part 3)

Source:

cpython 3.14 @ ab2d84fe1023/Lib/queue.py

This annotation covers the blocking put/get protocol and subclasses. See lib_queue2_detail for Queue.__init__, qsize, empty, full, and internal deque management.

Map

LinesSymbolRole
1-80Queue.putAdd an item; block if full
81-160Queue.getRemove and return an item; block if empty
161-240Queue.join / task_doneWait for all items to be processed
241-360LifoQueueStack-ordered queue
361-500PriorityQueueHeap-ordered priority queue

Reading

Queue.put

# CPython: Lib/queue.py:130 put
def put(self, item, block=True, timeout=None):
with self.not_full:
if self.maxsize > 0:
if not block:
if self._qsize() >= self.maxsize:
raise Full
elif timeout is None:
while self._qsize() >= self.maxsize:
self.not_full.wait()
elif timeout < 0:
raise ValueError("'timeout' must be a non-negative number")
else:
endtime = monotonic() + timeout
while self._qsize() >= self.maxsize:
remaining = endtime - monotonic()
if remaining <= 0.0:
raise Full
self.not_full.wait(remaining)
self._put(item)
self.unfinished_tasks += 1
self.not_empty.notify()

Queue.put holds not_full (a Condition) while checking capacity. After inserting, it notifies any blocked get caller. With block=False, it raises Full immediately rather than waiting.

Queue.get

# CPython: Lib/queue.py:180 get
def get(self, block=True, timeout=None):
with self.not_empty:
if not block:
if not self._qsize():
raise Empty
elif timeout is None:
while not self._qsize():
self.not_empty.wait()
else:
endtime = monotonic() + timeout
while not self._qsize():
remaining = endtime - monotonic()
if remaining <= 0.0:
raise Empty
self.not_empty.wait(remaining)
item = self._get()
self.not_full.notify()
return item

Mirror image of put: holds not_empty, waits until something is available, then notifies not_full. The three-way branch (no block / no timeout / timed wait) is repeated symmetrically in both put and get.

Queue.join / task_done

# CPython: Lib/queue.py:84 task_done
def task_done(self):
with self.all_tasks_done:
unfinished = self.unfinished_tasks - 1
if unfinished <= 0:
if unfinished < 0:
raise ValueError('task_done() called too many times')
self.all_tasks_done.notify_all()
self.unfinished_tasks = unfinished

# CPython: Lib/queue.py:108 join
def join(self):
with self.all_tasks_done:
while self.unfinished_tasks:
self.all_tasks_done.wait()

task_done decrements unfinished_tasks; when it reaches zero, all waiters on all_tasks_done are notified. join blocks until unfinished_tasks == 0. This is the producer/consumer join pattern: put items → get/process → call task_donejoin returns.

LifoQueue

# CPython: Lib/queue.py:220 LifoQueue
class LifoQueue(Queue):
def _init(self, maxsize):
self.queue = []

def _qsize(self):
return len(self.queue)

def _put(self, item):
self.queue.append(item)

def _get(self):
return self.queue.pop() # LIFO: pop from end

LifoQueue replaces the internal deque with a list and overrides the four internal hooks. It inherits all the blocking/timeout logic from Queue unchanged. list.pop() is O(1) from the end.

PriorityQueue

# CPython: Lib/queue.py:250 PriorityQueue
class PriorityQueue(Queue):
def _init(self, maxsize):
self.queue = []

def _qsize(self):
return len(self.queue)

def _put(self, item):
heappush(self.queue, item)

def _get(self):
return heappop(self.queue)

Items must be comparable. heappush/heappop maintain the min-heap invariant so the smallest item is always returned first. For custom priorities, use (priority, item) tuples.

gopy notes

Queue.put / Queue.get are in module/queue/module.go. The condition variables map to sync.Cond. LifoQueue._get calls objects.ListPop. PriorityQueue uses container/heap from Go's standard library.