Lib/asyncio/__init__.py

cpython 3.14 @ ab2d84fe1023/Lib/asyncio/__init__.py

asyncio is a package split across many files. __init__.py re-exports everything from the sub-modules under a flat public namespace. The key files for the runtime are:

  • Lib/asyncio/base_events.py -- BaseEventLoop with _run_once and all scheduling primitives.
  • Lib/asyncio/tasks.py -- Task and the coroutine-driving state machine.
  • Lib/asyncio/futures.py -- Future and the result/exception protocol.
  • Lib/asyncio/queues.py -- Queue, LifoQueue, PriorityQueue.
  • Lib/asyncio/locks.py -- Lock, Event, Condition, Semaphore, BoundedSemaphore.

asyncio.run() gained a loop_factory parameter in 3.12, and TaskGroup (added in 3.11) has received structured-concurrency refinements in later releases. The _asyncio C extension accelerates Future and Task.

Map

| Lines | Symbol | Role | gopy |
|---|---|---|---|
| futures.py 1-200 | Future, _PENDING, _CANCELLED, _FINISHED, InvalidStateError | Core result/exception container; result() raises CancelledError or the stored exception; add_done_callback schedules callbacks via the loop. | module/asyncio/ |
| tasks.py 1-300 | Task, Task.__step, Task.__step_run_and_handle_result, Task.cancel | Coroutine driver; __step sends a value (or throws) into the coroutine and processes the yielded Future. | module/asyncio/ |
| tasks.py 300-600 | gather, shield, wait, wait_for, sleep, ensure_future | High-level coroutine combinators; gather wraps each awaitable in a Task and collects results; wait_for adds a timeout via loop.call_later. | module/asyncio/ |
| base_events.py 1-400 | BaseEventLoop.__init__, run_forever, run_until_complete, stop, close, _run_once | Event loop core; _run_once selects I/O events, fires timed callbacks, and dispatches ready handles. | module/asyncio/ |
| base_events.py 400-800 | call_soon, call_later, call_at, call_soon_threadsafe, _scheduled, _ready | Callback scheduling; call_later inserts a TimerHandle into the _scheduled heap; call_soon appends a Handle to _ready. | module/asyncio/ |
| queues.py 1-200 | Queue, LifoQueue, PriorityQueue, QueueEmpty, QueueFull | Producer-consumer queue; get and put are coroutines that wait on internal Event-like machinery when the queue is empty or full. | module/asyncio/ |
| locks.py 1-400 | Lock, Event, Condition, Semaphore, BoundedSemaphore | Async synchronization; all use asyncio.Future objects on an internal waiters deque rather than OS primitives. | module/asyncio/ |
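The scheduling row can be observed from outside the loop with public API only. The sketch below is a minimal demonstration, not part of the annotated source; demo_scheduling and order are hypothetical names. It relies on call_soon callbacks running in FIFO order and on timers firing in order of their deadlines.

```python
import asyncio


def demo_scheduling() -> list[str]:
    order: list[str] = []
    loop = asyncio.new_event_loop()
    try:
        # call_later registers timers; the later deadline is added first
        # to show that firing order follows the deadline, not insertion.
        loop.call_later(0.02, order.append, "timer-late")
        loop.call_later(0.01, order.append, "timer-early")
        # call_soon callbacks run in the order they were registered.
        loop.call_soon(order.append, "soon-1")
        loop.call_soon(order.append, "soon-2")
        # Stop the loop after both timers have had time to fire.
        loop.call_later(0.05, loop.stop)
        loop.run_forever()
    finally:
        loop.close()
    return order


order = demo_scheduling()
print(order)
```

The call_soon callbacks run on the first iteration, before either timer is due, so they appear first regardless of registration order.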

Reading

BaseEventLoop._run_once (base_events.py lines 1 to 400)

cpython 3.14 @ ab2d84fe1023/Lib/asyncio/base_events.py#L1-400

def _run_once(self):
    """Run one full iteration of the event loop."""
    sched_count = len(self._scheduled)
    if (sched_count > _MIN_SCHEDULED_TIMER_HANDLES and
            self._timer_cancelled_count / sched_count >
            _MIN_CANCELLED_TIMER_HANDLES_FRACTION):
        # Too many cancelled timers: rebuild the heap without them.
        new_scheduled = []
        for handle in self._scheduled:
            if handle._cancelled:
                handle._scheduled = False
            else:
                new_scheduled.append(handle)
        heapq.heapify(new_scheduled)
        self._scheduled = new_scheduled
        self._timer_cancelled_count = 0

    # Compute the select timeout.
    timeout = None
    if self._ready or self._stopping:
        timeout = 0
    elif self._scheduled:
        when = self._scheduled[0]._when
        timeout = min(max(0, when - self.time()), MAXIMUM_SELECT_TIMEOUT)

    event_list = self._selector.select(timeout)
    self._process_events(event_list)

    # Move timers that are now due into the ready queue.
    end_time = self.time() + self._clock_resolution
    while self._scheduled:
        handle = self._scheduled[0]
        if handle._when >= end_time:
            break
        handle = heapq.heappop(self._scheduled)
        handle._scheduled = False
        self._ready.append(handle)

    # Dispatch only the handles that are ready at this point.
    ntodo = len(self._ready)
    for i in range(ntodo):
        handle = self._ready.popleft()
        if handle._cancelled:
            continue
        handle._run()
    handle = None  # Needed to break cycles when an exception occurs.

_run_once is the innermost loop body (the excerpt above is abridged). It does four things in sequence. First, it prunes cancelled TimerHandle objects from the _scheduled heap, but only when the heap is large and the fraction of cancelled handles exceeds a threshold (avoiding O(n) scans on every iteration while keeping the heap compact). Second, it computes the I/O select timeout: zero if there are ready callbacks or the loop is stopping, None (block indefinitely) if there are no timers, or the time until the next scheduled handle otherwise, capped at MAXIMUM_SELECT_TIMEOUT. Third, it calls _selector.select and processes the returned I/O events. Fourth, it moves any due timer handles from _scheduled into _ready by popping from the min-heap until the next handle is in the future, then drains _ready by calling handle._run() on each handle. The ntodo = len(self._ready) snapshot is important: any callbacks added during this drain (by call_soon inside a handler) are deferred to the next iteration, which prevents a chain of callbacks from starving I/O.
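The interplay between the timer heap, the ready deque, and the ntodo snapshot can be tried out in a toy loop with no I/O. This is a simplified sketch under stated assumptions, not CPython's implementation: ToyLoop and its methods are hypothetical, time.sleep stands in for the selector call, and cancellation is left out.

```python
import heapq
import itertools
import time
from collections import deque


class ToyLoop:
    """Hypothetical, I/O-free stand-in for an event loop (illustration only)."""

    def __init__(self):
        self._ready = deque()          # callbacks runnable now
        self._scheduled = []           # min-heap of (when, seq, callback)
        self._seq = itertools.count()  # tie-breaker so callbacks are never compared

    def call_soon(self, callback):
        self._ready.append(callback)

    def call_later(self, delay, callback):
        when = time.monotonic() + delay
        heapq.heappush(self._scheduled, (when, next(self._seq), callback))

    def run_once(self):
        # Timeout: 0 if work is ready, else the time until the next timer.
        if self._ready or not self._scheduled:
            timeout = 0.0  # a real loop with nothing scheduled would block
        else:
            timeout = max(0.0, self._scheduled[0][0] - time.monotonic())
        time.sleep(timeout)  # stands in for selector.select(timeout)

        # Move due timers into the ready queue.
        now = time.monotonic()
        while self._scheduled and self._scheduled[0][0] <= now:
            _, _, callback = heapq.heappop(self._scheduled)
            self._ready.append(callback)

        # Drain only what was ready when the drain started.
        ntodo = len(self._ready)
        for _ in range(ntodo):
            self._ready.popleft()()
        return ntodo


loop = ToyLoop()
log = []


def first():
    log.append("first")
    # Scheduled during the drain, so it must wait for the next pass.
    loop.call_soon(lambda: log.append("added-during-drain"))


loop.call_soon(first)
loop.call_soon(lambda: log.append("second"))
ran_first_pass = loop.run_once()
log_after_first_pass = list(log)
ran_second_pass = loop.run_once()
print(ran_first_pass, log_after_first_pass, ran_second_pass, log)
```

Because the drain count is fixed before the loop starts, the callback registered by first() runs on the second pass rather than extending the first one.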

Task.__step coroutine driver (tasks.py lines 1 to 300)

cpython 3.14 @ ab2d84fe1023/Lib/asyncio/tasks.py#L1-300

class Task(futures._PyFuture):
    def __step(self, exc=None):
        # Abridged: in the source, the try body lives in
        # __step_run_and_handle_result, which __step calls between
        # _enter_task and _leave_task.
        coro = self._coro
        self._fut_waiter = None

        _enter_task(self._loop, self)
        # Call either coro.throw(exc) or coro.send(None).
        try:
            if exc is None:
                result = coro.send(None)
            else:
                result = coro.throw(exc)
        except StopIteration as exc:
            if self._must_cancel:
                # Task is cancelled right before coro stops.
                self._must_cancel = False
                super().cancel(msg=self._cancel_message)
            else:
                super().set_result(exc.value)
        except exceptions.CancelledError as exc:
            # Save the original exception so it can be chained later.
            self._cancelled_exc = exc
            super().cancel()
        except (KeyboardInterrupt, SystemExit) as exc:
            super().set_exception(exc)
            raise
        except BaseException as exc:
            super().set_exception(exc)
        else:
            # The coroutine yielded; a Future-like object carries this flag.
            blocking = getattr(result, '_asyncio_future_blocking', None)
            if blocking is not None:
                result._asyncio_future_blocking = False
                result.add_done_callback(self.__wakeup, context=self._context)
                self._fut_waiter = result
                if self._must_cancel:
                    if self._fut_waiter.cancel(msg=self._cancel_message):
                        self._must_cancel = False
            elif result is None:
                # Bare yield relinquishes control for one loop iteration.
                self._loop.call_soon(self.__step, context=self._context)
            else:
                # Anything else is a bad yield.
                new_exc = RuntimeError(f'Task got bad yield: {result!r}')
                self._loop.call_soon(self.__step, new_exc, context=self._context)
        finally:
            _leave_task(self._loop, self)
            self = None  # Needed to break cycles when an exception occurs.

__step is the coroutine driver. It calls coro.send(None) to advance the coroutine, or coro.throw(exc) to deliver an exception into it. If the coroutine raises StopIteration, the task is resolved with the exception's value (the return value of the async def). If it raises CancelledError, the task is cancelled. Any other exception sets the task's exception state; KeyboardInterrupt and SystemExit are additionally re-raised.

When the coroutine yields (the else branch), the yielded object is inspected for _asyncio_future_blocking. A Future sets this flag to True before yielding itself with yield self. __step clears the flag, registers __wakeup as a done callback on the future, and stores the future in _fut_waiter so that Task.cancel can propagate a cancellation into it. When the future completes, __wakeup calls __step again: with no argument on success, or with the exception if the future failed or was cancelled. The value itself never passes through send; the coroutine picks it up because Future.__await__ returns self.result() once it is resumed.
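The send/StopIteration half of this protocol can be reproduced without an event loop. The sketch below is a hypothetical mini-driver (drive, pause, and add are illustration-only names) that handles bare yields only; it does not model futures, cancellation, or callbacks.

```python
import types


@types.coroutine
def pause():
    # A bare yield hands control back to whoever is driving the coroutine.
    yield


async def add(a, b):
    await pause()
    await pause()
    return a + b


def drive(coro):
    """Hypothetical mini-driver: step a coroutine to completion synchronously."""
    steps = 0
    while True:
        try:
            yielded = coro.send(None)  # advance to the next suspension point
        except StopIteration as stop:
            return stop.value, steps   # the coroutine's return value
        if yielded is not None:
            raise RuntimeError(f"unsupported yield: {yielded!r}")
        steps += 1


value, steps = drive(add(2, 3))
print(value, steps)
```

A real Task differs in that it never loops: each step is scheduled on the event loop, so other callbacks can run between suspension points.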

Future result/exception protocol (futures.py lines 1 to 200)

cpython 3.14 @ ab2d84fe1023/Lib/asyncio/futures.py#L1-200

class Future:
    _state = _PENDING
    _result = None
    _exception = None
    _exception_tb = None

    def result(self):
        if self._state == _CANCELLED:
            exc = self._make_cancelled_error()
            raise exc
        if self._state != _FINISHED:
            raise InvalidStateError('Result is not ready.')
        self.__log_traceback = False
        if self._exception is not None:
            raise self._exception.with_traceback(self._exception_tb)
        return self._result

    def set_result(self, result):
        if self._state != _PENDING:
            raise InvalidStateError(f'{self._state}: {self!r}')
        self._result = result
        self._state = _FINISHED
        self.__schedule_callbacks()

    def add_done_callback(self, fn, *, context=None):
        if self._state != _PENDING:
            # Already resolved: schedule the callback right away.
            self._loop.call_soon(fn, self, context=context)
        else:
            if context is None:
                context = contextvars.copy_context()
            self._callbacks.append((fn, context))

Future is a three-state machine: _PENDING, _CANCELLED, _FINISHED. set_result and set_exception both transition from _PENDING to _FINISHED and call __schedule_callbacks, which posts all registered callbacks to the loop with call_soon; cancel is the transition from _PENDING to _CANCELLED. add_done_callback either stores the callback if the future is still pending, or hands it to call_soon straight away if the future has already resolved. In both cases the callback runs on a later loop iteration, never synchronously. Each callback carries a contextvars.Context snapshot so that ContextVar values are isolated per-task.
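These transitions can be checked against the public API. The sketch below is a minimal demonstration using loop.create_future(); future_demo and events are hypothetical names introduced for the example.

```python
import asyncio


async def future_demo():
    loop = asyncio.get_running_loop()
    fut = loop.create_future()
    events = []

    fut.add_done_callback(lambda f: events.append(("callback", f.result())))

    try:
        fut.result()                      # still pending
    except asyncio.InvalidStateError:
        events.append("not-ready")

    fut.set_result(42)
    events.append("set_result-returned")  # the callback has not run yet

    try:
        fut.set_result(43)                # a second transition is rejected
    except asyncio.InvalidStateError:
        events.append("already-finished")

    await asyncio.sleep(0)                # let the loop run the scheduled callback
    return events


events = asyncio.run(future_demo())
print(events)
```

The callback entry lands last because set_result only schedules it; it runs once the coroutine yields back to the loop.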

gopy mirror

module/asyncio/ is pending. The event loop requires a platform I/O selector (wrapping epoll/kqueue/select). Task.__step and Future's state machine are pure logic and can be ported as Go structs. The coroutine protocol (send/throw/close on a generator object) maps to gopy's existing generator machinery. _asyncio C acceleration can be skipped initially; the pure-Python classes in tasks.py and futures.py are behavioral references.