Skip to main content

Lib/asyncio/ (part 2)

Source:

cpython 3.14 @ ab2d84fe1023/Lib/asyncio/tasks.py

This annotation covers task and future management. See lib_asyncio_detail for the event loop, BaseEventLoop, call_soon, call_later, and transport/protocol.

Map

LinesSymbolRole
1-120Task.__init__Wrap a coroutine in a Task; schedule first __step
121-280Task.__stepDrive the coroutine one step; handle Future waits
281-420ensure_futureWrap coroutine/future in a Task or return as-is
421-580gatherWait for multiple awaitables; aggregate results
581-700waitWait for a set of futures with timeout/condition
701-820shieldProtect an awaitable from cancellation
821-1000timeout / timeout_atContext manager raising TimeoutError

Reading

Task.__init__

# CPython: Lib/asyncio/tasks.py:90 Task.__init__
class Task(futures._PyFuture):
def __init__(self, coro, *, loop=None, name=None, context=None,
eager_start=False):
super().__init__(loop=loop)
if context is None:
self._context = contextvars.copy_context()
else:
self._context = context
self._coro = coroutine
self._fut_waiter = None
self._must_cancel = False
self._log_destroy_pending = True
self.__loop.call_soon(self.__step, context=self._context)

Task immediately schedules __step via call_soon. Each task has its own contextvars.Context copy so task-local variables are isolated.

Task.__step

# CPython: Lib/asyncio/tasks.py:230 Task.__step
def __step(self, exc=None):
coro = self._coro
self.__fut_waiter = None
_enter_task(self.__loop, self)
try:
if exc is None:
result = coro.send(None)
else:
result = coro.throw(type(exc), exc, exc.__traceback__)
except StopIteration as exc:
if self._must_cancel:
super().cancel(self._cancel_message)
else:
super().set_result(exc.value)
except CancelledError as exc:
super().cancel(message=exc.args[0] if exc.args else None)
except Exception as exc:
super().set_exception(exc)
else:
# result is a Future or None
if isinstance(result, futures.Future):
result.add_done_callback(self.__step_run_and_handle_result)
self.__fut_waiter = result
elif result is None:
self.__loop.call_soon(self.__step, context=self._context)
finally:
_leave_task(self.__loop, self)

__step calls coro.send(None) to advance the coroutine. When the coroutine awaits a Future, send returns that Future. __step then adds itself as a done callback so it will be called when the future completes.

gather

# CPython: Lib/asyncio/tasks.py:540 gather
async def gather(*coros_or_futures, return_exceptions=False):
"""Wait for multiple awaitables, collect results into a list."""
children = []
for arg in set(coros_or_futures):
task = ensure_future(arg, loop=loop)
children.append(task)
results = await asyncio.wait(children, return_when=ALL_COMPLETED)
if return_exceptions:
return [t.result() if not t.cancelled() else
CancelledError() for t in children]
# Raise the first exception
for task in children:
exc = task.exception()
if exc is not None:
raise exc
return [task.result() for task in children]

gather returns results in the same order as inputs, even though tasks may complete in any order. If return_exceptions=True, exceptions are returned as values rather than re-raised.

timeout

# CPython: Lib/asyncio/timeouts.py:80 timeout
class Timeout:
"""Async context manager raising TimeoutError after a deadline."""
def __init__(self, when):
self._when = when
self._state = _State.CREATED
self._task: Task | None = None
self._timeout_handler: asyncio.Handle | None = None

async def __aenter__(self):
self._task = asyncio.current_task()
self._timeout_handler = self._task.get_loop().call_at(
self._when, self._on_timeout, self._task)
return self

def _on_timeout(self, task):
task.cancel(msg='Timeout')

async with asyncio.timeout(5): cancels the current task after 5 seconds. The cancellation is converted to TimeoutError on __aexit__. timeout_at takes an absolute loop time.

gopy notes

asyncio.Task is module/asyncio.Task in module/asyncio/task.go. Task.__step calls objects.CoroSend. gather is a pure Python function over asyncio.wait. timeout uses loop.CallAt in module/asyncio/event_loop.go.