Lib/asyncio/base_events.py
Source:
cpython 3.14 @ ab2d84fe1023/Lib/asyncio/base_events.py
base_events.py contains BaseEventLoop, the shared base class for asyncio's built-in event loops. It implements most of the events.AbstractEventLoop interface and leaves the platform-specific transport factories to subclasses. The two concrete loops shipped in CPython are SelectorEventLoop (Unix default, Windows fallback) and ProactorEventLoop (Windows default). Every scheduling primitive (call_soon, call_later, call_at), the top-level run_until_complete entry point, and the per-iteration polling step (_run_once) live here.
Map
| Lines | Symbol | Role |
|---|---|---|
| 1-80 | module prologue, _MIN_SCHEDULED_TIMER_HANDLES | constants and imports |
| 81-200 | BaseEventLoop.__init__, _check_closed, _check_running | lifecycle guards |
| 201-350 | run_forever, run_until_complete, stop, close | public run controls |
| 351-480 | call_soon, call_soon_threadsafe | immediate callback queue |
| 481-600 | call_later, call_at, _scheduled heap | timed callback scheduling |
| 601-780 | _run_once | selector poll, timer dispatch |
| 781-950 | create_connection, _create_connection_transport | TCP/SSL client transport |
| 951-1100 | create_server, _start_serving | TCP server transport |
| 1101-1300 | create_datagram_endpoint | UDP transport |
| 1301-1500 | create_unix_connection, create_unix_server | Unix socket transport |
| 1501-1700 | subprocess_exec, subprocess_shell | process transport |
| 1701-1900 | getaddrinfo, getnameinfo, DNS helpers | resolver wrappers |
Reading
_run_once: the single iteration of the event loop
_run_once runs once per loop iteration. It performs five steps in order: prune cancelled timers from the scheduled heap, compute the selector timeout, call self._selector.select(timeout), hand the resulting I/O events to _process_events (which queues their callbacks on self._ready), and finally move due timers onto self._ready and run every handle that was ready at that point.
The heap pruning at the top is a performance guard. Cancelled TimerHandle objects normally stay on the heap until they reach the front, where they are popped one at a time. Only when the heap holds more than _MIN_SCHEDULED_TIMER_HANDLES (100) entries and the cancelled fraction exceeds _MIN_CANCELLED_TIMER_HANDLES_FRACTION (0.5) does the loop filter out every cancelled handle and re-heapify in a single pass.
The timeout computation collapses three cases: if ready callbacks exist or the loop is stopping, it must not block (timeout = 0); if the heap is empty, it may block indefinitely (timeout = None); otherwise it blocks until the next scheduled handle is due, clamped to the range [0, MAXIMUM_SELECT_TIMEOUT].
```python
# CPython: Lib/asyncio/base_events.py:1872 BaseEventLoop._run_once
def _run_once(self):
    sched_count = len(self._scheduled)
    if (sched_count > _MIN_SCHEDULED_TIMER_HANDLES
            and self._timer_cancelled_count / sched_count
            > _MIN_CANCELLED_TIMER_HANDLES_FRACTION):
        # Too many cancelled timers: rebuild the heap in one pass.
        new_scheduled = []
        for handle in self._scheduled:
            if handle._cancelled:
                handle._scheduled = False
            else:
                new_scheduled.append(handle)
        heapq.heapify(new_scheduled)
        self._scheduled = new_scheduled
        self._timer_cancelled_count = 0
    else:
        # Otherwise only drop cancelled timers sitting at the front.
        while self._scheduled and self._scheduled[0]._cancelled:
            self._timer_cancelled_count -= 1
            handle = heapq.heappop(self._scheduled)
            handle._scheduled = False

    timeout = None
    if self._ready or self._stopping:
        timeout = 0
    elif self._scheduled:
        when = self._scheduled[0]._when
        timeout = min(max(0, when - self.time()), MAXIMUM_SELECT_TIMEOUT)

    event_list = self._selector.select(timeout)
    self._process_events(event_list)
    # Remainder elided: due timers move from self._scheduled to self._ready,
    # then every handle that was in self._ready at that point is run.
    ...
```
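The three timeout cases can be isolated into a pure function. This is a minimal sketch, not code from base_events.py: compute_timeout and its parameters are hypothetical names, and only the MAXIMUM_SELECT_TIMEOUT value (24 * 3600 seconds) is taken from the module.

```python
MAXIMUM_SELECT_TIMEOUT = 24 * 3600  # same value as the constant in base_events.py


def compute_timeout(has_ready, stopping, next_when, now):
    """Return the select() timeout for one loop iteration.

    Hypothetical helper: next_when is the deadline of the earliest timer,
    or None when the timer heap is empty.
    """
    if has_ready or stopping:
        return 0  # work is already queued, so do not block
    if next_when is None:
        return None  # nothing scheduled: block until I/O arrives
    # Block until the next deadline, never negative and never above the cap.
    return min(max(0, next_when - now), MAXIMUM_SELECT_TIMEOUT)


print(compute_timeout(True, False, 12.5, 10.0))
print(compute_timeout(False, False, None, 10.0))
print(compute_timeout(False, False, 12.5, 10.0))
```

An overdue timer (next_when earlier than now) yields 0 rather than a negative value, which is why the real code clamps at both ends.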
call_soon, call_later, call_at: scheduling primitives
All three scheduling methods ultimately place a Handle or TimerHandle on one of two containers: self._ready (a collections.deque) or self._scheduled (a plain list kept in heap order with heapq). call_later delegates entirely to call_at, passing self.time() + delay as the absolute deadline. call_at creates a TimerHandle and pushes it onto the min-heap, which is ordered by _when. _run_once later pops handles from the heap into _ready once their _when has passed.
```python
# CPython: Lib/asyncio/base_events.py:756 BaseEventLoop.call_soon
def call_soon(self, callback, *args, context=None):
    self._check_closed()
    if self._debug:
        self._check_callback(callback, 'call_soon')
    handle = self._call_soon(callback, args, context)
    if handle._source_traceback:
        del handle._source_traceback[-1]
    return handle

# CPython: Lib/asyncio/base_events.py:794 BaseEventLoop.call_later
def call_later(self, delay, callback, *args, context=None):
    timer = self.call_at(self.time() + delay, callback, *args,
                         context=context)
    if timer._source_traceback:
        del timer._source_traceback[-1]
    return timer

# CPython: Lib/asyncio/base_events.py:806 BaseEventLoop.call_at
def call_at(self, when, callback, *args, context=None):
    self._check_closed()
    if self._debug:
        self._check_callback(callback, 'call_at')
    timer = events.TimerHandle(when, callback, args, self, context)
    if timer._source_traceback:
        del timer._source_traceback[-1]
    heapq.heappush(self._scheduled, timer)
    timer._scheduled = True
    return timer
```
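From the caller's side, the difference between the ready queue and the timer heap shows up as execution order. The following is a small usage sketch against the public asyncio API. The function name run_schedule_demo and the delays are illustrative choices, not anything defined in base_events.py.

```python
import asyncio


def run_schedule_demo():
    loop = asyncio.new_event_loop()
    order = []
    try:
        # Registered first but due last: pushed onto the timer heap.
        loop.call_later(0.02, order.append, "later")
        # Absolute deadline on the loop's own clock, due before "later".
        loop.call_at(loop.time() + 0.01, order.append, "at")
        # Appended to the ready queue, so it runs on the first iteration.
        loop.call_soon(order.append, "soon")
        # Stop the loop once all three callbacks have had time to fire.
        loop.call_later(0.05, loop.stop)
        loop.run_forever()
    finally:
        loop.close()
    return order


order = run_schedule_demo()
print(order)  # ['soon', 'at', 'later']
```

The callbacks run in deadline order rather than registration order because the heap is ordered by each handle's _when.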
run_until_complete: coroutine wrapping and loop lifecycle
run_until_complete is the main public entry point for running a single awaitable to completion. If the argument is not already a Future (for example, a plain coroutine), ensure_future wraps it in a Task. A done callback (_run_until_complete_cb) stops the loop when the future finishes. run_forever then drives the loop until stop() sets self._stopping = True, which makes _run_once use timeout = 0 and makes run_forever exit once the current iteration has finished. If the loop stops before the future is done, run_until_complete raises RuntimeError instead of returning a result.
```python
# CPython: Lib/asyncio/base_events.py:625 BaseEventLoop.run_until_complete
def run_until_complete(self, future):
    self._check_closed()
    self._check_running()

    new_task = not futures.isfuture(future)
    future = tasks.ensure_future(future, loop=self)
    if new_task:
        future._log_destroy_pending = False

    future.add_done_callback(_run_until_complete_cb)
    try:
        self.run_forever()
    except:
        if new_task and future.done() and not future.cancelled():
            future.exception()
        raise
    finally:
        future.remove_done_callback(_run_until_complete_cb)
    if not future.done():
        raise RuntimeError(
            'Event loop stopped before Future completed.')

    return future.result()
```
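Both exits from run_until_complete can be exercised with the public API. This is a minimal sketch: the coroutine name answer and the variable names are illustrative only.

```python
import asyncio


async def answer():
    await asyncio.sleep(0)  # yield to the loop once before finishing
    return 42


loop = asyncio.new_event_loop()
try:
    # Normal path: the coroutine is wrapped in a Task and its result returned.
    result = loop.run_until_complete(answer())

    # Early-stop path: the loop is stopped while the future is still pending.
    pending = loop.create_future()
    loop.call_soon(loop.stop)
    try:
        loop.run_until_complete(pending)
        early_stop_error = None
    except RuntimeError as exc:
        early_stop_error = str(exc)
finally:
    loop.close()

print(result)
print(early_stop_error)
```

In the second call the stop request is already queued when run_forever starts, so the loop exits after one iteration and the pending-future check raises.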
create_connection and transport factories
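create_connection resolves the host/port pair, iterates the address list, and calls _create_connection_transport with the first socket that connects successfully. With the default happy_eyeballs_delay=None, the loop over infos is a plain sequential fallback: it tries each resolved address in order and uses the first that succeeds. Passing a delay switches to staggered, Happy Eyeballs-style attempts (RFC 8305). SSL wrapping happens inside _create_connection_transport: when ssl is set, it builds an SSL transport (an sslproto.SSLProtocol layered over the socket transport) instead of a plain socket transport before returning. The excerpt below is condensed.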
```python
# CPython: Lib/asyncio/base_events.py:1005 BaseEventLoop.create_connection
async def create_connection(
        self, protocol_factory, host=None, port=None, *,
        ssl=None, family=0, proto=0, flags=0, ...):
    infos = await self._ensure_resolved(
        (host, port), family=family, type=socket.SOCK_STREAM,
        proto=proto, flags=flags, loop=self)
    ...
    for addrinfo in infos:
        af, socktype, proto, canonname, address = addrinfo
        try:
            sock = socket.socket(af, socktype, proto)
            ...
            await self.sock_connect(sock, address)
        except OSError as exc:
            ...
            continue
        break
    transport, protocol = await self._create_connection_transport(
        sock, protocol_factory, ssl, server_hostname)
    return transport, protocol
```
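A loopback round trip shows the (transport, protocol) pair that create_connection returns. This is a hedged sketch using only public asyncio APIs. The Echo and Client protocol classes are hypothetical names written for this example, and it assumes the loopback interface is available.

```python
import asyncio


class Echo(asyncio.Protocol):
    """Server side: write back whatever arrives."""

    def connection_made(self, transport):
        self.transport = transport

    def data_received(self, data):
        self.transport.write(data)


class Client(asyncio.Protocol):
    """Client side: send a message and collect the echoed bytes."""

    def __init__(self, done, message):
        self.done = done
        self.message = message
        self.buffer = b""

    def connection_made(self, transport):
        transport.write(self.message)

    def data_received(self, data):
        self.buffer += data
        if len(self.buffer) >= len(self.message) and not self.done.done():
            self.done.set_result(self.buffer)


async def main():
    loop = asyncio.get_running_loop()
    # Port 0 asks the OS for any free port.
    server = await loop.create_server(Echo, "127.0.0.1", 0)
    port = server.sockets[0].getsockname()[1]
    done = loop.create_future()
    transport, protocol = await loop.create_connection(
        lambda: Client(done, b"ping"), "127.0.0.1", port)
    try:
        return await asyncio.wait_for(done, timeout=5)
    finally:
        transport.close()
        server.close()


reply = asyncio.run(main())
print(reply)
```

The protocol_factory is called once per successful connection, which is why the sketch passes a lambda rather than a ready-made Client instance.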
gopy notes
Status: not yet ported.
Planned package path: module/asyncio/.
BaseEventLoop depends on the selector abstraction (selectors.py), the Future and Task types, and the Handle/TimerHandle wrappers. All four pieces need to be present before BaseEventLoop._run_once can run. The selector layer maps to Go's syscall.Select or golang.org/x/sys/unix.EpollWait, depending on platform. The _ready deque maps to a Go []func() slice drained each tick. The scheduled heap can be ported as a slice of TimerHandle values managed through container/heap. call_soon_threadsafe wakes a blocked selector by writing to a self-pipe (_write_to_self); the Go port should use a chan struct{} for the same purpose.
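The wake-up behaviour the port has to reproduce can be observed from Python. In this sketch the loop has no ready callbacks and no timers, so it blocks in select() with timeout = None until another thread calls call_soon_threadsafe. The helper name run_wakeup_demo and the 0.05 second delay are illustrative assumptions.

```python
import asyncio
import threading


def run_wakeup_demo():
    loop = asyncio.new_event_loop()
    seen = []

    def on_wake(value):
        # Runs on the loop thread after the selector has been woken.
        seen.append(value)
        loop.stop()

    # A second thread schedules the callback while the loop is blocked.
    waker = threading.Timer(
        0.05, loop.call_soon_threadsafe, args=(on_wake, "from-thread"))
    waker.start()
    try:
        loop.run_forever()  # returns only once on_wake has stopped the loop
    finally:
        waker.join()
        loop.close()
    return seen


seen = run_wakeup_demo()
print(seen)  # ['from-thread']
```

A Go port would need an equivalent signal path, since appending to the ready queue alone does not interrupt a selector that is already blocked.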