Skip to main content

Lib/asyncio/__init__.py

Source:

cpython 3.14 @ ab2d84fe1023/Lib/asyncio/__init__.py

asyncio is Python's built-in async I/O framework. It provides an event loop, coroutine scheduling (Task), futures, and high-level networking/sync primitives. The core is implemented in a mix of Python (Lib/asyncio/) and C (Modules/_asynciomodule.c).

Map

FileSymbolRole
events.pyAbstractEventLoop, get_event_loop, runLoop API
base_events.pyBaseEventLoopCore loop: I/O poll, callbacks, task scheduling
tasks.pyTask, gather, wait, shieldCoroutine scheduling
futures.pyFutureLow-level awaitable with result/exception
queues.pyQueue, PriorityQueue, LifoQueueAsync queues
locks.pyLock, Event, Condition, Semaphore, BoundedSemaphoreSync primitives
streams.pyStreamReader, StreamWriter, open_connection, start_serverTCP streams
selector_events.pyDefaultEventLoop on POSIXselectors-based I/O

Reading

asyncio.run

# CPython: Lib/asyncio/runners.py:180 run
def run(main, *, debug=None, loop_factory=None):
if events._get_running_loop() is not None:
raise RuntimeError("asyncio.run() cannot be called from a running event loop")
with Runner(debug=debug, loop_factory=loop_factory) as runner:
return runner.run(main)

asyncio.run is the standard entry point. It creates a new event loop, runs main until it returns, then closes the loop.

Task.__step

# CPython: Lib/asyncio/tasks.py:310 Task.__step
def __step(self, exc=None):
coro = self._coro
self._fut_waiter = None
try:
if exc is None:
result = coro.send(None)
else:
result = coro.throw(exc)
except StopIteration as exc:
self.set_result(exc.value)
except CancelledError as exc:
...
except Exception as exc:
self.set_exception(exc)
else:
# result is a Future being awaited
blocking = getattr(result, '_asyncio_future_blocking', None)
if blocking:
result._asyncio_future_blocking = False
result.add_done_callback(self.__wakeup)
self._fut_waiter = result

Task.__step drives a coroutine forward with coro.send(None). When the coroutine awaits a Future, it yields the future; __step registers a callback to reschedule the task when the future completes.

Future.set_result

# CPython: Lib/asyncio/futures.py:175 Future.set_result
def set_result(self, result):
if self._state != _PENDING:
raise InvalidStateError(...)
self._result = result
self._state = _FINISHED
self.__schedule_callbacks()

__schedule_callbacks calls loop.call_soon(cb, self) for each registered callback.

gather

# CPython: Lib/asyncio/tasks.py:800 gather
async def gather(*coros_or_futures, return_exceptions=False):
children = []
for arg in coros_or_futures:
if not isinstance(arg, futures.Future):
arg = ensure_future(arg, loop=loop)
children.append(arg)
outer = _GatheringFuture(children, loop=loop)
nchildren = len(children)
...
def _done_callback(fut):
nonlocal nfinished
nfinished += 1
if nfinished == nchildren:
outer.set_result(results)
for child in children:
child.add_done_callback(_done_callback)
return await outer

gopy notes

asyncio is a large subsystem. For the v0.12.1 gate (Lib/test/), the key requirements are: asyncio.run, Task, Future, gather, sleep(0), Queue, Lock, and Event. In gopy the event loop maps to a goroutine scheduler. Task.__step wraps a Python coroutine drive loop. Future callbacks are dispatched via loop.call_soon.