Lib/asyncio/queues.py
cpython 3.14 @ ab2d84fe1023/Lib/asyncio/queues.py
queues.py provides three queue classes for coordinating work between
asyncio tasks. Unlike queue.Queue (which blocks OS threads), these
classes suspend coroutines via asyncio.Future objects and never block
the event loop thread.
Queue is the base class. It maintains an internal collections.deque
(_queue) as the item store, a deque of Future objects waiting to put
(_putters), and a deque of Future objects waiting to get (_getters).
A maxsize of zero means unbounded.
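A minimal sketch of the maxsize rule, using only the public asyncio.Queue API. The function name demo_unbounded is illustrative and does not appear in queues.py.

```python
import asyncio


async def demo_unbounded():
    unbounded = asyncio.Queue()          # maxsize defaults to 0
    bounded = asyncio.Queue(maxsize=2)

    for i in range(1000):
        unbounded.put_nowait(i)          # an unbounded queue never reports full

    bounded.put_nowait("a")
    bounded.put_nowait("b")              # both slots are now taken

    return unbounded.maxsize, unbounded.full(), unbounded.qsize(), bounded.full()


if __name__ == "__main__":
    print(asyncio.run(demo_unbounded()))
```

With maxsize left at zero, full() stays False no matter how many items are queued, so put never has to suspend.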
LifoQueue stores items in a plain list and overrides _get to call
_queue.pop(), so the most recently added item comes out first.
PriorityQueue also stores items in a list and uses heapq for ordering.
QueueEmpty and QueueFull are the non-blocking exception types, raised
by get_nowait and put_nowait respectively when the operation cannot
complete immediately.
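The non-blocking behaviour can be sketched as follows. demo_nowait is a hypothetical helper name; the queue calls and exception types are the public asyncio ones.

```python
import asyncio


async def demo_nowait():
    q = asyncio.Queue(maxsize=1)
    events = []

    try:
        q.get_nowait()                   # nothing queued yet
    except asyncio.QueueEmpty:
        events.append("empty")

    q.put_nowait("a")                    # fills the only slot
    try:
        q.put_nowait("b")                # no free slot left
    except asyncio.QueueFull:
        events.append("full")

    events.append(q.get_nowait())        # succeeds immediately
    return events


if __name__ == "__main__":
    print(asyncio.run(demo_nowait()))
```

Neither call suspends the caller, which makes them suitable for code that must not await, such as callbacks scheduled on the loop.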
The loop parameter of Queue.__init__ (deprecated since 3.8, removed in
3.10) is absent in CPython 3.14. The shutdown method (added in 3.13) is
available in 3.14 as a stable API: it wakes all blocked waiters and makes
further puts raise QueueShutDown. Gets raise it once the queue is empty,
or immediately when immediate=True.
Map
| Lines | Symbol | Role | gopy |
|---|---|---|---|
| 1-20 | QueueEmpty, QueueFull | Exception types; subclass Exception with no added state. | |
| 21-80 | Queue.__init__, Queue._init, Queue._get, Queue._put | Core queue state; _init allocates the deque, _put/_get are the overrideable hooks used by subclasses. | |
| 81-160 | Queue.put, Queue.put_nowait, Queue.get, Queue.get_nowait | Coroutine and non-blocking access; put and get loop on a fresh Future until a slot or item is available. | |
| 161-210 | Queue.task_done, Queue.join, Queue.qsize, Queue.empty, Queue.full | Work-tracking and introspection; join suspends until _unfinished_tasks reaches zero. | |
| 211-250 | LifoQueue, PriorityQueue, Queue.shutdown | Subclass overrides and the 3.13+ shutdown mechanism. | |
Reading
Queue.put and Queue.get (lines 81 to 160)
cpython 3.14 @ ab2d84fe1023/Lib/asyncio/queues.py#L81-160

    async def put(self, item):
        while self.full():
            putter = self._get_loop().create_future()
            self._putters.append(putter)
            try:
                await putter
            except:
                putter.cancel()  # Just in case putter is not done yet.
                try:
                    # Clean self._putters from cancelled putters.
                    self._putters.remove(putter)
                except ValueError:
                    pass
                if not self.full() and not putter.cancelled():
                    # We were woken up by get_nowait(), but can't take
                    # the call. Wake up the next in line.
                    self._wakeup_next(self._putters)
                raise
        return self.put_nowait(item)

    async def get(self):
        while self.empty():
            getter = self._get_loop().create_future()
            self._getters.append(getter)
            try:
                await getter
            except:
                getter.cancel()
                try:
                    self._getters.remove(getter)
                except ValueError:
                    pass
                if not self.empty() and not getter.cancelled():
                    self._wakeup_next(self._getters)
                raise
        return self.get_nowait()

put loops rather than branching once because a woken putter may find the
queue full again if another putter ran first. The loop creates a new
Future on each iteration, appends it to _putters, and awaits it.
When get_nowait (or get) removes an item it calls _wakeup_next on
_putters, which resolves the oldest still-pending putter's Future. If the
putter is cancelled while waiting, it removes itself from _putters; if its
Future had already been resolved and the queue has room, it passes the
wake-up along to the next putter so the free slot is not lost.
get mirrors this logic with _getters. put_nowait and get_nowait
are the fast paths: they raise immediately when the precondition fails,
and on success they call _wakeup_next on the opposite waiter list.
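The suspend-and-wake cycle described above can be observed from the outside with a one-slot queue. This is a sketch against the public API only; demo_backpressure and producer are illustrative names.

```python
import asyncio


async def demo_backpressure():
    q = asyncio.Queue(maxsize=1)
    log = []

    async def producer():
        for i in range(3):
            await q.put(i)               # suspends while the queue is full
            log.append(f"put {i}")

    task = asyncio.create_task(producer())
    await asyncio.sleep(0)               # let the producer run until it suspends

    # The first put succeeded; the second is parked on a Future in _putters.
    snapshot = (list(log), task.done())

    got = []
    for _ in range(3):
        got.append(await q.get())        # each get frees a slot and wakes the putter
    await task
    return snapshot, got, log


if __name__ == "__main__":
    print(asyncio.run(demo_backpressure()))
```

The producer makes progress only as fast as the consumer frees slots, which is the back-pressure a bounded queue is meant to provide.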
Queue.task_done and Queue.join (lines 161 to 210)
cpython 3.14 @ ab2d84fe1023/Lib/asyncio/queues.py#L161-210

    def task_done(self):
        if self._unfinished_tasks <= 0:
            raise ValueError('task_done() called too many times')
        self._unfinished_tasks -= 1
        if self._unfinished_tasks == 0:
            self._finished.set()

    async def join(self):
        """Block until all items in the queue have been gotten and processed.

        The count of unfinished tasks goes up whenever an item is added to the
        queue. The count goes down whenever a consumer calls task_done() to
        indicate that the item was retrieved and all work on it is complete.
        When the count of unfinished tasks drops to zero, join() unblocks.
        """
        if self._unfinished_tasks > 0:
            await self._finished.wait()

_unfinished_tasks is incremented by put_nowait each time an item is
placed in the queue. task_done decrements it. When the count reaches
zero, _finished (an asyncio.Event) is set, and any coroutines awaiting
join are unblocked. _finished is cleared again by put_nowait, so
join can be called in a loop for streaming workloads.
Calling task_done more times than items were put raises ValueError
immediately to catch book-keeping bugs before they corrupt the counter.
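A small worker-pool sketch shows the counter in use. The names demo_join and worker are hypothetical, and the doubling step stands in for real work.

```python
import asyncio


async def demo_join():
    q = asyncio.Queue()
    processed = []

    async def worker():
        while True:
            item = await q.get()
            try:
                processed.append(item * 2)   # placeholder for real work
            finally:
                q.task_done()                # exactly one call per item retrieved

    workers = [asyncio.create_task(worker()) for _ in range(2)]
    for i in range(5):
        q.put_nowait(i)                      # each put increments the counter

    await q.join()                           # resumes once the counter is back to zero

    for w in workers:
        w.cancel()
    await asyncio.gather(*workers, return_exceptions=True)

    try:
        q.task_done()                        # one call too many
    except ValueError:
        extra_rejected = True
    else:
        extra_rejected = False
    return sorted(processed), extra_rejected


if __name__ == "__main__":
    print(asyncio.run(demo_join()))
```

Calling task_done in a finally block keeps the counter accurate even when processing an item raises.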
LifoQueue and PriorityQueue overrides (lines 211 to 250)
cpython 3.14 @ ab2d84fe1023/Lib/asyncio/queues.py#L211-250

    class LifoQueue(Queue):
        """A subclass of Queue that retrieves most recently added entries first."""

        def _init(self, maxsize):
            self._queue = []

        def _put(self, item):
            self._queue.append(item)

        def _get(self):
            return self._queue.pop()

    class PriorityQueue(Queue):
        """A subclass of Queue that retrieves entries in priority order (lowest first)."""

        def _init(self, maxsize):
            self._queue = []

        def _put(self, item):
            heapq.heappush(self._queue, item)

        def _get(self):
            return heapq.heappop(self._queue)

Both subclasses override only the three internal hooks _init, _put,
and _get. All coroutine scheduling, maxsize enforcement, and
task_done/join logic is inherited from Queue unchanged. LifoQueue
uses a plain list with append/pop. PriorityQueue uses heapq so
the smallest item (by the normal Python < ordering) is always dequeued
first. Items must be comparable; the conventional pattern is to store
(priority, item) tuples.
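To compare the three orderings side by side, the sketch below feeds the same (priority, item) tuples to each class. The helper names drain and demo_orderings and the sample data are illustrative.

```python
import asyncio


async def drain(q):
    """Collect everything currently in q, in the order q hands it out."""
    out = []
    while not q.empty():
        out.append(await q.get())
    return out


async def demo_orderings():
    fifo = asyncio.Queue()
    lifo = asyncio.LifoQueue()
    prio = asyncio.PriorityQueue()

    for entry in [(2, "write"), (1, "read"), (3, "close")]:
        for q in (fifo, lifo, prio):
            q.put_nowait(entry)

    return await drain(fifo), await drain(lifo), await drain(prio)


if __name__ == "__main__":
    for name, items in zip(("fifo", "lifo", "prio"), asyncio.run(demo_orderings())):
        print(name, items)
```

When two entries share a priority, tuple comparison falls through to the payload, so payloads that cannot be compared would raise TypeError; a common workaround is to add a tie-breaking counter as a second tuple field.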
gopy mirror
module/asyncio/ is pending. Queue's _putters and _getters
machinery maps directly to Go channels (chan struct{}) or to slices of
*Future. The _finished event maps to gopy's asyncio.Event port.
LifoQueue and PriorityQueue are straightforward once Queue lands,
requiring only _init/_put/_get overrides in Go.
CPython 3.14 changes
- The loop parameter is absent from Queue.__init__ (deprecated since 3.8, hard-removed in 3.10).
- Queue.shutdown(immediate=False), added in 3.13, is stable in 3.14: calling it sets _is_shutdown = True, wakes all waiters so that they raise QueueShutDown, and clears the queue if immediate=True.
- The QueueShutDown exception type was added in 3.13 alongside shutdown.
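A short sketch of the default (immediate=False) shutdown behaviour follows. It assumes Python 3.13 or newer; the hasattr guard and the name demo_shutdown are additions for illustration, so that the snippet returns None on older interpreters instead of failing.

```python
import asyncio


async def demo_shutdown():
    if not hasattr(asyncio.Queue, "shutdown"):
        return None                      # Python < 3.13: shutdown is unavailable

    q = asyncio.Queue()
    q.put_nowait("last")
    q.shutdown()                         # immediate=False keeps queued items

    events = []
    try:
        q.put_nowait("rejected")         # puts are refused after shutdown
    except asyncio.QueueShutDown:
        events.append("put refused")

    events.append(await q.get())         # remaining items can still be drained

    try:
        await q.get()                    # empty and shut down: raises, does not wait
    except asyncio.QueueShutDown:
        events.append("get refused")
    return events


if __name__ == "__main__":
    print(asyncio.run(demo_shutdown()))
```

Consumers can treat QueueShutDown as an end-of-stream signal in place of a sentinel item.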