Lib/threading.py (part 4)
Source:
cpython 3.14 @ ab2d84fe1023/Lib/threading.py
This annotation covers synchronization primitives. See lib_threading3_detail for Thread, Lock, and RLock.
Map
| Lines | Symbol | Role |
|---|---|---|
| 1-80 | Event | Boolean flag with wait |
| 81-180 | Condition | Monitor-style synchronization |
| 181-280 | Semaphore | Counting semaphore |
| 281-380 | BoundedSemaphore | Semaphore with upper bound |
| 381-500 | Barrier | N-thread synchronization point |
Reading
Event
# CPython: Lib/threading.py:580 Event
class Event:
    def __init__(self):
        self._cond = Condition(Lock())
        self._flag = False

    def set(self):
        with self._cond:
            self._flag = True
            self._cond.notify_all()

    def wait(self, timeout=None):
        with self._cond:
            signaled = self._flag
            if not signaled:
                signaled = self._cond.wait(timeout)
            return signaled
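Event wraps a Condition and a boolean flag. set() sets the flag under the condition's lock and wakes every waiting thread with notify_all(). wait() returns True immediately if the flag is already set; otherwise it blocks in Condition.wait() until set() is called or the optional timeout expires, and returns False on timeout. clear() (not shown in the excerpt) resets the flag without waking waiters.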
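The behaviour described above can be checked with a short script. This is a minimal sketch using only the public threading.Event API; the names worker, results, already_set and timed_out are illustrative and do not come from the CPython source.

```python
import threading

event = threading.Event()
results = []

def worker():
    # Blocks until another thread calls event.set(); returns True once set.
    results.append(event.wait(timeout=5.0))

threads = [threading.Thread(target=worker) for _ in range(3)]
for t in threads:
    t.start()

event.set()                      # sets the flag and wakes every waiter
for t in threads:
    t.join()

# Waiting on an already-set event returns immediately.
already_set = event.wait(timeout=0)

event.clear()                    # resets the flag; no waiter is notified
# The flag is clear and nobody calls set(), so this wait times out.
timed_out = event.wait(timeout=0.01)
```

All three workers see True regardless of whether they reach wait() before or after set(), because wait() checks the flag before blocking.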
Condition
# CPython: Lib/threading.py:240 Condition.wait
def wait(self, timeout=None):
    if not self._is_owned():
        raise RuntimeError("cannot wait on un-acquired lock")
    waiter = _allocate_lock()
    waiter.acquire()
    self._waiters.append(waiter)
    saved_state = self._release_save()
    gotit = False
    try:
        if timeout is None:
            waiter.acquire()
            gotit = True
        else:
            if timeout > 0:
                gotit = waiter.acquire(True, timeout)
            else:
                # Zero or negative timeout: poll once without blocking
                gotit = waiter.acquire(False)
        return gotit
    finally:
        self._acquire_restore(saved_state)
        if not gotit:
            try:
                self._waiters.remove(waiter)
            except ValueError:
                pass
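Condition.wait releases the underlying lock, blocks on a per-waiter lock, and reacquires the underlying lock before returning, even if the wait times out or is interrupted (the finally block). Each waiter allocates a dedicated lock and acquires it in advance, so the second acquire() blocks; notify() releases that lock to wake the thread. wait() returns True when notified and False on timeout, in which case the waiter removes itself from _waiters.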
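Seen from the caller's side, the release-and-reacquire protocol looks like the sketch below. It uses only the public threading.Condition API; the consumer function and the items and received lists are hypothetical names chosen for illustration.

```python
import threading

cond = threading.Condition(threading.Lock())
items = []
received = []

def consumer():
    with cond:
        # wait() releases the lock while blocked and reacquires it before
        # returning; the loop rechecks the predicate after every wakeup.
        while not items:
            cond.wait()
        received.append(items.pop())

t = threading.Thread(target=consumer)
t.start()

# The producer can take the lock because a blocked consumer has released it.
with cond:
    items.append("job")
    cond.notify()
t.join()

# Calling wait() without holding the lock raises RuntimeError.
try:
    cond.wait(timeout=0)
    unowned_error = None
except RuntimeError as exc:
    unowned_error = str(exc)

# With nobody notifying, a timed wait returns False.
with cond:
    timed_out = cond.wait(timeout=0.01)
```

The while loop around wait() matters: if the producer runs first, the consumer finds the item already present and never blocks at all.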
Semaphore
# CPython: Lib/threading.py:490 Semaphore
class Semaphore:
    def __init__(self, value=1):
        self._cond = Condition(Lock())
        self._value = value

    def acquire(self, blocking=True, timeout=None):
        with self._cond:
            while self._value == 0:
                if not blocking:
                    return False
                if not self._cond.wait(timeout):
                    return False
            self._value -= 1
            return True

    def release(self, n=1):
        with self._cond:
            self._value += n
            for _ in range(n):
                self._cond.notify()
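Semaphore(3) allows three acquire() calls to succeed before a fourth blocks. release(n) increments the counter by n and calls notify() n times, waking up to n waiting threads; the n parameter was added in Python 3.9. The while loop rechecks the counter after every wakeup, so a thread that wakes spuriously, or finds that another thread has already taken the counter back to 0, waits again.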
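The counting behaviour can be observed from a single thread with non-blocking acquires. This is a minimal sketch against the public threading.Semaphore API; the variable names are illustrative.

```python
import threading

sem = threading.Semaphore(3)

# Three non-blocking acquires succeed; the fourth finds the counter at 0.
first_three = [sem.acquire(blocking=False) for _ in range(3)]
fourth = sem.acquire(blocking=False)

# A blocking acquire with a timeout gives up and returns False.
timed_out = sem.acquire(timeout=0.01)

# release(n) adds n permits in one call (the n parameter needs Python 3.9+).
sem.release(2)
after_release = [sem.acquire(blocking=False) for _ in range(3)]
```

After release(2) exactly two more acquires succeed, which shows that the counter, not the number of release() calls, decides how many threads may proceed.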
Barrier
# CPython: Lib/threading.py:640 Barrier.wait
def wait(self, timeout=None):
    with self._cond:
        self._check_broken()
        index = self._count
        self._count += 1
        try:
            if index + 1 == self.parties:
                # Last thread: release all
                self._release()
            else:
                # Wait for the last thread
                self._wait(timeout)
            return index
        finally:
            self._count -= 1
            if self._count == 0:
                self._reset()
Barrier(n) synchronizes exactly n threads. Each thread calls wait() and blocks until all n have arrived. The last thread to arrive calls _release(), which notifies all waiting threads. wait() returns the thread's arrival index, from 0 to n - 1. When the last thread leaves and _count drops back to 0, the barrier resets for reuse.
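The arrival indices and the reuse of the barrier can be seen in the sketch below. It relies only on the public threading.Barrier API; run_round, worker and N are hypothetical names introduced for the example.

```python
import threading

N = 3
barrier = threading.Barrier(N)

def run_round():
    """Start N threads that meet at the barrier; return their sorted indices."""
    indices = []

    def worker():
        # Blocks until N threads have called wait(); returns the arrival index.
        indices.append(barrier.wait(timeout=5.0))

    threads = [threading.Thread(target=worker) for _ in range(N)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return sorted(indices)

first_round = run_round()
second_round = run_round()   # the same barrier object is reused
```

Each round hands out every index from 0 to N - 1 exactly once, so a caller can use the return value to pick a single thread for follow-up work.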
gopy notes
threading is a pure-Python module, so gopy runs it directly. Condition.wait allocates its per-waiter lock with _allocate_lock(), an alias for _thread.allocate_lock(), which gopy routes to module/thread.AllocateLock, a wrapper around a Go sync.Mutex. Semaphore.acquire goes through Condition and therefore ends up on the same Go mutex primitives. Barrier is likewise pure Python over Condition.