Skip to main content

Lib/asyncio/base_events.py

Source:

cpython 3.14 @ ab2d84fe1023/Lib/asyncio/base_events.py

base_events.py contains BaseEventLoop, the abstract base class for all asyncio event loop implementations. The two concrete subclasses shipped in CPython are SelectorEventLoop (Unix default, Windows fallback) and ProactorEventLoop (Windows default). Every scheduling primitive (call_soon, call_later, call_at), the top-level run_until_complete entry point, and the I/O polling heart (_run_once) live here.

Map

LinesSymbolRole
1-80module prologue, _MIN_SCHEDULED_TIMER_HANDLESconstants and imports
81-200BaseEventLoop.__init__, _check_closed, _check_runninglifecycle guards
201-350run_forever, run_until_complete, stop, closepublic run controls
351-480call_soon, call_soon_threadsafeimmediate callback queue
481-600call_later, call_at, _scheduled heaptimed callback scheduling
601-780_run_onceselector poll, timer dispatch
781-950create_connection, _create_connection_transportTCP/SSL client transport
951-1100create_server, _start_servingTCP server transport
1101-1300create_datagram_endpointUDP transport
1301-1500create_unix_connection, create_unix_serverUnix socket transport
1501-1700subprocess_exec, subprocess_shellprocess transport
1701-1900getaddrinfo, getnameinfo, DNS helpersresolver wrappers

Reading

_run_once: the single iteration of the event loop

_run_once is called on every loop tick. It does three things in order: drain ready callbacks, compute the selector timeout from the scheduled heap, call self._selector.select(timeout), then convert I/O events back into ready callbacks.

The heap pruning at the top is a performance guard. Cancelled TimerHandle objects stay on the heap until the cancelled fraction exceeds _MIN_CANCELLED_TIMER_HANDLES_FRACTION (0.5), at which point the loop rebuilds the heap cleanly rather than popping one at a time.

The timeout computation collapses three cases: if ready callbacks exist the loop should not block (timeout = 0), if the heap is empty the loop may block indefinitely (timeout = None), and otherwise the loop blocks for at most the time until the next scheduled handle fires.

# CPython: Lib/asyncio/base_events.py:1872 BaseEventLoop._run_once
def _run_once(self):
sched_count = len(self._scheduled)
if (sched_count > _MIN_SCHEDULED_TIMER_HANDLES
and self._timer_cancelled_count / sched_count
> _MIN_CANCELLED_TIMER_HANDLES_FRACTION):
new_scheduled = []
for handle in self._scheduled:
if handle._cancelled:
handle._scheduled = False
else:
new_scheduled.append(handle)
heapq.heapify(new_scheduled)
self._scheduled = new_scheduled
self._timer_cancelled_count = 0
else:
while self._scheduled and self._scheduled[0]._cancelled:
self._timer_cancelled_count -= 1
handle = heapq.heappop(self._scheduled)
handle._scheduled = False

timeout = None
if self._ready or self._stopping:
timeout = 0
elif self._scheduled:
when = self._scheduled[0]._when
timeout = min(max(0, when - self.time()), MAXIMUM_SELECT_TIMEOUT)

event_list = self._selector.select(timeout)
self._process_events(event_list)

call_soon, call_later, call_at: scheduling primitives

All three scheduling methods ultimately append a Handle or TimerHandle onto either self._ready (a collections.deque) or self._scheduled (a heapq). call_later delegates entirely to call_at by adding the current self.time() offset. call_at creates a TimerHandle and pushes it onto the min-heap keyed by _when. _run_once pops handles from the heap into _ready once their _when has passed.

# CPython: Lib/asyncio/base_events.py:756 BaseEventLoop.call_soon
def call_soon(self, callback, *args, context=None):
self._check_closed()
if self._debug:
self._check_callback(callback, 'call_soon')
handle = self._call_soon(callback, args, context)
if handle._source_traceback:
del handle._source_traceback[-1]
return handle

# CPython: Lib/asyncio/base_events.py:794 BaseEventLoop.call_later
def call_later(self, delay, callback, *args, context=None):
timer = self.call_at(self.time() + delay, callback, *args,
context=context)
if timer._source_traceback:
del timer._source_traceback[-1]
return timer

# CPython: Lib/asyncio/base_events.py:806 BaseEventLoop.call_at
def call_at(self, when, callback, *args, context=None):
self._check_closed()
if self._debug:
self._check_callback(callback, 'call_at')
timer = events.TimerHandle(when, callback, args, self, context)
if timer._source_traceback:
del timer._source_traceback[-1]
heapq.heappush(self._scheduled, timer)
timer._scheduled = True
return timer

run_until_complete: coroutine wrapping and loop lifecycle

run_until_complete is the main public entry point for running a single coroutine to completion. If the argument is a plain coroutine (not already a Future), ensure_future wraps it in a Task. A done callback (_run_until_complete_cb) calls loop.stop() when the task finishes. run_forever then drives the loop until stop() sets self._stopping = True, which causes _run_once to use timeout = 0 and the outer loop to exit after draining ready callbacks.

# CPython: Lib/asyncio/base_events.py:625 BaseEventLoop.run_until_complete
def run_until_complete(self, future):
self._check_closed()
self._check_running()

new_task = not futures.isfuture(future)
future = tasks.ensure_future(future, loop=self)
if new_task:
future._log_destroy_pending = False
future.add_done_callback(_run_until_complete_cb)
try:
self.run_forever()
except:
if new_task and future.done() and not future.cancelled():
future.exception()
raise
finally:
future.remove_done_callback(_run_until_complete_cb)
if not future.done():
raise RuntimeError(
'Event loop stopped before Future completed.')
return future.result()

create_connection and transport factories

create_connection resolves the host/port pair, iterates the address list, and calls _create_connection_transport for the first address that connects successfully. The fallback loop over infos implements Happy Eyeballs-lite: it tries each resolved address in order and uses the first that succeeds. SSL wrapping happens inside _create_connection_transport by replacing the raw socket transport with an SSLProto layer before returning.

# CPython: Lib/asyncio/base_events.py:1005 BaseEventLoop.create_connection
async def create_connection(
self, protocol_factory, host=None, port=None, *,
ssl=None, family=0, proto=0, flags=0, ...):
infos = await self._ensure_resolved(
(host, port), family=family, type=socket.SOCK_STREAM,
proto=proto, flags=flags, loop=self)
...
for addrinfo in infos:
af, socktype, proto, canonname, address = addrinfo
try:
sock = socket.socket(af, socktype, proto)
...
await self.sock_connect(sock, address)
except OSError as exc:
...
continue
break
transport, protocol = await self._create_connection_transport(
sock, protocol_factory, ssl, server_hostname)
return transport, protocol

gopy notes

Status: not yet ported.

Planned package path: module/asyncio/.

BaseEventLoop depends on the selector abstraction (selectors.py), the Future and Task types, and the Handle/TimerHandle wrappers. All four subsystems need to be present before BaseEventLoop._run_once can run. The selector layer maps to Go's syscall.Select or golang.org/x/sys/unix.EpollWait depending on platform. The _ready deque maps to a Go []func() slice drained each tick. The scheduled heap can be ported using a container/heap backed slice of TimerHandle values. call_soon_threadsafe writes to a self-pipe to wake the selector; the Go port should use a chan struct for the same purpose.