Lib/threading.py (part 4)

Source:

cpython 3.14 @ ab2d84fe1023/Lib/threading.py

This annotation covers synchronization primitives. See lib_threading3_detail for Thread, Lock, and RLock.

Map

| Lines | Symbol | Role |
|---|---|---|
| 1-80 | Event | Boolean flag with wait |
| 81-180 | Condition | Monitor-style synchronization |
| 181-280 | Semaphore | Counting semaphore |
| 281-380 | BoundedSemaphore | Semaphore with upper bound |
| 381-500 | Barrier | N-thread synchronization point |

Reading

Event

# CPython: Lib/threading.py:580 Event
class Event:
    def __init__(self):
        self._cond = Condition(Lock())
        self._flag = False

    def set(self):
        with self._cond:
            self._flag = True
            self._cond.notify_all()

    def wait(self, timeout=None):
        with self._cond:
            signaled = self._flag
            if not signaled:
                signaled = self._cond.wait(timeout)
            return signaled

Event wraps a Condition and a boolean flag. set() sets the flag and wakes all waiting threads. wait() returns True immediately if the flag is already set; otherwise it blocks until another thread calls set() or the optional timeout expires, and it returns False only when the timeout expires first. clear() (not shown in the excerpt) resets the flag without waking waiters.
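
The set/wait/clear behavior described above can be exercised with a short sketch. The function name `run_event_demo` and the timeout values are illustrative choices, not part of threading.py.

```python
import threading


def run_event_demo():
    """Hypothetical helper showing Event.set(), wait(), and clear()."""
    ready = threading.Event()
    results = []

    def worker():
        # Blocks until ready.set() is called, or returns at once
        # if the flag is already set by the time we get here.
        results.append(ready.wait(timeout=5.0))

    t = threading.Thread(target=worker)
    t.start()
    ready.set()      # flag becomes True; every waiter is woken
    t.join()

    ready.clear()    # flag back to False; nobody is woken
    timed_out_result = ready.wait(timeout=0.01)  # no set() follows, so this times out
    return results, timed_out_result


if __name__ == "__main__":
    print(run_event_demo())
```

The worker sees True whether it starts waiting before or after set(), because wait() checks the flag first. The final wait() returns False because the flag was cleared and nothing sets it again.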

Condition

# CPython: Lib/threading.py:240 Condition.wait
def wait(self, timeout=None):
    if not self._is_owned():
        raise RuntimeError("cannot wait on un-acquired lock")
    waiter = _allocate_lock()
    waiter.acquire()
    self._waiters.append(waiter)
    saved_state = self._release_save()
    gotit = False
    try:
        if timeout is None:
            waiter.acquire()
            gotit = True
        else:
            gotit = waiter.acquire(True, timeout)
        return gotit
    finally:
        self._acquire_restore(saved_state)
        if not gotit:
            try:
                self._waiters.remove(waiter)
            except ValueError:
                pass

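Condition.wait must be called with the underlying lock held and raises RuntimeError otherwise. It releases the underlying lock, blocks on a per-waiter lock, then reacquires the underlying lock before returning. Each waiter's lock is acquired once in advance, so the second acquire blocks; notify releases that lock to wake the thread. If the wait times out, the waiter removes its own lock from _waiters so that a later notify is not spent on it.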
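
To make the per-waiter lock handoff concrete, here is a toy model of the scheme. `MiniCondition` and `handoff_demo` are hypothetical names written for this note; the class is a simplified sketch of the idea, not CPython's Condition, and it omits the ownership check and RLock support.

```python
import threading
from collections import deque


class MiniCondition:
    """Toy model of the per-waiter-lock scheme (not CPython's class)."""

    def __init__(self):
        self._lock = threading.Lock()
        self._waiters = deque()

    def __enter__(self):
        self._lock.acquire()
        return self

    def __exit__(self, *exc):
        self._lock.release()

    def wait(self, timeout=None):
        waiter = threading.Lock()
        waiter.acquire()              # taken in advance, so the next acquire blocks
        self._waiters.append(waiter)
        self._lock.release()          # let other threads enter the monitor
        gotit = False
        try:
            # timeout=-1 means "block indefinitely" for Lock.acquire
            gotit = waiter.acquire(True, -1 if timeout is None else timeout)
            return gotit
        finally:
            self._lock.acquire()      # always reacquire before returning
            if not gotit:
                try:
                    self._waiters.remove(waiter)
                except ValueError:
                    pass

    def notify(self, n=1):
        # Releasing a waiter's lock unblocks exactly one pending wait().
        while self._waiters and n > 0:
            self._waiters.popleft().release()
            n -= 1


def handoff_demo():
    cond = MiniCondition()
    items, received = [], []

    def consumer():
        with cond:
            while not items:          # recheck the predicate after each wakeup
                cond.wait()
            received.append(items.pop())

    t = threading.Thread(target=consumer)
    t.start()
    with cond:
        items.append("job")
        cond.notify()
    t.join()
    return received


if __name__ == "__main__":
    print(handoff_demo())
```

The consumer works whichever thread gets the lock first: if the producer runs first, the predicate is already true and wait() is never called.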

Semaphore

# CPython: Lib/threading.py:490 Semaphore
class Semaphore:
    def __init__(self, value=1):
        self._cond = Condition(Lock())
        self._value = value

    def acquire(self, blocking=True, timeout=None):
        with self._cond:
            while self._value == 0:
                if not blocking:
                    return False
                if not self._cond.wait(timeout):
                    return False
            self._value -= 1
            return True

    def release(self, n=1):
        with self._cond:
            self._value += n
            for _ in range(n):
                self._cond.notify()

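Semaphore(3) lets three acquire() calls succeed before the fourth blocks. release(n) adds n to the counter and wakes up to n waiting threads; the n parameter was added in Python 3.9. acquire() waits in a while loop so that a woken thread rechecks the counter, since another thread may have taken the permit before the woken thread reacquired the lock. The listing is abridged: the full acquire() also tracks a deadline, so repeated wakeups do not extend the total timeout.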
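
A small sketch of both behaviors: a semaphore capping concurrency, and release(n) adding several permits at once. The helper names `peak_concurrency` and `release_many` and the sleep duration are illustrative assumptions.

```python
import threading
import time


def peak_concurrency(workers=6, permits=3):
    """Hypothetical helper: run `workers` threads, report the peak holding a permit."""
    sem = threading.Semaphore(permits)
    guard = threading.Lock()
    active = 0
    peak = 0

    def worker():
        nonlocal active, peak
        with sem:                      # acquire() on entry, release() on exit
            with guard:
                active += 1
                peak = max(peak, active)
            time.sleep(0.01)           # simulate work while holding a permit
            with guard:
                active -= 1

    threads = [threading.Thread(target=worker) for _ in range(workers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return peak


def release_many():
    sem = threading.Semaphore(0)       # no permits available initially
    sem.release(2)                     # counter goes from 0 to 2 (n requires Python 3.9+)
    # Two non-blocking acquires succeed; the third finds the counter at 0.
    return [sem.acquire(blocking=False) for _ in range(3)]


if __name__ == "__main__":
    print(peak_concurrency(), release_many())
```

The peak never exceeds the number of permits, although its exact value depends on thread scheduling.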

Barrier

# CPython: Lib/threading.py:640 Barrier.wait
def wait(self, timeout=None):
    with self._cond:
        self._enter()  # block while a previous cycle drains; raise if broken
        index = self._count
        self._count += 1
        try:
            if index + 1 == self._parties:
                # Last thread: release all
                self._release()
            else:
                # Wait for the last thread
                self._wait(timeout)
            return index
        finally:
            self._count -= 1
            # Last thread out makes the barrier reusable
            self._exit()

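Barrier(n) synchronizes exactly n threads. Each thread calls wait() and blocks until all n threads have arrived. The last thread to arrive calls _release(), which notifies all waiting threads. wait() returns the thread's arrival index, an integer from 0 to n-1 that is unique within one cycle. If a wait times out or the barrier is otherwise broken, the waiting threads get BrokenBarrierError. Once every thread has left, the barrier becomes reusable for the next cycle.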
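
The arrival-index behavior can be checked with a short sketch. `run_barrier_demo` and the 5-second timeout are illustrative choices, not part of threading.py.

```python
import threading


def run_barrier_demo(parties=3):
    """Hypothetical helper: collect the index each thread gets from wait()."""
    barrier = threading.Barrier(parties)
    guard = threading.Lock()
    indexes = []

    def worker():
        # Blocks until `parties` threads have called wait().
        index = barrier.wait(timeout=5.0)
        with guard:
            indexes.append(index)

    threads = [threading.Thread(target=worker) for _ in range(parties)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()

    # After a full cycle no thread is waiting and the barrier is intact.
    return sorted(indexes), barrier.n_waiting, barrier.broken


if __name__ == "__main__":
    print(run_barrier_demo())
```

Which thread receives which index depends on scheduling, so the sketch sorts the indexes before returning them. A common use of the index is to pick one thread, for example the one that got 0, to do per-cycle housekeeping.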

gopy notes

threading is a pure-Python module; gopy runs it directly. Condition.wait allocates each waiter lock through _allocate_lock, an alias for _thread.allocate_lock, which gopy routes to module/thread.AllocateLock, a wrapper around a Go sync.Mutex. Semaphore.acquire goes through its Condition and therefore ends up on the same Go mutex primitives. Barrier is pure Python over Condition.