queue.py
Lib/queue.py provides three thread-safe queue classes: Queue (FIFO),
LifoQueue (stack), and PriorityQueue (heap). All three share the same
put/get locking protocol and differ only in the three storage hooks
_init, _put, and _get.
Map
| Lines | Symbol | Role |
|---|---|---|
| 1-20 | imports, __all__ | collections.deque, heapq, threading |
| 21-60 | Queue.__init__ | Sets up mutex, not_empty, not_full, all_tasks_done |
| 61-110 | Queue.put, Queue.put_nowait | Producer path with not_full wait |
| 111-160 | Queue.get, Queue.get_nowait | Consumer path with not_empty wait |
| 161-200 | Queue.task_done, Queue.join | Work-tracking counter |
| 201-240 | Queue._init, Queue._put, Queue._get | Storage hooks (deque) |
| 241-290 | LifoQueue | Overrides hooks to use a list |
| 291-340 | PriorityQueue | Overrides hooks to use heapq |
| 341-370 | SimpleQueue | Lock-free alternative backed by _thread.lock |
Reading
Queue.put and Queue.get
put() acquires the single mutex, waits on not_full if the queue is at
capacity, calls _put, then notifies not_empty. get() is the mirror
image.
# CPython: Lib/queue.py:133 Queue.put
def put(self, item, block=True, timeout=None):
with self.not_full:
if self.maxsize > 0:
if not block:
if self._qsize() >= self.maxsize:
raise Full
elif timeout is None:
while self._qsize() >= self.maxsize:
self.not_full.wait()
elif timeout < 0:
raise ValueError("'timeout' must be a non-negative number")
else:
endtime = time() + timeout
while self._qsize() >= self.maxsize:
remaining = endtime - time()
if remaining <= 0.0:
raise Full
self.not_full.wait(remaining)
self._put(item)
self.unfinished_tasks += 1
self.not_empty.notify()
Storage hooks
Queue keeps items in a collections.deque. _put appends to the right;
_get pops from the left, giving FIFO order.
# CPython: Lib/queue.py:238 Queue._init / _put / _get
def _init(self, maxsize):
self.queue = deque()
def _put(self, item):
self.queue.append(item)
def _get(self):
return self.queue.popleft()
LifoQueue swaps the deque for a list and pops from the right.
# CPython: Lib/queue.py:267 LifoQueue._init / _put / _get
def _init(self, maxsize):
self.queue = []
def _put(self, item):
self.queue.append(item)
def _get(self):
return self.queue.pop()
Queue.join and task_done
join() blocks until unfinished_tasks reaches zero. Each consumer calls
task_done() to decrement the counter; when it hits zero, all_tasks_done
is notified and every waiting join() call unblocks.
# CPython: Lib/queue.py:90 Queue.task_done
def task_done(self):
with self.all_tasks_done:
unfinished = self.unfinished_tasks - 1
if unfinished <= 0:
if unfinished < 0:
raise ValueError("task_done() called too many times")
self.all_tasks_done.notify_all()
self.unfinished_tasks = unfinished
def join(self):
with self.all_tasks_done:
while self.unfinished_tasks:
self.all_tasks_done.wait()
gopy notes
mutexis async.Mutexin gopy;not_empty,not_full, andall_tasks_donearesync.Condvalues wrapping the same mutex._put/_get/_initbecome an interfacequeueStorageso thatLifoQueueandPriorityQueuecan be separate structs with no runtime type switch.SimpleQueuemaps directly to a Go channel with no capacity limit.heapqis already ported in gopy;PriorityQueuecan import it as-is.
CPython 3.14 changes
- No structural changes to
queue.pyin 3.14. The file has been stable since the 3.9SimpleQueueconsolidation. - 3.13 added
Queue.shutdown()for orderly drain-and-close; it is present in 3.14 and not yet covered by gopy.