Lib/threading.py (part 3)
Source:
cpython 3.14 @ ab2d84fe1023/Lib/threading.py
This annotation covers the higher-level synchronization primitives. See modules_threading2_detail for Thread, Lock, RLock, and Event.
Map
| Lines | Symbol | Role |
|---|---|---|
| 1-80 | Condition.__init__ | Wraps a Lock; provides wait and notify |
| 81-180 | Condition.wait / Condition.wait_for | Release lock, sleep, re-acquire on notify |
| 181-280 | Semaphore | Counting semaphore using Condition |
| 281-380 | BoundedSemaphore | Semaphore with upper-bound check |
| 381-600 | Barrier | Synchronize N threads at a common point |
Reading
Condition.wait
# CPython: Lib/threading.py:320 Condition.wait
def wait(self, timeout=None):
"""Wait until notified or until a timeout occurs."""
if not self._is_owned():
raise RuntimeError("cannot wait on un-acquired lock")
waiter = _allocate_lock()
waiter.acquire()
self._waiters.append(waiter)
saved_state = self._release_save()
gotit = False
try:
if timeout is None:
waiter.acquire()
gotit = True
else:
gotit = waiter.acquire(True, timeout)
return gotit
finally:
self._acquire_restore(saved_state)
if not gotit:
try:
self._waiters.remove(waiter)
except ValueError:
pass
Condition.wait creates a private lock (waiter) and appends it to _waiters, releases the condition lock, then blocks on waiter.acquire(). notify() calls waiter.release() on the first waiter, waking it up. The condition lock is re-acquired before returning.
Condition.wait_for
# CPython: Lib/threading.py:360 Condition.wait_for
def wait_for(self, predicate, timeout=None):
"""Wait until a condition evaluates to True."""
endtime = None
waittime = timeout
result = predicate()
while not result:
if waittime is not None:
if endtime is None:
endtime = _time() + waittime
else:
waittime = endtime - _time()
if waittime <= 0:
break
self.wait(waittime)
result = predicate()
return result
wait_for(pred, timeout) repeatedly calls wait until pred() returns truthy or the deadline passes. The shrinking waittime ensures the total sleep never exceeds timeout.
Semaphore
# CPython: Lib/threading.py:420 Semaphore.acquire
class Semaphore:
def __init__(self, value=1):
self._cond = Condition(Lock())
self._value = value
def acquire(self, blocking=True, timeout=None):
if not blocking and timeout is not None:
raise ValueError(...)
rc = False
with self._cond:
while self._value == 0:
if not blocking:
break
self._cond.wait(timeout)
...
else:
self._value -= 1
rc = True
return rc
def release(self, n=1):
with self._cond:
self._value += n
for _ in range(n):
self._cond.notify()
Semaphore.acquire decrements the counter; if zero, it waits on the condition. release(n=2) (Python 3.9+) allows releasing multiple units at once, useful for producer-consumer patterns with batched production.
Barrier
# CPython: Lib/threading.py:580 Barrier
class Barrier:
def __init__(self, parties, action=None, timeout=None):
self._cond = Condition(Lock())
self._action = action
self._timeout = timeout
self._parties = parties
self._state = 0 # 0: filling, 1: draining, -1: broken
self._count = 0
def wait(self, timeout=None):
with self._cond:
self._check_broken()
self._count += 1
index = self._count
if self._count == self._parties:
# Last thread in: release everyone
self._release()
else:
self._wait(timeout)
return index - 1
def _release(self):
try:
if self._action:
self._action()
self._state = 1 # draining
self._cond.notify_all()
except:
self._break()
raise
Barrier(n) requires n threads to call wait() before any of them proceeds. The last thread calls the optional action callback before releasing the others. Barrier.abort() puts the barrier into a broken state; subsequent wait calls raise BrokenBarrierError.
gopy notes
Condition is module/threading.Condition in module/threading/module.go backed by Go's sync.Cond. Semaphore uses a buffered channel of size value. BoundedSemaphore adds an upper bound check. Barrier uses sync.WaitGroup plus a sync.Cond for the drain phase.