Skip to main content

Lib/queue.py

Source:

cpython 3.14 @ ab2d84fe1023/Lib/queue.py

queue provides four thread-safe queue types for producer-consumer patterns. All blocking operations use a threading.Condition internally. The module is pure Python with no C extension.

Map

LinesSymbolRole
1-25imports, __all__threading, collections.deque, heapq
26-220QueueFIFO queue; core implementation all others inherit
221-245LifoQueueStack (LIFO) backed by a deque
246-270PriorityQueueMin-heap backed queue
271-310SimpleQueueLock-free unbounded queue using a deque and a Semaphore

Reading

Queue internals

Queue uses three threading primitives: a Condition (mutex) that wraps a Lock, a not_empty condition signalled on put, and a not_full condition signalled on get. The payload is stored in a deque.

# CPython: Lib/queue.py:35 Queue.__init__
def __init__(self, maxsize=0):
self.maxsize = maxsize
self._init(maxsize)
self.mutex = threading.Lock()
self.not_empty = threading.Condition(self.mutex)
self.not_full = threading.Condition(self.mutex)
self.all_tasks_done = threading.Condition(self.mutex)
self.unfinished_tasks = 0

put and get

put acquires not_full, waits while the queue is at capacity, appends to the deque, increments unfinished_tasks, and notifies not_empty.

# CPython: Lib/queue.py:116 Queue.put
def put(self, item, block=True, timeout=None):
with self.not_full:
if self.maxsize > 0:
... # wait loop with optional timeout
self._put(item)
self.unfinished_tasks += 1
self.not_empty.notify()

get mirrors put: waits on not_empty, removes from the deque, and notifies not_full.

task_done and join

task_done decrements unfinished_tasks and calls all_tasks_done.notify_all() when it reaches zero. join blocks on all_tasks_done until unfinished_tasks is zero. This allows a producer to wait until all enqueued items have been consumed and processed.

# CPython: Lib/queue.py:71 Queue.task_done
def task_done(self):
with self.all_tasks_done:
unfinished = self.unfinished_tasks - 1
if unfinished <= 0:
if unfinished < 0:
raise ValueError('task_done() called too many times')
self.all_tasks_done.notify_all()
self.unfinished_tasks = unfinished

LifoQueue and PriorityQueue

Both subclasses override only _init, _put, _get, and _qsize. LifoQueue uses a list with append/pop. PriorityQueue uses heapq.heappush/heappop on a list.

# CPython: Lib/queue.py:228 LifoQueue._init
def _init(self, maxsize):
self.queue = []

def _put(self, item):
self.queue.append(item)

def _get(self):
return self.queue.pop()

SimpleQueue

SimpleQueue avoids the full Condition machinery. It uses a deque protected by a Lock for the put side and a Semaphore that the get side acquires.

# CPython: Lib/queue.py:275 SimpleQueue.__init__
def __init__(self):
self._queue = collections.deque()
self._count = threading.Semaphore(0)

gopy notes

Status: not yet ported. The Go analogue is a chan interface{} with a separate goroutine-safe wrapper. An exact port would use sync.Mutex and sync.Cond to mirror the Python threading.Condition semantics, including task_done/join. SimpleQueue maps to a channel or a sync.Mutex-protected deque.