queue.py

Lib/queue.py provides three thread-safe queue classes: Queue (FIFO), LifoQueue (stack), and PriorityQueue (heap). All three share the same put/get locking protocol and differ only in the three storage hooks _init, _put, and _get. The module also exports SimpleQueue, a separate unbounded FIFO that does not use this protocol.

Map

| Lines | Symbol | Role |
| --- | --- | --- |
| 1-20 | imports, __all__ | collections.deque, heapq, threading |
| 21-60 | Queue.__init__ | Sets up mutex, not_empty, not_full, all_tasks_done |
| 61-110 | Queue.put, Queue.put_nowait | Producer path with not_full wait |
| 111-160 | Queue.get, Queue.get_nowait | Consumer path with not_empty wait |
| 161-200 | Queue.task_done, Queue.join | Work-tracking counter |
| 201-240 | Queue._init, Queue._put, Queue._get | Storage hooks (deque) |
| 241-290 | LifoQueue | Overrides hooks to use a list |
| 291-340 | PriorityQueue | Overrides hooks to use heapq |
| 341-370 | SimpleQueue | Simpler unbounded FIFO without task tracking; the pure-Python fallback (_PySimpleQueue) uses a deque and a threading.Semaphore |

Reading

Queue.put and Queue.get

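put() enters not_full, which acquires the queue's single mutex, waits on not_full while the queue is at capacity, calls _put, increments unfinished_tasks, then notifies not_empty. get() is the mirror image: it waits on not_empty while the queue is empty, calls _get, then notifies not_full.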

# CPython: Lib/queue.py:133 Queue.put
# Excerpt; the ShutDown checks added in 3.13 are omitted.
# time is time.monotonic (queue.py does `from time import monotonic as time`).
def put(self, item, block=True, timeout=None):
    with self.not_full:
        if self.maxsize > 0:
            if not block:
                if self._qsize() >= self.maxsize:
                    raise Full
            elif timeout is None:
                while self._qsize() >= self.maxsize:
                    self.not_full.wait()
            elif timeout < 0:
                raise ValueError("'timeout' must be a non-negative number")
            else:
                endtime = time() + timeout
                while self._qsize() >= self.maxsize:
                    remaining = endtime - time()
                    if remaining <= 0.0:
                        raise Full
                    self.not_full.wait(remaining)
        self._put(item)
        self.unfinished_tasks += 1
        self.not_empty.notify()
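
The branches in put() can be observed from the caller's side using only the public queue API. The helper below is a small illustrative sketch (probe_bounded_queue is a made-up name, not part of queue.py); it fills a one-slot queue and then triggers the non-blocking, timed, and invalid-timeout paths, followed by the matching paths in get().

```python
import queue


def probe_bounded_queue():
    """Exercise the non-blocking and timed branches of put() and get()."""
    outcomes = {}
    q = queue.Queue(maxsize=1)
    q.put("a")  # the queue is now at capacity

    try:
        q.put_nowait("b")  # block=False branch
    except queue.Full:
        outcomes["put_nowait"] = "Full"

    try:
        q.put("b", timeout=0.05)  # timed branch: waits on not_full, then gives up
    except queue.Full:
        outcomes["put_timeout"] = "Full"

    try:
        q.put("b", timeout=-1)  # negative timeout is rejected
    except ValueError:
        outcomes["put_negative_timeout"] = "ValueError"

    outcomes["first_get"] = q.get()  # frees the slot and notifies not_full

    try:
        q.get_nowait()  # mirror of put_nowait on an empty queue
    except queue.Empty:
        outcomes["get_nowait"] = "Empty"

    try:
        q.get(timeout=0.05)  # mirror of the timed put
    except queue.Empty:
        outcomes["get_timeout"] = "Empty"

    return outcomes
```

put_nowait(item) and get_nowait() are shorthand for put(item, block=False) and get(block=False), so they take the same branch as the explicit calls.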

Storage hooks

Queue keeps items in a collections.deque. _put appends to the right; _get pops from the left, giving FIFO order.

# CPython: Lib/queue.py:238 Queue._init / _put / _get
def _init(self, maxsize):
    self.queue = deque()

def _put(self, item):
    self.queue.append(item)

def _get(self):
    return self.queue.popleft()

LifoQueue swaps the deque for a list and pops from the right.

# CPython: Lib/queue.py:267 LifoQueue._init / _put / _get
def _init(self, maxsize):
    self.queue = []

def _put(self, item):
    self.queue.append(item)

def _get(self):
    return self.queue.pop()
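
Because the locking lives entirely in put() and get(), swapping the hooks is enough to change the ordering. The sketch below feeds the same items to the three stock classes and to a hypothetical subclass, MaxFirstQueue, which is not part of the standard library and is shown only to illustrate overriding the hooks; drain and order_for are likewise illustrative helpers. It assumes numeric items and relies on the inherited _qsize, which returns len(self.queue).

```python
import heapq
import queue


def drain(q):
    """Remove and return every item currently in q (single-threaded use only)."""
    items = []
    while not q.empty():
        items.append(q.get_nowait())
    return items


def order_for(queue_cls, items=(3, 1, 2)):
    """Put items into a fresh queue of the given class and return the get order."""
    q = queue_cls()
    for item in items:
        q.put(item)
    return drain(q)


class MaxFirstQueue(queue.Queue):
    """Hypothetical subclass: hands back the largest number first."""

    def _init(self, maxsize):
        self.queue = []  # heap of negated values

    def _put(self, item):
        heapq.heappush(self.queue, -item)

    def _get(self):
        return -heapq.heappop(self.queue)
```

With the default items, order_for returns [3, 1, 2] for Queue, [2, 1, 3] for LifoQueue, [1, 2, 3] for PriorityQueue, and [3, 2, 1] for MaxFirstQueue. None of the subclasses touches a lock; the hooks always run with the mutex already held.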

Queue.join and task_done

join() blocks until unfinished_tasks reaches zero. put() increments the counter for every item, and each consumer calls task_done() once per retrieved item to decrement it; when it hits zero, all_tasks_done is notified and every waiting join() call unblocks.

# CPython: Lib/queue.py:90 Queue.task_done
def task_done(self):
    with self.all_tasks_done:
        unfinished = self.unfinished_tasks - 1
        if unfinished <= 0:
            if unfinished < 0:
                raise ValueError("task_done() called too many times")
            self.all_tasks_done.notify_all()
        self.unfinished_tasks = unfinished

def join(self):
    with self.all_tasks_done:
        while self.unfinished_tasks:
            self.all_tasks_done.wait()
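
A typical use of the counter is a small worker pool. The following is a minimal sketch, assuming a None sentinel to stop workers (a common convention, not something queue.py defines); square_all and extra_task_done_raises are illustrative names.

```python
import queue
import threading


def square_all(numbers, n_workers=3):
    """Square each number on worker threads; join() waits for all of them."""
    q = queue.Queue()
    results = []
    results_lock = threading.Lock()

    def worker():
        while True:
            item = q.get()
            try:
                if item is None:  # sentinel: stop this worker
                    return
                with results_lock:
                    results.append(item * item)
            finally:
                q.task_done()  # exactly one call per get(), sentinel included

    threads = [threading.Thread(target=worker) for _ in range(n_workers)]
    for t in threads:
        t.start()
    for n in numbers:
        q.put(n)
    q.join()  # returns once unfinished_tasks is back to zero
    for _ in threads:
        q.put(None)  # one sentinel per worker
    for t in threads:
        t.join()
    return sorted(results)


def extra_task_done_raises():
    """Show that an unmatched task_done() raises ValueError."""
    q = queue.Queue()
    q.put("x")
    q.get()
    q.task_done()
    try:
        q.task_done()  # counter would go negative
    except ValueError:
        return True
    return False
```

Calling task_done() in a finally block keeps the counter balanced even if processing an item raises, which is what prevents join() from hanging.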

gopy notes

  • mutex is a sync.Mutex in gopy; not_empty, not_full, and all_tasks_done are sync.Cond values wrapping the same mutex.
  • _put/_get/_init become an interface queueStorage so that LifoQueue and PriorityQueue can be separate structs with no runtime type switch.
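  • SimpleQueue maps to a Go channel. Go channels have a fixed buffer capacity, so matching SimpleQueue's non-blocking, unbounded put() needs an unbounded buffer rather than a bare channel.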
  • heapq is already ported in gopy; PriorityQueue can import it as-is.
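
The shared-mutex arrangement in the first bullet mirrors what CPython does, and it can be checked from Python. The sketch below relies on the mutex, not_empty, not_full, and all_tasks_done attributes that Queue.__init__ sets up; conditions_share_mutex is an illustrative helper name, and the check is only meaningful while no other thread is using the queue.

```python
import queue


def conditions_share_mutex(q):
    """Return True if entering each condition acquires q.mutex and leaving releases it."""
    checks = []
    for cond in (q.not_empty, q.not_full, q.all_tasks_done):
        with cond:
            checks.append(q.mutex.locked())  # held while inside the condition
        checks.append(not q.mutex.locked())  # released again on exit
    return all(checks)
```

A port has to preserve this property: if the three conditions were built on separate locks, put() could notify not_empty without excluding a concurrent get().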

CPython 3.14 changes

  • No structural changes to queue.py in 3.14; SimpleQueue itself dates back to 3.7.
  • 3.13 added Queue.shutdown() for orderly drain-and-close; it is present in 3.14 and not yet covered by gopy.
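
The behavior a port of shutdown() would need to reproduce can be sketched as follows. This assumes Python 3.13 or later; on older interpreters the illustrative helper drain_after_shutdown returns None instead of running the demonstration.

```python
import queue


def drain_after_shutdown(items):
    """Shut a queue down, then drain it. Returns None on Python < 3.13."""
    q = queue.Queue()
    if not hasattr(q, "shutdown"):
        return None  # Queue.shutdown() does not exist before 3.13

    for item in items:
        q.put(item)
    q.shutdown()  # default immediate=False: already queued items stay retrievable

    drained = []
    try:
        while True:
            drained.append(q.get())
    except queue.ShutDown:
        pass  # raised once the shut-down queue is empty

    try:
        q.put("late")
        put_rejected = False
    except queue.ShutDown:
        put_rejected = True  # no new items are accepted after shutdown()

    return drained, put_rejected
```

On 3.13+, drain_after_shutdown([1, 2, 3]) returns ([1, 2, 3], True): consumers finish the backlog, and producers are refused.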