queue.py: deque+Condition design, blocking put/get, task_done/join, and SimpleQueue
Map
| Lines | Symbol | Purpose |
|---|---|---|
| 1-20 | module preamble | Imports deque, heapq, threading; tries to import _queue C extension for SimpleQueue |
| 21-40 | Empty, Full | Exception classes raised by non-blocking or timed-out get/put calls |
| 41-80 | Queue.__init__ | Allocates deque, creates mutex (Lock), not_empty and not_full (Condition) pairs, all_tasks_done (Condition), and unfinished_tasks counter |
| 81-130 | Queue.put, Queue.put_nowait | Acquires not_full; blocks if maxsize > 0 and queue is full; calls _put; increments unfinished_tasks; notifies not_empty |
| 131-175 | Queue.get, Queue.get_nowait | Acquires not_empty; blocks if queue is empty; calls _get; notifies not_full |
| 176-210 | Queue.task_done, Queue.join | task_done decrements unfinished_tasks; join blocks on all_tasks_done.wait() until count reaches zero |
| 211-230 | Queue.qsize, Queue.empty, Queue.full | Size hints read while holding mutex; the result can be stale by the time the caller acts on it, so they are documented as unreliable in multi-threaded contexts |
| 231-260 | Queue._init, Queue._put, Queue._get, Queue._qsize | Internal hooks overridden by subclasses to swap the storage data structure |
| 261-290 | LifoQueue | Overrides _put/_get to use list.append/list.pop (LIFO stack) |
| 291-320 | PriorityQueue | Overrides _put/_get to use heapq.heappush/heapq.heappop |
| 321-350 | SimpleQueue | Bound to the C type _queue.SimpleQueue when the extension is available, otherwise to a pure-Python fallback; no task_done/join support |
Reading
deque+Condition design
Queue pairs a collections.deque as the item store with three Condition objects (not_empty, not_full, all_tasks_done) that share one underlying mutex. Using a single lock for all three conditions keeps the critical sections simple: every put and get path runs while holding mutex, and Condition.wait releases it only while the thread sleeps and reacquires it before returning. Because the predicate is rechecked in a while loop after each wake-up, the queue size cannot change between the predicate check and the actual _put/_get call. put calls not_empty.notify() after storing an item and get calls not_full.notify() after removing one, so each call wakes at most one thread waiting on the precondition that just changed.
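A minimal sketch of this arrangement, reduced to the two conditions that put and get need. MiniQueue is a hypothetical name used only for illustration; it omits timeouts, task tracking, and the storage hooks, so it should be read as an outline of the locking pattern rather than as the stdlib code.

```python
import threading
from collections import deque


class MiniQueue:
    """Illustrative bounded FIFO; not the stdlib implementation."""

    def __init__(self, maxsize=0):
        self.maxsize = maxsize
        self.items = deque()
        # One lock shared by both conditions, as in Queue.__init__.
        self.mutex = threading.Lock()
        self.not_empty = threading.Condition(self.mutex)
        self.not_full = threading.Condition(self.mutex)

    def put(self, item):
        with self.not_full:  # acquires self.mutex
            # wait() releases the mutex while sleeping and reacquires it
            # before returning, so the predicate is rechecked under the lock.
            while 0 < self.maxsize <= len(self.items):
                self.not_full.wait()
            self.items.append(item)
            self.not_empty.notify()  # wake one blocked get()

    def get(self):
        with self.not_empty:  # acquires the same mutex
            while not self.items:
                self.not_empty.wait()
            item = self.items.popleft()
            self.not_full.notify()  # wake one blocked put()
            return item
```

With maxsize=1, a producer thread calling put three times blocks after the first item until a consumer calls get, and the items still come out in insertion order.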
put/get blocking with timeout
Both put and get accept block=True and timeout=None. When block is True and timeout is None, the call waits indefinitely. When timeout is a non-negative number, the method computes an absolute deadline (time.monotonic() + timeout) before entering the while loop and passes the remaining time to Condition.wait on each iteration. If the deadline passes and the condition is still unmet, Full (for put) or Empty (for get) is raised. A negative timeout on this path raises ValueError. When block is False, timeout is ignored and Full or Empty is raised immediately if the precondition is not met.
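The deadline loop can be sketched in isolation as follows. get_with_deadline is a hypothetical helper written for this note, not a function in queue.py; it assumes the caller supplies a deque and a Condition that guards it.

```python
import threading
import time
from collections import deque
from queue import Empty


def get_with_deadline(items, not_empty, timeout):
    """Pop from items, waiting at most timeout seconds (hypothetical helper)."""
    if timeout < 0:
        raise ValueError("'timeout' must be a non-negative number")
    with not_empty:
        # Fix the deadline once, so early or spurious wake-ups
        # cannot stretch the total wait beyond the requested timeout.
        deadline = time.monotonic() + timeout
        while not items:
            remaining = deadline - time.monotonic()
            if remaining <= 0.0:
                raise Empty
            not_empty.wait(remaining)
        return items.popleft()


# Usage: nothing is ever appended, so the call gives up after about 50 ms.
items = deque()
not_empty = threading.Condition()
try:
    get_with_deadline(items, not_empty, 0.05)
    timed_out = False
except Empty:
    timed_out = True
```

Computing the remaining time from a fixed deadline, rather than passing the original timeout to every wait call, is what keeps the overall wait bounded when the thread is woken without the predicate becoming true.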
task_done and join
unfinished_tasks starts at zero and is incremented by one in put, right after _put stores the item. task_done() decrements it while holding all_tasks_done, calls all_tasks_done.notify_all() when the count reaches zero, and raises ValueError if it is called more times than items were put. join() blocks on all_tasks_done.wait() inside a while unfinished_tasks loop. This design allows producers to call put and consumers to call task_done from different threads without any additional coordination.
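A short usage example of the task_done/join protocol with the real queue.Queue. The worker function, the None sentinel, and the doubling step are illustrative choices, not part of the module.

```python
import queue
import threading

q = queue.Queue()
results = []


def worker():
    while True:
        item = q.get()
        if item is None:       # sentinel: stop the worker
            q.task_done()
            break
        results.append(item * 2)
        q.task_done()          # exactly one task_done per completed get


t = threading.Thread(target=worker)
t.start()

for i in range(5):
    q.put(i)

q.join()                       # returns once all five items are marked done
processed = list(results)      # [0, 2, 4, 6, 8]

q.put(None)                    # ask the worker to exit
t.join()
```

Calling task_done once more at this point would raise ValueError, because the counter is already at zero.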
SimpleQueue and the _queue C extension
SimpleQueue is not a Python wrapper: when the _queue C extension imports successfully, queue.py binds the name SimpleQueue directly to the C type _queue.SimpleQueue, which is implemented without the Condition machinery used by Queue. It exposes only put, get, get_nowait, put_nowait, empty, and qsize. There is no task_done/join support and no maxsize limit, so put never blocks. In CPython 3.14, if the _queue C extension is unavailable (e.g. on an embedded build), the module falls back to a pure-Python _PySimpleQueue built on a deque and a threading.Semaphore rather than on Queue.
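The reduced API surface can be checked from Python. This snippet only calls public queue.SimpleQueue methods and behaves the same whether the C type or the pure-Python fallback is in use.

```python
import queue

sq = queue.SimpleQueue()
sq.put("a")
sq.put_nowait("b")             # never raises Full: there is no maxsize

size = sq.qsize()              # 2
first = sq.get()               # 'a'
second = sq.get_nowait()       # 'b'

try:
    sq.get_nowait()            # queue is now empty
    drained = False
except queue.Empty:
    drained = True

# The task-tracking API of Queue is absent.
has_task_api = hasattr(sq, "task_done") or hasattr(sq, "join")   # False
```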
gopy notes
- Queue uses three Condition objects sharing one mutex. The gopy port can represent this with a single sync.Mutex and two sync.Cond objects (notEmpty, notFull) plus a third for allTasksDone.
- The timeout path computes a deadline before the loop. gopy must replicate the monotonic-clock arithmetic; time.Now().Add(timeout) is the direct analogue.
- LifoQueue and PriorityQueue only override the storage hooks _init, _put, _get, and _qsize. The gopy port can model these as struct embedding with overridden method pointers or as separate structs that delegate locking to the base Queue.
- SimpleQueue bypasses the Condition machinery. The gopy equivalent is a buffered channel or a sync/atomic ring buffer, depending on whether unbounded capacity is required.
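
For reference when porting, the storage-hook contract that LifoQueue and PriorityQueue rely on can be exercised from Python with a small subclass. MaxQueue is a hypothetical class written for this note and assumes numeric items; all locking and blocking behaviour is inherited unchanged from Queue.

```python
import heapq
import queue


class MaxQueue(queue.Queue):
    """Hypothetical subclass: get() returns the largest number first."""

    def _init(self, maxsize):
        # Called by Queue.__init__ to create the item store.
        self.queue = []

    def _qsize(self):
        return len(self.queue)

    def _put(self, item):
        # heapq is a min-heap, so store the negated value.
        heapq.heappush(self.queue, -item)

    def _get(self):
        return -heapq.heappop(self.queue)


q = MaxQueue()
for n in (3, 10, 1):
    q.put(n)
order = [q.get() for _ in range(3)]   # [10, 3, 1]
```

The hooks are always called with mutex held, which is why none of them needs its own synchronization; a Go port would have to preserve that invariant in whatever dispatch mechanism it chooses.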