Lib/threading.py (part 4)
Source:
cpython 3.14 @ ab2d84fe1023/Lib/threading.py
This annotation covers synchronization primitives. See lib_threading3_detail for Thread, Lock, RLock, and Timer.
Map
| Lines | Symbol | Role |
|---|---|---|
| 1-100 | Condition | Monitor with wait/notify |
| 101-200 | Semaphore / BoundedSemaphore | Counting semaphore |
| 201-280 | Event | Flag that threads can set, clear, and wait on |
| 281-360 | Barrier | Synchronize N threads at a checkpoint |
| 361-600 | local() | Thread-local storage |
Reading
Condition
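```python
# CPython: Lib/threading.py:220 Condition.wait
class Condition:
    def wait(self, timeout=None):
        if not self._is_owned():
            raise RuntimeError("cannot wait on un-acquired lock")
        # Each waiter blocks on its own private, already-held lock.
        waiter = _allocate_lock()
        waiter.acquire()
        self._waiters.append(waiter)
        saved_state = self._release_save()
        gotit = False
        try:
            if timeout is None:
                waiter.acquire()
                gotit = True
            else:
                gotit = waiter.acquire(True, timeout)
            return gotit
        finally:
            # Reacquire the outer lock whether or not the wait succeeded.
            self._acquire_restore(saved_state)
            if not gotit:
                # Timed out: notify() never removed this waiter.
                try:
                    self._waiters.remove(waiter)
                except ValueError:
                    pass
```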
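Condition.wait releases the underlying lock and blocks on a per-waiter lock. notify() releases one waiter lock; notify_all() releases them all. This is the standard "Mesa-style" monitor: a woken thread must reacquire the underlying lock before wait returns, and another thread may change the shared state in the meantime, so the condition must be re-checked in a loop after every wakeup.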
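The re-check rule is easiest to see in a small producer/consumer. The sketch below is illustrative only: `BoundedQueue` and `demo` are hypothetical names, not part of threading.py. It uses one Condition for both "not full" and "not empty", which is why it calls notify_all() instead of notify().

```python
import threading
from collections import deque


class BoundedQueue:
    """Hypothetical bounded queue guarded by a single Condition."""

    def __init__(self, capacity):
        self._items = deque()
        self._capacity = capacity
        self._cond = threading.Condition()

    def put(self, item):
        with self._cond:
            # Mesa semantics: re-check the predicate after every wakeup.
            while len(self._items) >= self._capacity:
                self._cond.wait()
            self._items.append(item)
            self._cond.notify_all()

    def get(self, timeout=None):
        with self._cond:
            # wait_for() wraps the same while loop and tracks the deadline.
            if not self._cond.wait_for(lambda: len(self._items) > 0, timeout):
                raise TimeoutError("queue stayed empty")
            item = self._items.popleft()
            self._cond.notify_all()
            return item


def demo():
    q = BoundedQueue(capacity=2)
    results = []

    def consume():
        for _ in range(5):
            results.append(q.get(timeout=5))

    consumer = threading.Thread(target=consume)
    consumer.start()
    for i in range(5):
        q.put(i)
    consumer.join()
    return results
```

Condition.wait_for(predicate, timeout) is usually the safer spelling, since it cannot forget the loop.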
Semaphore
```python
# CPython: Lib/threading.py:480 Semaphore.acquire
class Semaphore:
    def __init__(self, value=1):
        if value < 0:
            raise ValueError("semaphore initial value must be >= 0")
        self._cond = Condition(Lock())
        self._value = value

    def acquire(self, blocking=True, timeout=None):
        with self._cond:
            if not blocking and self._value == 0:
                return False
            # Simplified: the real code computes a deadline once, so
            # repeated wakeups do not restart the full timeout.
            while self._value == 0:
                if not self._cond.wait(timeout=timeout):
                    return False
            self._value -= 1
            return True

    def release(self, n=1):
        if n < 1:
            raise ValueError("n must be one or more")
        with self._cond:
            self._value += n
            # Wake up to n waiters, one per released permit.
            self._cond.notify(n)
```
Semaphore is built on Condition. release(n) can release multiple permits at once (Python 3.9+). BoundedSemaphore raises ValueError if release would exceed the initial value.
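A short usage sketch of these behaviours, using only the public threading API. The helper names `run_workers` and `over_release_is_rejected` are made up for illustration, and the worker body stands in for whatever resource the permits protect.

```python
import threading


def run_workers(num_workers=6, permits=2):
    """Return the peak number of workers that held a permit at once."""
    sem = threading.BoundedSemaphore(permits)
    state_lock = threading.Lock()
    active = 0
    peak = 0

    def worker():
        nonlocal active, peak
        with sem:  # acquire() on entry, release() on exit
            with state_lock:
                active += 1
                peak = max(peak, active)
            # ... the guarded work would go here ...
            with state_lock:
                active -= 1

    threads = [threading.Thread(target=worker) for _ in range(num_workers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return peak


def over_release_is_rejected():
    """A BoundedSemaphore refuses a release() with no matching acquire()."""
    sem = threading.BoundedSemaphore(1)
    try:
        sem.release()
    except ValueError:
        return True
    return False
```

The peak never exceeds the number of permits, however the threads are scheduled. A plain Semaphore would accept the extra release() silently, which tends to hide accounting bugs.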
Barrier
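```python
# CPython: Lib/threading.py:620 Barrier.wait
# Simplified model: the real implementation tracks a _state field
# (filling, draining, resetting, broken) rather than a phase counter.
class Barrier:
    def __init__(self, parties, action=None, timeout=None):
        self._cond = Condition(Lock())
        self._parties = parties
        self._action = action
        self._timeout = timeout  # default for wait() calls without a timeout
        self._count = 0
        self._phase = 0

    def wait(self, timeout=None):
        if timeout is None:
            timeout = self._timeout
        with self._cond:
            phase = self._phase
            index = self._count  # arrival index, 0 .. parties - 1
            self._count += 1
            if self._count == self._parties:
                # Last arrival: open the next phase and wake everyone.
                self._phase += 1
                self._count = 0
                if self._action:
                    self._action()
                self._cond.notify_all()
            else:
                # Simplified: on timeout the real code breaks the barrier
                # and raises BrokenBarrierError instead of waiting again.
                while self._phase == phase:
                    self._cond.wait(timeout)
            return index
```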
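Barrier(n) makes n threads wait until all have called wait(). The last arrival triggers the optional action callback and wakes all waiters. In this simplified model a phase counter makes the barrier reusable: _phase distinguishes the current rendezvous from the next, so a thread that loops back early cannot be mistaken for a member of the previous round. CPython itself records this in a _state field (filling, draining, resetting, broken). Its wait() returns the caller's arrival index, from 0 to parties - 1, and raises BrokenBarrierError if the barrier is broken or the wait times out.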
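A minimal reuse sketch with the public API. The `run_phases` helper and its bookkeeping lists are hypothetical, chosen only to make the arrival indices and the action callback observable.

```python
import threading


def run_phases(parties=3, phases=2):
    """Run `parties` threads through `phases` rendezvous on one Barrier."""
    log = []
    log_lock = threading.Lock()
    trips = []

    def on_release():
        # Runs in exactly one thread each time the barrier trips.
        trips.append(len(trips))

    barrier = threading.Barrier(parties, action=on_release)

    def worker():
        for phase in range(phases):
            # wait() returns this thread's arrival index for the round.
            index = barrier.wait(timeout=5)
            with log_lock:
                log.append((phase, index))

    threads = [threading.Thread(target=worker) for _ in range(parties)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return sorted(log), trips
```

Each round hands out every index in range(parties) exactly once, so one thread can be elected for per-round work with a check such as `if index == 0`.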
local()
```c
// CPython: Modules/_threadmodule.c:1120 _local_impl
/* Thread-local storage: each thread gets its own __dict__ */
typedef struct {
    PyObject_HEAD
    PyObject *key;          /* unique key for this local */
    PyObject *args;         /* constructor args, replayed per thread */
    PyObject *kw;           /* constructor keyword args */
    PyObject *weakreflist;
} localobject;

static PyObject *
local_getattro(localobject *self, PyObject *name)
{
    PyObject *ldict = _ldict(self);   /* get this thread's dict */
    if (ldict == NULL) {
        return NULL;
    }
    /* Look the name up on self, using ldict as the instance __dict__. */
    return _PyObject_GenericGetAttrWithDict((PyObject *)self, name, ldict, 0);
}
```
threading.local() creates an object whose attributes are per-thread. Internally each thread gets its own __dict__ keyed by the local object's identity. On thread exit, all local dictionaries for that thread are deleted.
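The per-thread behaviour can be observed from Python. In the sketch below, `RequestContext` and `demo` are hypothetical names. Subclassing threading.local with an `__init__` is supported: `__init__` is re-run with the saved constructor arguments the first time each new thread touches the object, which is what the `args` and `kw` fields are kept for.

```python
import threading


class RequestContext(threading.local):
    """Hypothetical per-thread context object."""

    def __init__(self, default_user="anonymous"):
        # Runs once per thread, on that thread's first attribute access.
        self.user = default_user


def demo():
    ctx = RequestContext()
    ctx.user = "main"  # visible only to the main thread
    seen = {}

    def worker(name):
        seen[name + "_before"] = ctx.user  # fresh per-thread default
        ctx.user = name
        seen[name + "_after"] = ctx.user

    threads = [threading.Thread(target=worker, args=(n,)) for n in ("a", "b")]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    seen["main"] = ctx.user  # untouched by the workers
    return seen
```

Neither worker sees the value set by the main thread, and the main thread's value survives both workers.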
gopy notes
Condition is module/threading.Condition in module/threading/module.go, backed by sync.Cond. Semaphore uses sync.Mutex + counter. Event uses sync.Cond with a bool flag. Barrier uses sync.WaitGroup. local() uses goroutine.Local backed by a sync.Map keyed by goroutine ID.