Lib/asyncio/queues.py
cpython 3.14 @ ab2d84fe1023/Lib/asyncio/queues.py
This file provides three async-aware queue classes: Queue (FIFO), PriorityQueue (min-heap), and LifoQueue (stack). Each uses Future objects stored in two deques, _getters and _putters, to suspend producers when the queue is full and consumers when it is empty. The API mirrors queue.Queue from the standard library, but all blocking operations are coroutines. A shutdown method added in 3.13 and carried into 3.14 allows the queue to be drained and closed, unblocking all waiters with QueueShutDown.
Map
| Lines | Symbol | Role |
|---|---|---|
| 1-8 | __all__ | Exported names |
| 10-15 | Imports | collections, heapq, GenericAlias, locks, mixins |
| 18-19 | QueueEmpty | Raised by get_nowait on an empty queue |
| 23-24 | QueueFull | Raised by put_nowait on a full queue |
| 28-29 | QueueShutDown | Raised by put after shutdown(), and by get once the shut-down queue is empty |
| 33-278 | Queue | Full FIFO implementation |
| 45-56 | Queue.__init__ | State setup: _getters, _putters, _finished, _is_shutdown |
| 60-68 | _init, _get, _put | Overridable storage primitives |
| 71-77 | _wakeup_next | Resolves the oldest non-cancelled waiter in a deque |
| 79-99 | __repr__, __str__, _format | Debug representations |
| 101-123 | qsize, maxsize, empty, full | Capacity introspection |
| 125-154 | put | Coroutine: suspends when full, then calls put_nowait |
| 156-170 | put_nowait | Non-blocking put; raises QueueFull or QueueShutDown |
| 172-201 | get | Coroutine: suspends when empty, then calls get_nowait |
| 203-217 | get_nowait | Non-blocking get; raises QueueEmpty or QueueShutDown |
| 219-237 | task_done | Decrements unfinished-task counter; sets _finished at zero |
| 239-248 | join | Awaits _finished.wait() until all tasks are done |
| 250-278 | shutdown | Drains and closes the queue; unblocks all waiters |
| 281-295 | PriorityQueue | Subclass using heapq for _get and _put |
| 297-307 | LifoQueue | Subclass using list.pop for _get |
Reading
Queue.put and the cancellation triangle
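put (line 125) suspends the caller by creating a Future, appending it to _putters, and awaiting it. The bare except block handles any exception raised at the await, most commonly CancelledError when the task is cancelled mid-wait: it cancels the future, removes it from _putters, and re-raises. Before re-raising it checks one more case. If the future had already been resolved by get_nowait (so putter.cancelled() is false) and the queue still has room, this putter consumed a wakeup it can no longer use, so it passes the wakeup on to the next putter via _wakeup_next.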
```python
# CPython: Lib/asyncio/queues.py:125 Queue.put
async def put(self, item):
    while self.full():
        if self._is_shutdown:
            raise QueueShutDown
        putter = self._get_loop().create_future()
        self._putters.append(putter)
        try:
            await putter
        except:
            putter.cancel()
            try:
                self._putters.remove(putter)
            except ValueError:
                # already removed by get_nowait or shutdown
                pass
            if not self.full() and not putter.cancelled():
                self._wakeup_next(self._putters)
            raise
    return self.put_nowait(item)
```
The loop must be while self.full() rather than a single if self.full() check. Resolving the future only schedules the putter to resume; before it actually runs, another producer may call put_nowait and take the free slot, so the condition has to be rechecked after every wakeup.
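The suspend-when-full behaviour can be observed with a one-slot queue. The following is a minimal sketch, not part of the CPython source; demo_backpressure, producer, and consumer are illustrative names.

```python
import asyncio


async def demo_backpressure():
    events = []
    q = asyncio.Queue(maxsize=1)    # a single slot forces the producer to wait

    async def producer():
        for i in range(3):
            await q.put(i)          # suspends whenever the slot is occupied
            events.append(f"put {i}")

    async def consumer():
        for _ in range(3):
            item = await q.get()    # each get frees the slot and wakes a putter
            events.append(f"got {item}")

    await asyncio.gather(producer(), consumer())
    return events
```

Calling asyncio.run(demo_backpressure()) returns the event log. Because the queue holds one item, "put 2" can only be recorded after "got 1".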
Queue.get mirrors put
get (line 172) is symmetric. It suspends by appending a Future to _getters and awaiting it, and put_nowait resolves the oldest pending getter via _wakeup_next. The same cancellation handling applies: on any exception the getter future is cancelled and removed from _getters. If the future had already been resolved by put_nowait (so getter.cancelled() is false) and the queue is non-empty, the wakeup is handed to the next getter so the item is not stranded.
```python
# CPython: Lib/asyncio/queues.py:172 Queue.get
async def get(self):
    while self.empty():
        if self._is_shutdown and self.empty():
            raise QueueShutDown
        getter = self._get_loop().create_future()
        self._getters.append(getter)
        try:
            await getter
        except:
            getter.cancel()
            try:
                self._getters.remove(getter)
            except ValueError:
                pass
            if not self.empty() and not getter.cancelled():
                self._wakeup_next(self._getters)
            raise
    return self.get_nowait()
```
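The handoff in the except block matters when a getter is cancelled after put_nowait has resolved its future but before the task resumes. A minimal sketch of that race, assuming the default (non-eager) task scheduling; demo_wakeup_handoff is an illustrative name, not part of the module.

```python
import asyncio


async def demo_wakeup_handoff():
    q = asyncio.Queue()

    first = asyncio.create_task(q.get())
    second = asyncio.create_task(q.get())
    await asyncio.sleep(0)          # let both tasks suspend inside get()

    q.put_nowait("x")               # resolves the oldest getter (first's future)
    first.cancel()                  # cancel first before it gets to resume

    # first's except block sees a non-empty queue and an uncancelled future,
    # so it wakes the next getter; second then receives the item.
    item = await asyncio.wait_for(second, timeout=1)
    await asyncio.gather(first, return_exceptions=True)
    return first.cancelled(), item
```

Without the _wakeup_next call in the except block, the item would stay in the queue while the second getter kept waiting.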
task_done, join, and the _finished event
Queue counts the items that have been enqueued but not yet marked as processed in _unfinished_tasks. Every put_nowait increments it; every task_done decrements it. When the counter reaches zero, _finished (a locks.Event) is set, unblocking all callers of join.
```python
# CPython: Lib/asyncio/queues.py:219 Queue.task_done
def task_done(self):
    if self._unfinished_tasks <= 0:
        raise ValueError('task_done() called too many times')
    self._unfinished_tasks -= 1
    if self._unfinished_tasks == 0:
        self._finished.set()
```

```python
# CPython: Lib/asyncio/queues.py:239 Queue.join
async def join(self):
    if self._unfinished_tasks > 0:
        await self._finished.wait()
```
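_finished starts set (__init__ calls self._finished.set()), and join only waits when _unfinished_tasks is greater than zero, so join on a never-used queue returns immediately. put_nowait calls _finished.clear() on every enqueue, which has no effect when the event is already clear.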
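The counter protocol is easiest to see in the usual worker-pool pattern. The sketch below is illustrative only (demo_join and worker are made-up names): each fetched item is matched by exactly one task_done call, and join returns once the counter is back at zero.

```python
import asyncio


async def demo_join():
    q = asyncio.Queue()
    processed = []

    async def worker():
        while True:
            item = await q.get()
            try:
                await asyncio.sleep(0)      # stand-in for real work
                processed.append(item)
            finally:
                q.task_done()               # exactly one call per fetched item

    for i in range(5):
        q.put_nowait(i)                     # each put increments the counter

    workers = [asyncio.create_task(worker()) for _ in range(2)]
    await q.join()                          # returns once the counter is zero

    for w in workers:
        w.cancel()                          # the workers loop forever; stop them
    await asyncio.gather(*workers, return_exceptions=True)
    return sorted(processed)
```

Placing task_done in a finally block keeps the counter balanced even if processing raises, so join cannot hang on a failed item.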
Queue.shutdown and cooperative teardown
shutdown (line 250) sets _is_shutdown = True, discards any remaining items when immediate=True (decrementing _unfinished_tasks for each so that pending join callers are released), and then resolves every pending getter and putter so they observe the _is_shutdown flag on their next loop iteration and raise QueueShutDown.
```python
# CPython: Lib/asyncio/queues.py:250 Queue.shutdown
def shutdown(self, immediate=False):
    self._is_shutdown = True
    if immediate:
        while not self.empty():
            self._get()
            if self._unfinished_tasks > 0:
                self._unfinished_tasks -= 1
        if self._unfinished_tasks == 0:
            self._finished.set()
    while self._getters:
        getter = self._getters.popleft()
        if not getter.done():
            getter.set_result(None)
    while self._putters:
        putter = self._putters.popleft()
        if not putter.done():
            putter.set_result(None)
```
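Getters and putters are woken by setting their future's result to None, not by setting an exception. Each one then re-enters its loop (while self.empty() for getters, while self.full() for putters), finds _is_shutdown true, and raises QueueShutDown itself. A putter that finds the queue no longer full falls through to put_nowait, which raises QueueShutDown instead. This keeps the wakeup and cancellation handling on a single code path.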
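A short sketch of the default (immediate=False) shutdown as seen from a consumer. It assumes Python 3.13 or newer and returns None on older interpreters where Queue.shutdown does not exist; demo_shutdown and consumer are illustrative names.

```python
import asyncio


async def demo_shutdown():
    # Queue.shutdown exists only on Python 3.13+; skip on older interpreters.
    if not hasattr(asyncio.Queue, "shutdown"):
        return None

    q = asyncio.Queue()
    q.put_nowait("last item")

    async def consumer():
        received = []
        try:
            while True:
                received.append(await q.get())
        except asyncio.QueueShutDown:
            return received             # queue is shut down and empty

    task = asyncio.create_task(consumer())
    await asyncio.sleep(0)      # consumer takes the item, then blocks in get()
    q.shutdown()                # resolves the blocked getter's future
    received = await task

    try:
        q.put_nowait("too late")
    except asyncio.QueueShutDown:
        put_rejected = True
    else:
        put_rejected = False
    return received, put_rejected
```

The consumer still receives the item that was enqueued before shutdown; only the following get, on an empty shut-down queue, raises QueueShutDown.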
PriorityQueue and LifoQueue overridable primitives
Queue delegates item storage to three private methods (_init, _get, _put) that subclasses override. PriorityQueue (line 281) replaces the deque with a list and uses heapq.heappush / heapq.heappop. LifoQueue (line 297) replaces it with a list using append / pop.
```python
# CPython: Lib/asyncio/queues.py:281 PriorityQueue
class PriorityQueue(Queue):
    def _init(self, maxsize):
        self._queue = []

    def _put(self, item, heappush=heapq.heappush):
        heappush(self._queue, item)

    def _get(self, heappop=heapq.heappop):
        return heappop(self._queue)
```

```python
# CPython: Lib/asyncio/queues.py:297 LifoQueue
class LifoQueue(Queue):
    def _init(self, maxsize):
        self._queue = []

    def _put(self, item):
        self._queue.append(item)

    def _get(self):
        return self._queue.pop()
```
The heappush=heapq.heappush and heappop=heapq.heappop default arguments bind the heapq functions once, when each method is defined, so a call reads a local variable instead of performing a global lookup of heapq followed by an attribute lookup.
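Because only the storage primitives differ, the three classes can be compared by feeding them the same items. A minimal sketch using only the public non-blocking API; drain and demo_orderings are illustrative helpers, not part of the module.

```python
import asyncio


def drain(q):
    """Remove and return every item currently in the queue."""
    items = []
    while not q.empty():
        items.append(q.get_nowait())
    return items


def demo_orderings():
    results = {}
    for cls in (asyncio.Queue, asyncio.PriorityQueue, asyncio.LifoQueue):
        q = cls()
        for item in (3, 1, 2):
            q.put_nowait(item)          # same insertion order for each class
        results[cls.__name__] = drain(q)
    return results
```

With the input 3, 1, 2, the FIFO queue returns the items in insertion order, the priority queue returns them smallest first, and the LIFO queue returns them in reverse insertion order.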
gopy notes
Go channels cover the basic producer-consumer case but do not directly model join / task_done. A port would need a counter protected by a mutex plus a sync.Cond to implement join. The _getters and _putters deques map to slices of channels or condition variables. PriorityQueue maps to container/heap. shutdown has no exact Go-channel equivalent: close lets receivers drain the buffered items, much like shutdown(immediate=False), but a send on a closed channel panics instead of returning an error, and nothing corresponds to immediate=True. A context.Context cancellation is the closest analogue for unblocking waiters and would need to be layered on top.
CPython 3.14 changes
The QueueShutDown exception and Queue.shutdown(immediate=False) are new in 3.13 and retained in 3.14, mirroring the queue.Queue.shutdown added to the threading queue in 3.13. Queue inherits from mixins._LoopBoundMixin (which replaced the previously stored self._loop attribute in 3.10) and accesses the running loop lazily via _get_loop(). Queue.__class_getitem__ is set to classmethod(GenericAlias), which has enabled Queue[int] type annotations since 3.9.