Lib/queue.py
Source:
cpython 3.14 @ ab2d84fe1023/Lib/queue.py
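queue provides four thread-safe queue classes for producer-consumer patterns: Queue, LifoQueue, PriorityQueue, and SimpleQueue. Blocking operations on Queue and its two subclasses use threading.Condition objects that share a single lock. Those three classes are pure Python. SimpleQueue is normally imported from the C _queue module; Lib/queue.py only supplies a pure-Python fallback (_PySimpleQueue), built on a deque and a Semaphore, for when that import fails.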
Map
| Lines | Symbol | Role |
|---|---|---|
| 1-25 | imports, __all__ | threading, collections.deque, heapq |
| 26-220 | Queue | FIFO queue; core implementation that LifoQueue and PriorityQueue inherit |
| 221-245 | LifoQueue | Stack (LIFO) backed by a list |
| 246-270 | PriorityQueue | Min-heap backed queue |
| 271-310 | SimpleQueue | Unbounded FIFO queue; the pure-Python fallback uses a deque and a Semaphore |
Reading
Queue internals
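Queue uses four threading primitives: a plain Lock (mutex) and three Condition objects that all wrap it. not_empty is notified by put, not_full is notified by get, and all_tasks_done is notified by task_done when the unfinished-task count reaches zero. Because the conditions share one lock, holding any of them excludes every other queue operation. The payload is stored in a deque.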
# CPython: Lib/queue.py:35 Queue.__init__
def __init__(self, maxsize=0):
    self.maxsize = maxsize
    self._init(maxsize)
    self.mutex = threading.Lock()
    self.not_empty = threading.Condition(self.mutex)
    self.not_full = threading.Condition(self.mutex)
    self.all_tasks_done = threading.Condition(self.mutex)
    self.unfinished_tasks = 0
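The shared-lock arrangement can be observed from outside the class. The following is a minimal sketch, not part of Lib/queue.py: shares_mutex is a hypothetical helper that relies only on the documented behaviour that Condition.acquire() delegates to the underlying lock.

```python
import queue
import threading


def shares_mutex(q, cond):
    """Hypothetical helper: True if cond's underlying lock is q.mutex."""
    with q.mutex:
        # Condition.acquire() delegates to the underlying lock. If that lock
        # is q.mutex, which this thread already holds and which is not
        # reentrant, a non-blocking acquire must fail.
        acquired = cond.acquire(blocking=False)
        if acquired:
            cond.release()  # cond had its own lock; undo the acquire
        return not acquired


q = queue.Queue()
shared = {
    name: shares_mutex(q, getattr(q, name))
    for name in ("not_empty", "not_full", "all_tasks_done")
}
# Control case: a Condition created separately has its own lock.
independent = shares_mutex(q, threading.Condition())
```

Every entry in shared comes out true, while the unrelated Condition reports false, which is the practical meaning of "three conditions, one mutex".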
put and get
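put acquires not_full (and with it the shared mutex), waits while a bounded queue is at capacity, calls _put to append the item to the deque, increments unfinished_tasks, and notifies not_empty. With block=False, or once timeout expires, it raises Full instead of waiting.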
# CPython: Lib/queue.py:116 Queue.put
def put(self, item, block=True, timeout=None):
    with self.not_full:
        if self.maxsize > 0:
            ...  # wait loop with optional timeout
        self._put(item)
        self.unfinished_tasks += 1
        self.not_empty.notify()
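get mirrors put: it waits on not_empty while the queue is empty, removes an item from the deque via _get, and notifies not_full. It raises Empty when block=False or the timeout expires. get does not touch unfinished_tasks; only task_done decrements that counter.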
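A small usage sketch of the blocking and non-blocking paths described above. The producer function and the sample values are illustrative only; the queue calls are the public API.

```python
import queue
import threading


def producer(q, items):
    for item in items:
        q.put(item)  # blocks on not_full while the queue holds maxsize items


q = queue.Queue(maxsize=2)
t = threading.Thread(target=producer, args=(q, range(5)))
t.start()
received = [q.get() for _ in range(5)]  # each get() notifies not_full
t.join()

# Non-blocking and timed variants raise instead of waiting forever.
q.put("a")
q.put("b")
try:
    q.put("c", block=False)
    put_raised_full = False
except queue.Full:
    put_raised_full = True

drained = [q.get_nowait(), q.get_nowait()]
try:
    q.get(timeout=0.01)
    get_raised_empty = False
except queue.Empty:
    get_raised_empty = True
```

With a single producer, items arrive in insertion order even though the producer is repeatedly blocked by the two-slot limit.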
task_done and join
task_done decrements unfinished_tasks and calls all_tasks_done.notify_all() when it reaches zero. join blocks on all_tasks_done until unfinished_tasks is zero. This allows a producer to wait until all enqueued items have been consumed and processed.
# CPython: Lib/queue.py:71 Queue.task_done
def task_done(self):
    with self.all_tasks_done:
        unfinished = self.unfinished_tasks - 1
        if unfinished <= 0:
            if unfinished < 0:
                raise ValueError('task_done() called too many times')
            self.all_tasks_done.notify_all()
        self.unfinished_tasks = unfinished
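The task_done/join handshake is easiest to see with a worker pool. This is a sketch under assumptions: the worker function, the None sentinel, and the squaring workload are illustrative choices, not anything prescribed by the module.

```python
import queue
import threading


def worker(q, results, results_lock):
    while True:
        item = q.get()
        if item is None:        # sentinel (an assumption of this sketch)
            q.task_done()
            break
        with results_lock:
            results.append(item * item)
        q.task_done()           # one call per completed get()


q = queue.Queue()
results = []
results_lock = threading.Lock()
threads = [
    threading.Thread(target=worker, args=(q, results, results_lock))
    for _ in range(3)
]
for t in threads:
    t.start()

for n in range(10):
    q.put(n)
q.join()  # returns once task_done() has been called for every item put
count_after_join = len(results)

# Shut the workers down with one sentinel each.
for _ in threads:
    q.put(None)
for t in threads:
    t.join()

# One call too many takes the counter below zero and raises.
try:
    q.task_done()
    extra_call_raised = False
except ValueError:
    extra_call_raised = True
```

Because each worker appends its result before calling task_done, all ten results are present by the time join returns.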
LifoQueue and PriorityQueue
Both subclasses override only _init, _put, _get, and _qsize. LifoQueue uses a list with append/pop. PriorityQueue uses heapq.heappush/heappop on a list.
# CPython: Lib/queue.py:228 LifoQueue._init
def _init(self, maxsize):
    self.queue = []

def _put(self, item):
    self.queue.append(item)

def _get(self):
    return self.queue.pop()
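Since only the storage hooks differ, the three classes can be compared by feeding them the same input. A short sketch; drain is a hypothetical helper used here for single-threaded illustration only.

```python
import queue


def drain(q):
    """Hypothetical helper: pop everything from q (single-threaded use)."""
    out = []
    while not q.empty():
        out.append(q.get_nowait())
    return out


items = [3, 1, 2]
fifo, lifo, prio = queue.Queue(), queue.LifoQueue(), queue.PriorityQueue()
for q in (fifo, lifo, prio):
    for item in items:
        q.put(item)

fifo_order = drain(fifo)  # insertion order
lifo_order = drain(lifo)  # reverse insertion order
prio_order = drain(prio)  # smallest item first
```

The locking, blocking, and task accounting are identical in all three; only the order in which _get hands items back changes.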
SimpleQueue
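SimpleQueue avoids the full Condition machinery and has no maxsize, task_done, or join. The pure-Python fallback (named _PySimpleQueue in the source and used only when the C _queue module is unavailable) stores items in a deque and counts them with a Semaphore. put appends to the deque, relying on deque.append being atomic rather than on an explicit lock, and then releases the semaphore. get acquires the semaphore, which blocks while the queue is empty, and then calls popleft.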
# CPython: Lib/queue.py:275 SimpleQueue.__init__
def __init__(self):
    self._queue = collections.deque()
    self._count = threading.Semaphore(0)
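The deque-plus-semaphore pattern fits in a few lines. The class below is a simplified sketch of that pattern, not the CPython source: MiniSimpleQueue is a hypothetical name, and argument validation is omitted.

```python
import threading
from collections import deque
from queue import Empty


class MiniSimpleQueue:
    """Hypothetical sketch of an unbounded FIFO queue (deque + Semaphore)."""

    def __init__(self):
        self._queue = deque()
        self._count = threading.Semaphore(0)  # number of items available

    def put(self, item):
        self._queue.append(item)  # deque.append is atomic; never blocks
        self._count.release()     # wake one waiting get(), if any

    def get(self, block=True, timeout=None):
        # acquire() returns False if no item became available in time.
        if not self._count.acquire(block, timeout):
            raise Empty
        return self._queue.popleft()


sq = MiniSimpleQueue()
sq.put(1)
sq.put(2)
first, second = sq.get(), sq.get()

try:
    sq.get(block=False)
    empty_raised = False
except Empty:
    empty_raised = True
```

The semaphore's counter always equals the number of items a getter is still entitled to take, so a successful acquire guarantees that popleft finds an item.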
gopy notes
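Status: not yet ported. The closest Go analogue is a buffered channel (chan interface{}), which is already goroutine-safe and gives blocking put/get on a bounded queue, but has no equivalent of task_done/join or of an unbounded maxsize=0 queue. An exact port would use a sync.Mutex with sync.Cond values to mirror the Python threading.Condition semantics, including task_done/join. SimpleQueue maps to a channel or a sync.Mutex-protected deque.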