Lib/sched.py

cpython 3.14 @ ab2d84fe1023/Lib/sched.py

sched provides a lightweight, dependency-free event scheduler built on top of heapq. A scheduler instance holds a list of Event named tuples ordered by (time, priority). Callers schedule callbacks with enter() or enterabs(), then drive the loop by calling run(), which sleeps until the next event is due and then invokes it.

The scheduler is thread-safe. The constructor takes no lock argument; it creates a threading.RLock() internally, and every method that reads or mutates the internal queue holds that lock. Multiple threads may therefore call enter() and cancel() concurrently while another thread runs run(). The timefunc and delayfunc constructor arguments (defaulting to time.monotonic and time.sleep) can be swapped out to drive the scheduler from a simulated clock in tests.
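As a rough illustration of the simulated-clock idea, the sketch below drives a scheduler from a fake clock. FakeClock is a hypothetical helper written for this example, not part of sched; its sleep method only advances a counter.

```python
import sched


class FakeClock:
    """Hypothetical simulated clock: sleeping only advances a counter."""

    def __init__(self):
        self.now = 0.0

    def time(self):
        return self.now

    def sleep(self, seconds):
        self.now += seconds


clock = FakeClock()
s = sched.scheduler(timefunc=clock.time, delayfunc=clock.sleep)

fired = []
# Scheduled out of order on purpose; the heap sorts them by time.
s.enter(5, 1, fired.append, argument=("second",))
s.enter(2, 1, fired.append, argument=("first",))

s.run()  # no real sleeping happens; only clock.now moves forward
```

After run() returns, fired holds the labels in firing order and clock.now has advanced to the time of the last event.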

Event is a collections.namedtuple with six fields: time, priority, sequence, action, argument, and kwargs. The sequence field is a monotonically increasing counter added so that events with the same time and priority have a stable total order without comparing action (which may not support <).

Map

| Lines | Symbol | Role | gopy |
|---|---|---|---|
| 1-30 | module header | Imports, __all__, docstring | - |
| 31-55 | Event namedtuple | Six-field event record with __slots__ override | - |
| 56-90 | scheduler.__init__ | Stores timefunc, delayfunc, lock, queue, and sequence counter | - |
| 91-120 | scheduler.enterabs | Creates Event, pushes onto heap under lock, returns event handle | - |
| 121-140 | scheduler.enter | Computes absolute time from delta, delegates to enterabs | - |
| 141-160 | scheduler.cancel | Removes the event from the queue under lock and re-heapifies; raises ValueError if the event is not queued | - |
| 161-195 | scheduler.run | Main dispatch loop: peek at the heap, sleep until due, pop, invoke action | - |
| 196-210 | scheduler.queue property | Returns a sorted snapshot of pending events | - |

Reading

Event namedtuple (lines 31 to 55)

cpython 3.14 @ ab2d84fe1023/Lib/sched.py#L31-55

The namedtuple carries a sequence field so that the heap can always determine a total order. Without it, two events with equal time and priority would fall through to comparing their action callables, which raises TypeError. The class also overrides __lt__ so that comparison stops at sequence and never reaches action.

Event = collections.namedtuple('Event',
    'time, priority, sequence, action, argument, kwargs')

class Event(Event):
    __slots__ = ()
    def __lt__(self, o): return (self.time, self.priority, self.sequence) < \
                                (o.time, o.priority, o.sequence)
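The tie-breaking role of sequence can be seen in isolation with a small sketch. It rebuilds a six-field namedtuple locally instead of importing the one from sched, and make_event is a hypothetical helper used only here.

```python
import heapq
import itertools
from collections import namedtuple

# Local stand-in for the event record, using the same field names.
Event = namedtuple("Event", "time, priority, sequence, action, argument, kwargs")
counter = itertools.count()


def make_event(time, priority, action):
    """Hypothetical helper: stamp each event with the next sequence number."""
    return Event(time, priority, next(counter), action, (), {})


heap = []
for name in ("a", "b", "c"):
    # Same time and priority for all three; only sequence differs.
    heapq.heappush(heap, make_event(10.0, 1, lambda name=name: name))

order = [heapq.heappop(heap).action() for _ in range(3)]

# Without a sequence field, tuple comparison would reach the callables.
try:
    (10.0, 1, lambda: None) < (10.0, 1, lambda: None)
    compared = True
except TypeError:
    compared = False
```

Events with identical time and priority come off the heap in insertion order, and the bare three-element tuples fail to compare because functions do not support <.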

scheduler.__init__ (lines 56 to 90)

cpython 3.14 @ ab2d84fe1023/Lib/sched.py#L56-90

The constructor stores the two time functions and creates the internal heap, the lock, and the sequence counter. The _lock is an RLock, used as a context manager in every method that touches the queue. The constructor takes no lock parameter, so locking cannot be switched off.

def __init__(self, timefunc=time.monotonic, delayfunc=time.sleep):
    self._queue = []
    self._lock = threading.RLock()
    self._sequence_generator = itertools.count()
    self.timefunc = timefunc
    self.delayfunc = delayfunc
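The following sketch exercises the lock from several threads. It is only an illustration: a passing run does not prove thread safety, and the producer function and the thread and event counts are arbitrary choices made for this example.

```python
import sched
import threading

s = sched.scheduler()  # default time.monotonic / time.sleep
results = []


def producer(tag):
    """Hypothetical worker: schedule 100 immediately-due events."""
    for i in range(100):
        s.enter(0, 1, results.append, argument=((tag, i),))


threads = [threading.Thread(target=producer, args=(t,)) for t in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

pending = len(s.queue)  # snapshot taken under the lock
s.run()                 # every event is already due, so this does not block long
```

Each enter() call pushes onto the shared heap while holding the internal RLock, so no event should be lost even though four threads insert concurrently.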

enterabs and enter (lines 91 to 140)

cpython 3.14 @ ab2d84fe1023/Lib/sched.py#L91-140

enterabs is the canonical scheduling method. It allocates the next sequence number, constructs an Event, pushes it onto the heap under the lock, and returns the event so the caller can pass it to cancel(). enter is a thin convenience wrapper that adds the current time before delegating.

def enterabs(self, time, priority, action, argument=(), kwargs={}):
    with self._lock:
        sequence = next(self._sequence_generator)
        event = Event(time, priority, sequence, action, argument, kwargs)
        heapq.heappush(self._queue, event)
    return event

def enter(self, delay, priority, action, argument=(), kwargs={}):
    return self.enterabs(self.timefunc() + delay, priority, action,
                         argument, kwargs)
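A short usage sketch of the two entry points and of the returned handle follows. The simulated clock (now, fake_time, fake_sleep) is an assumption made here so that the example is deterministic.

```python
import sched

now = [100.0]  # hypothetical simulated clock, starting at t=100


def fake_time():
    return now[0]


def fake_sleep(seconds):
    now[0] += seconds


s = sched.scheduler(timefunc=fake_time, delayfunc=fake_sleep)
calls = []

relative = s.enter(5, 1, calls.append, argument=("relative",))         # 100 + 5
absolute = s.enterabs(103.0, 1, calls.append, argument=("absolute",))  # exactly 103
doomed = s.enter(1, 1, calls.append, argument=("cancelled",))

s.cancel(doomed)  # the handle returned by enter() identifies the event

try:
    s.cancel(doomed)  # a second cancel finds nothing to remove
    second_cancel_failed = False
except ValueError:
    second_cancel_failed = True

s.run()
```

The returned events expose their computed absolute time through the time field, and the cancelled event never fires.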

The run loop (lines 161 to 195)

cpython 3.14 @ ab2d84fe1023/Lib/sched.py#L161-195

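run repeatedly peeks at the smallest event on the heap (q[0]) while holding the lock. If the event's time is still in the future, it releases the lock, sleeps for the remaining delta, then re-checks (another thread may have inserted an earlier event in the meantime). Otherwise it pops the event under the lock and invokes the action after releasing it. When blocking=False is passed, run executes the events that are already due and then returns the delay until the next event instead of sleeping, which is useful for integrating with an external I/O loop. It returns None once the queue is empty.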

def run(self, blocking=True):
    lock = self._lock
    q = self._queue
    delayfunc = self.delayfunc
    timefunc = self.timefunc
    pop = heapq.heappop
    while True:
        with lock:
            if not q:
                break
            time, priority, sequence, action, argument, kwargs = q[0]
            now = timefunc()
            if time > now:
                delay = True
            else:
                delay = False
                pop(q)
        if delay:
            if not blocking:
                return time - now
            delayfunc(time - now)
        else:
            action(*argument, **kwargs)
            delayfunc(0) # yield to other threads
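The non-blocking mode can be sketched as follows. The manual clock and the no-op delay function are assumptions for this example; a real integration would hand the returned delay to its own event loop as a timeout.

```python
import sched

now = [0.0]  # hypothetical manual clock, advanced by the caller
s = sched.scheduler(timefunc=lambda: now[0], delayfunc=lambda seconds: None)

log = []
s.enter(0, 1, log.append, argument=("due now",))
s.enter(4, 1, log.append, argument=("later",))

# First pass: the due event runs, then run() reports how long to wait.
remaining = s.run(blocking=False)
log_after_first_pass = list(log)

# An external loop would wait this long; here the clock is simply advanced.
now[0] += remaining

# Second pass: the remaining event is due, and the queue drains.
final = s.run(blocking=False)
```

The first call returns the delay until the next pending event, and the second returns None because nothing is left to schedule.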

queue property (lines 196 to 210)

cpython 3.14 @ ab2d84fe1023/Lib/sched.py#L196-210

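The property copies the heap under the lock and returns the copy sorted, so callers can inspect upcoming events in firing order without racing against modifications. Because the result is a snapshot, mutating it does not affect the scheduler. No filtering is needed here: cancel() removes events from the heap eagerly, so the heap only ever contains pending events.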

@property
def queue(self):
    with self._lock:
        events = self._queue[:]
    return list(sorted(events))
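A brief sketch of inspecting the snapshot is shown below. The frozen clock and the noop callback are assumptions chosen so the event times are predictable.

```python
import sched

# Frozen clock at t=0 so that delays equal absolute times.
s = sched.scheduler(timefunc=lambda: 0.0, delayfunc=lambda seconds: None)


def noop():
    """Hypothetical callback; it is never invoked in this example."""


s.enter(3, 1, noop)
s.enter(1, 2, noop)
s.enter(1, 1, noop)

snapshot = s.queue
keys = [(e.time, e.priority) for e in snapshot]

# The snapshot is an independent list: clearing it leaves the scheduler intact.
snapshot.clear()
still_pending = len(s.queue)
```

The snapshot lists events by time and then priority, regardless of the order in which they were entered.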

gopy mirror

Not yet ported.