Lib/threading.py (part 2)
Source:
cpython 3.14 @ ab2d84fe1023/Lib/threading.py
This annotation covers the higher-level synchronization classes. See lib_threading_detail for Thread, Lock, RLock, and local.
Map
| Lines | Symbol | Role |
|---|---|---|
| 1-120 | Condition | Monitor — lock + wait/notify queue |
| 121-260 | Semaphore | Counting semaphore |
| 261-320 | BoundedSemaphore | Semaphore that raises on over-release |
| 321-420 | Event | One-shot flag — set, clear, wait, is_set |
| 421-600 | Barrier | Rendezvous point for N threads |
| 601-900 | Timer | Thread that runs a function after a delay |
Reading
Condition
# CPython: Lib/threading.py:220 Condition
class Condition:
"""A condition variable.
After acquiring the underlying lock, threads call wait() to release
the lock and sleep until notified, then re-acquire before returning.
"""
def __init__(self, lock=None):
if lock is None:
lock = RLock()
self._lock = lock
self._waiters = collections.deque()
def wait(self, timeout=None):
if not self._is_owned():
raise RuntimeError("cannot wait on un-acquired lock")
waiter = _allocate_lock()
waiter.acquire()
self._waiters.append(waiter)
saved_state = self._release_save()
try:
if timeout is None:
waiter.acquire()
return True
else:
return waiter.acquire(True, timeout)
finally:
self._acquire_restore(saved_state)
def notify(self, n=1):
if not self._is_owned():
raise RuntimeError("cannot notify on un-acquired lock")
waiters_to_notify = _deque_islice(self._waiters, n)
for waiter in waiters_to_notify:
waiter.release()
try:
self._waiters.remove(waiter)
except ValueError:
pass
wait() releases the underlying lock, acquires a per-waiter lock that notify() will release. This is a pure-Python implementation of POSIX condition variables.
Semaphore
# CPython: Lib/threading.py:450 Semaphore
class Semaphore:
def __init__(self, value=1):
if value < 0:
raise ValueError("semaphore initial value must be >= 0")
self._cond = Condition(Lock())
self._value = value
def acquire(self, blocking=True, timeout=None):
if not blocking and timeout is not None:
raise ValueError(...)
with self._cond:
while self._value == 0:
if not blocking:
return False
if not self._cond.wait(timeout=timeout):
return False
self._value -= 1
return True
def release(self, n=1):
if n < 1:
raise ValueError("n must be one or more")
with self._cond:
self._value += n
for _ in range(n):
self._cond.notify()
Semaphore uses a Condition internally. acquire blocks when _value == 0 by calling _cond.wait. release(n) increments the counter and notifies up to n waiting threads.
Event
# CPython: Lib/threading.py:540 Event
class Event:
def __init__(self):
self._cond = Condition(Lock())
self._flag = False
def set(self):
with self._cond:
self._flag = True
self._cond.notify_all()
def wait(self, timeout=None):
with self._cond:
signaled = self._flag
if not signaled:
signaled = self._cond.wait(timeout)
return signaled
Event.wait is efficient: if the flag is already set, it returns immediately without touching the Condition. One-time signal pattern: set once, all waiters unblock.
Barrier
# CPython: Lib/threading.py:600 Barrier
class Barrier:
"""A barrier: N threads each call wait(); all are released together."""
def __init__(self, parties, action=None, timeout=None):
self._cond = Condition(Lock())
self._parties = parties
self._action = action
self._state = 0 # 0=filling, 1=draining, -1=broken
self._count = 0
def wait(self, timeout=None):
with self._cond:
self._enter()
index = self._count
self._count += 1
if self._count == self._parties:
self._release() # last thread — run action and release all
else:
self._wait(timeout)
return index
The last thread to arrive calls _release(), which optionally runs the action callable and then notifies all waiting threads. The return value is each thread's arrival index (0 to N-1).
gopy notes
Condition, Semaphore, Event, and Barrier are pure Python in stdlib/threading.py. The underlying Lock is objects.Lock backed by a Go sync.Mutex. _allocate_lock() calls threading._allocate_lock which returns an objects.Lock. Condition.wait uses the lock's acquire(timeout) method.