Lib/asyncio/queues.py

cpython 3.14 @ ab2d84fe1023/Lib/asyncio/queues.py

queues.py provides three queue classes for coordinating work between asyncio tasks. Unlike queue.Queue (which blocks OS threads), these classes suspend coroutines via asyncio.Future objects and never block the event loop thread.

Queue is the base class. It maintains an internal collections.deque (_queue) as the item store, a deque of Future objects waiting to put (_putters), and a deque of Future objects waiting to get (_getters). A maxsize of zero or less means the queue is unbounded.
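As a small illustration of the maxsize rule, the sketch below fills a queue and reports its introspection methods. The helper name inspect_queue is hypothetical and not part of queues.py; only public asyncio.Queue methods are used.

```python
import asyncio


async def inspect_queue(maxsize: int, n_items: int) -> tuple:
    """Fill a queue with n_items integers and report (qsize, empty, full).

    inspect_queue is an illustrative helper, not part of asyncio.
    """
    queue = asyncio.Queue(maxsize=maxsize)
    for i in range(n_items):
        queue.put_nowait(i)  # would raise QueueFull past a positive maxsize
    return queue.qsize(), queue.empty(), queue.full()


if __name__ == "__main__":
    # maxsize=0 is unbounded, so full() stays False however many items are queued.
    print(asyncio.run(inspect_queue(maxsize=0, n_items=3)))
    # A positive maxsize makes full() True once qsize() reaches it.
    print(asyncio.run(inspect_queue(maxsize=3, n_items=3)))
```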

LifoQueue stores items in a list and overrides _get to call _queue.pop(), which returns the newest item, where Queue calls _queue.popleft(). PriorityQueue also stores items in a list and uses heapq for ordering.

QueueEmpty and QueueFull are the non-blocking exception types, raised by get_nowait and put_nowait respectively when the operation cannot complete immediately.

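The loop parameter of Queue.__init__, deprecated since 3.8, was removed in 3.10, so it is absent in CPython 3.14. The shutdown method (added in 3.13) is available in 3.14 as a stable API: it wakes all waiters and makes further puts raise QueueShutDown, while gets keep succeeding until the queue is empty (or fail at once if shutdown was called with immediate=True).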

Map

| Lines | Symbol | Role | gopy |
|---|---|---|---|
| 1-20 | QueueEmpty, QueueFull | Exception types; subclass Exception with no added state. | |
| 21-80 | Queue.__init__, Queue._init, Queue._get, Queue._put | Core queue state; _init allocates the deque, _put/_get are the overridable hooks used by subclasses. | |
| 81-160 | Queue.put, Queue.put_nowait, Queue.get, Queue.get_nowait | Coroutine and non-blocking access; put and get loop on a fresh Future until a slot or item is available. | |
| 161-210 | Queue.task_done, Queue.join, Queue.qsize, Queue.empty, Queue.full | Work-tracking and introspection; join suspends until _unfinished_tasks reaches zero. | |
| 211-250 | LifoQueue, PriorityQueue, Queue.shutdown | Subclass overrides and the 3.13+ shutdown mechanism. | |

Reading

Queue.put and Queue.get (lines 81 to 160)

cpython 3.14 @ ab2d84fe1023/Lib/asyncio/queues.py#L81-160

    async def put(self, item):
        while self.full():
            putter = self._get_loop().create_future()
            self._putters.append(putter)
            try:
                await putter
            except:
                putter.cancel()  # Just in case putter is not done yet.
                try:
                    # Clean self._putters from cancelled putters.
                    self._putters.remove(putter)
                except ValueError:
                    pass
                if not self.full() and not putter.cancelled():
                    # We were woken up by get_nowait(), but can't take
                    # the call. Wake up the next in line.
                    self._wakeup_next(self._putters)
                raise
        return self.put_nowait(item)

    async def get(self):
        while self.empty():
            getter = self._get_loop().create_future()
            self._getters.append(getter)
            try:
                await getter
            except:
                getter.cancel()  # Just in case getter is not done yet.
                try:
                    # Clean self._getters from cancelled getters.
                    self._getters.remove(getter)
                except ValueError:
                    pass
                if not self.empty() and not getter.cancelled():
                    # We were woken up by put_nowait(), but can't take
                    # the call. Wake up the next in line.
                    self._wakeup_next(self._getters)
                raise
        return self.get_nowait()

put loops rather than branching once because a woken putter may find the queue full again if another task filled the freed slot first (for example through put_nowait). The loop creates a new Future on each iteration, appends it to _putters, and awaits it. When get_nowait (or get) removes an item it calls _wakeup_next on _putters, which resolves the oldest pending putter's Future. If the putter is cancelled while waiting, it removes itself from _putters; if its Future had already been resolved and the queue still has room, it passes the wake-up on to the next putter so the free slot is not lost.
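The suspend-and-wake cycle described above can be observed from outside with a bounded queue. This is a minimal sketch using only the public API; demo_backpressure and the event strings are hypothetical names chosen for illustration.

```python
import asyncio


async def demo_backpressure() -> list:
    """Show that put() suspends on a full queue until get() frees a slot."""
    events = []
    queue = asyncio.Queue(maxsize=1)

    async def producer() -> None:
        for i in range(2):
            await queue.put(i)  # the second put suspends: the queue is full
            events.append(f"put {i}")

    task = asyncio.create_task(producer())
    await asyncio.sleep(0)  # let the producer run until it suspends
    events.append("producer done" if task.done() else "producer suspended")

    item = await queue.get()  # frees a slot and wakes the waiting putter
    events.append(f"got {item}")
    await task  # the producer now completes its second put
    return events


if __name__ == "__main__":
    print(asyncio.run(demo_backpressure()))
    # ['put 0', 'producer suspended', 'got 0', 'put 1']
```

The first put returns without suspending because the queue has room; only the second one creates a Future and waits on it.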

get mirrors this logic with _getters. put_nowait and get_nowait are the fast paths: they raise immediately when the precondition fails, and on success they call _wakeup_next on the opposite waiter list.
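A short sketch of the fast paths, assuming a queue with maxsize=1; demo_nowait is a hypothetical helper name. It records which exception each failing call raises.

```python
import asyncio


async def demo_nowait() -> list:
    """Exercise get_nowait and put_nowait on an empty and a full queue."""
    outcomes = []
    queue = asyncio.Queue(maxsize=1)

    try:
        queue.get_nowait()  # nothing queued yet
    except asyncio.QueueEmpty:
        outcomes.append("QueueEmpty")

    queue.put_nowait("a")  # succeeds: one free slot
    try:
        queue.put_nowait("b")  # the queue is now full
    except asyncio.QueueFull:
        outcomes.append("QueueFull")

    outcomes.append(queue.get_nowait())  # returns the stored item
    return outcomes


if __name__ == "__main__":
    print(asyncio.run(demo_nowait()))  # ['QueueEmpty', 'QueueFull', 'a']
```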

Queue.task_done and Queue.join (lines 161 to 210)

cpython 3.14 @ ab2d84fe1023/Lib/asyncio/queues.py#L161-210

    def task_done(self):
        if self._unfinished_tasks <= 0:
            raise ValueError('task_done() called too many times')
        self._unfinished_tasks -= 1
        if self._unfinished_tasks == 0:
            self._finished.set()

    async def join(self):
        """Block until all items in the queue have been gotten and processed.

        The count of unfinished tasks goes up whenever an item is added to the
        queue. The count goes down whenever a consumer calls task_done() to
        indicate that the item was retrieved and all work on it is complete.
        When the count of unfinished tasks drops to zero, join() unblocks.
        """
        if self._unfinished_tasks > 0:
            await self._finished.wait()

_unfinished_tasks is incremented by put_nowait each time an item is placed in the queue. task_done decrements it. When the count reaches zero, _finished (an asyncio.Event) is set, and any coroutines awaiting join are unblocked. _finished is cleared again by put_nowait, so join can be called in a loop for streaming workloads.

Calling task_done more times than items were put raises ValueError immediately to catch book-keeping bugs before they corrupt the counter.
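The usual way to combine these two methods is a worker pool that calls task_done once per item while the producer awaits join. The sketch below is one such arrangement under simple assumptions: process_all, the doubling "work", and the worker count are all illustrative choices, not part of queues.py.

```python
import asyncio


async def process_all(items, worker_count: int = 2) -> list:
    """Double every item using worker tasks; return the sorted results."""
    queue = asyncio.Queue()
    results = []

    async def worker() -> None:
        while True:
            item = await queue.get()
            try:
                results.append(item * 2)  # stand-in for real work
            finally:
                queue.task_done()  # exactly one call per item retrieved

    for item in items:
        queue.put_nowait(item)  # each put increments _unfinished_tasks

    workers = [asyncio.create_task(worker()) for _ in range(worker_count)]
    await queue.join()  # resumes once every item has been marked done

    for task in workers:
        task.cancel()  # workers are idle in get(); stop them
    await asyncio.gather(*workers, return_exceptions=True)
    return sorted(results)


if __name__ == "__main__":
    print(asyncio.run(process_all([1, 2, 3])))  # [2, 4, 6]
```

Placing task_done in a finally clause keeps the counter consistent even if the work raises, so join cannot hang on a failed item.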

LifoQueue and PriorityQueue overrides (lines 211 to 250)

cpython 3.14 @ ab2d84fe1023/Lib/asyncio/queues.py#L211-250

    class LifoQueue(Queue):
        """A subclass of Queue that retrieves most recently added entries first."""

        def _init(self, maxsize):
            self._queue = []

        def _put(self, item):
            self._queue.append(item)

        def _get(self):
            return self._queue.pop()


    class PriorityQueue(Queue):
        """A subclass of Queue that retrieves entries in priority order (lowest first)."""

        def _init(self, maxsize):
            self._queue = []

        def _put(self, item):
            heapq.heappush(self._queue, item)

        def _get(self):
            return heapq.heappop(self._queue)

Both subclasses override only the three internal hooks _init, _put, and _get. All coroutine scheduling, maxsize enforcement, and task_done/join logic is inherited from Queue unchanged. LifoQueue uses a plain list with append/pop. PriorityQueue uses heapq so the smallest item (by the normal Python < ordering) is always dequeued first. Items must be comparable; the conventional pattern is to store (priority, item) tuples.
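To compare the three retrieval orders side by side, the sketch below loads the same (priority, item) tuples into each class and drains them. drain_order and the sample data are hypothetical and exist only for this illustration.

```python
import asyncio


async def drain_order(queue_cls, items) -> list:
    """Put items into a queue of the given class, then drain it in order."""
    queue = queue_cls()
    for item in items:
        queue.put_nowait(item)
    # range() is evaluated once, before any item is removed.
    return [queue.get_nowait() for _ in range(queue.qsize())]


if __name__ == "__main__":
    jobs = [(2, "b"), (1, "a"), (3, "c")]  # (priority, item) tuples
    for cls in (asyncio.Queue, asyncio.LifoQueue, asyncio.PriorityQueue):
        print(cls.__name__, asyncio.run(drain_order(cls, jobs)))
```

Queue returns the tuples in insertion order, LifoQueue in reverse insertion order, and PriorityQueue sorted by the first tuple element.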

gopy mirror

module/asyncio/ is pending. Queue's _putters and _getters machinery maps directly to Go channels of chan struct{} or slices of *Future. The _finished event maps to gopy's asyncio.Event port. LifoQueue and PriorityQueue are straightforward once Queue lands, requiring only _init/_put/_get overrides in Go.

CPython 3.14 changes

  • The loop parameter is absent from Queue.__init__ (deprecated since 3.8, removed in 3.10).
  • Queue.shutdown(immediate=False), added in 3.13, is stable in 3.14: calling it sets _is_shutdown = True, discards the queued items if immediate=True, and wakes all waiting getters and putters, which then raise QueueShutDown.
  • QueueShutDown exception type added in 3.13 alongside shutdown.
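A minimal sketch of the default (immediate=False) shutdown behaviour follows. demo_shutdown is a hypothetical helper; because shutdown only exists on Python 3.13 and later, the sketch returns None on older interpreters instead of failing.

```python
import asyncio


async def demo_shutdown():
    """Show which operations fail after Queue.shutdown() with immediate=False."""
    if not hasattr(asyncio.Queue, "shutdown"):
        return None  # Python < 3.13: shutdown() is unavailable

    outcomes = []
    queue = asyncio.Queue()
    queue.put_nowait("job")
    queue.shutdown()  # items already queued stay retrievable

    try:
        queue.put_nowait("late")  # puts are refused after shutdown
    except asyncio.QueueShutDown:
        outcomes.append("put refused")

    outcomes.append(await queue.get())  # the remaining item is still delivered

    try:
        await queue.get()  # empty and shut down: raises instead of waiting
    except asyncio.QueueShutDown:
        outcomes.append("get refused")
    return outcomes


if __name__ == "__main__":
    print(asyncio.run(demo_shutdown()))
```

On 3.13 and later this is expected to yield "put refused", then the queued item, then "get refused", which lets consumers drain remaining work and exit on QueueShutDown rather than relying on sentinel values.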