Lib/asyncio/__init__.py
cpython 3.14 @ ab2d84fe1023/Lib/asyncio/__init__.py
asyncio is a package split across many files. __init__.py re-exports
everything from the sub-modules under a flat public namespace. The key
files for the runtime are:
Lib/asyncio/base_events.py--BaseEventLoopwith_run_onceand all scheduling primitives.Lib/asyncio/tasks.py--Taskand the coroutine-driving state machine.Lib/asyncio/futures.py--Futureand the result/exception protocol.Lib/asyncio/queues.py--Queue,LifoQueue,PriorityQueue.Lib/asyncio/locks.py--Lock,Event,Condition,Semaphore,BoundedSemaphore.
In 3.14, asyncio.run() gained a loop_factory parameter, and
TaskGroup (added in 3.11) received structured-concurrency improvements.
The _asyncio C extension accelerates Future and Task.
Map
| Lines | Symbol | Role | gopy |
|---|---|---|---|
| futures.py 1-200 | Future, _PENDING, _CANCELLED, _FINISHED, InvalidStateError | Core result/exception container; result() raises CancelledError or the stored exception; add_done_callback schedules callbacks via the loop. | module/asyncio/ |
| tasks.py 1-300 | Task, Task.__step, Task.__step_run_and_handle_result, Task.cancel | Coroutine driver; __step sends a value (or throws) into the coroutine and processes the yielded Future. | module/asyncio/ |
| tasks.py 300-600 | gather, shield, wait, wait_for, sleep, ensure_future | High-level coroutine combinators; gather wraps each awaitable in a Task and collects results; wait_for adds a timeout via loop.call_later. | module/asyncio/ |
| base_events.py 1-400 | BaseEventLoop.__init__, run_forever, run_until_complete, stop, close, _run_once | Event loop core; _run_once selects I/O events, fires timed callbacks, and dispatches ready handles. | module/asyncio/ |
| base_events.py 400-800 | call_soon, call_later, call_at, call_soon_threadsafe, _scheduled, _ready | Callback scheduling; call_later inserts a TimerHandle into the _scheduled heap; call_soon appends a Handle to _ready. | module/asyncio/ |
| queues.py 1-200 | Queue, LifoQueue, PriorityQueue, QueueEmpty, QueueFull | Producer-consumer queue; get and put are coroutines that wait on internal Event-like machinery when the queue is empty or full. | module/asyncio/ |
| locks.py 1-400 | Lock, Event, Condition, Semaphore, BoundedSemaphore | Async synchronization; all use asyncio.Future objects on an internal waiters deque rather than OS primitives. | module/asyncio/ |
Reading
BaseEventLoop._run_once (base_events.py lines 1 to 400)
cpython 3.14 @ ab2d84fe1023/Lib/asyncio/base_events.py#L1-400
def _run_once(self):
"""Run one full iteration of the event loop."""
sched_count = len(self._scheduled)
if (sched_count > _MIN_SCHEDULED_TIMER_HANDLES and
self._timer_cancelled_count / sched_count > _MIN_CANCELLED_TIMER_HANDLES_FRACTION):
# Remove cancelled handles
new_scheduled = []
for handle in self._scheduled:
if handle._cancelled:
handle._scheduled = False
else:
new_scheduled.append(handle)
heapq.heapify(new_scheduled)
self._scheduled = new_scheduled
self._timer_cancelled_count = 0
end_time = self.time() + self._clock_resolution
while self._scheduled:
handle = self._scheduled[0]
if handle._when >= end_time:
break
handle = heapq.heappop(self._scheduled)
handle._scheduled = False
self._ready.append(handle)
timeout = None
if self._ready or self._stopping:
timeout = 0
elif self._scheduled:
when = self._scheduled[0]._when
timeout = min(max(0, when - self.time()), MAXIMUM_SELECT_TIMEOUT)
event_list = self._selector.select(timeout)
self._process_events(event_list)
# ... dispatch self._ready handles
ntodo = len(self._ready)
for i in range(ntodo):
handle = self._ready.popleft()
if handle._cancelled:
continue
handle._run()
handle = None # Needed to break cycles when an exception occurs.
_run_once is the innermost loop body. It does four things in sequence.
First, it prunes cancelled TimerHandle objects from the _scheduled
heap when the fraction of cancelled handles exceeds a threshold (avoiding
O(n) scans on every iteration while keeping the heap compact). Second, it
moves any due timer handles from _scheduled into _ready by popping
from the min-heap until the next handle is in the future. Third, it
computes the I/O select timeout: zero if there are ready callbacks,
None (block indefinitely) if there are no timers, or the time until the
next scheduled handle otherwise. Fourth, it processes I/O events from
_selector.select, then drains _ready by calling handle._run() on
each handle. The ntodo = len(self._ready) snapshot is important: any
callbacks added during this drain (by call_soon inside a handler) are
deferred to the next iteration, preventing starvation of I/O.
Task.__step coroutine driver (tasks.py lines 1 to 300)
cpython 3.14 @ ab2d84fe1023/Lib/asyncio/tasks.py#L1-300
class Task(futures._PyFuture):
def __step(self, exc=None):
coro = self._coro
self._fut_waiter = None
_enter_task(self._loop, self)
# Call either coro.throw(exc) or coro.send(None).
try:
if exc is None:
result = coro.send(None)
else:
result = coro.throw(exc)
except StopIteration as exc:
if self._must_cancel:
self._must_cancel = False
super().cancel(self._cancel_message)
else:
super().set_result(exc.value)
except exceptions.CancelledError as exc:
self._cancelled_message = exc.args[0] if exc.args else None
super().cancel(self._cancel_message)
except (KeyboardInterrupt, SystemExit) as exc:
super().set_exception(exc)
raise
except BaseException as exc:
super().set_exception(exc)
else:
# The coroutine yielded a Future-like object.
blocking = getattr(result, '_asyncio_future_blocking', None)
if blocking is not None:
result._asyncio_future_blocking = False
result.add_done_callback(self.__step_run_and_handle_result)
self._fut_waiter = result
if self._must_cancel:
if self._fut_waiter.cancel(msg=self._cancel_message):
self._must_cancel = False
elif result is None:
self._loop.call_soon(self.__step)
else:
self._loop.call_soon(self.__step,
context=self._context)
finally:
_leave_task(self._loop, self)
self = None
__step is the coroutine driver. It calls coro.send(None) to advance
the coroutine. If the coroutine raises StopIteration, the task is
resolved with the exception's value (the return value of the async def). If it raises CancelledError, the task is cancelled. Any other
exception sets the task's exception state.
When the coroutine yields (the else branch), the yielded object is
inspected for _asyncio_future_blocking. A Future sets this flag to
True before yielding itself with yield self. __step clears the flag,
registers __step_run_and_handle_result as a done callback on the
future, and stores the future in _fut_waiter so that Task.cancel can
propagate a cancellation into it. When the future completes, the callback
fires and calls __step again with the result or exception.
Future result/exception protocol (futures.py lines 1 to 200)
cpython 3.14 @ ab2d84fe1023/Lib/asyncio/futures.py#L1-200
class Future:
_state = _PENDING
_result = None
_exception = None
_exception_tb_future = None
def result(self):
if self._state == _CANCELLED:
exc = self._make_cancelled_error()
raise exc
if self._state != _FINISHED:
raise InvalidStateError('Result is not ready.')
self.__log_traceback = False
if self._exception is not None:
raise self._exception.with_traceback(self._exception_tb)
return self._result
def set_result(self, result):
if self._state != _PENDING:
raise InvalidStateError(f'{self._state}: {self!r}')
self._result = result
self._state = _FINISHED
self.__schedule_callbacks()
def add_done_callback(self, fn, *, context=None):
if self._state != _PENDING:
self._loop.call_soon(fn, self, context=context)
else:
if context is None:
context = contextvars.copy_context()
self._callbacks.append((fn, context))
Future is a three-state machine: _PENDING, _CANCELLED, _FINISHED.
set_result and set_exception both transition from _PENDING to
_FINISHED and call __schedule_callbacks, which posts all registered
callbacks to the loop with call_soon. add_done_callback either
enqueues the callback if the future is still pending, or schedules it
immediately if the future has already resolved. Each callback carries a
contextvars.Context snapshot so that ContextVar values are isolated
per-task.
gopy mirror
module/asyncio/ is pending. The event loop requires a platform I/O
selector (wrapping epoll/kqueue/select). Task.__step and
Future's state machine are pure logic and can be ported as Go structs.
The coroutine protocol (send/throw/close on a generator object) maps
to gopy's existing generator machinery. _asyncio C acceleration can be
skipped initially; the pure-Python classes in tasks.py and futures.py
are behavioral references.