Skip to main content

Lib/asyncio/ (part 3)

Source:

cpython 3.14 @ ab2d84fe1023/Lib/asyncio/tasks.py

This annotation covers the task API. See modules_asyncio2_detail for EventLoop, Future, Handle, and the selector event loop internals.

Map

LinesSymbolRole
1-80Task.__init__Wrap a coroutine; schedule __step
81-200Task.__stepDrive the coroutine one send() cycle
201-300ensure_futureConvert coroutine, future, or awaitable to a Task
301-420gatherRun multiple awaitables concurrently; collect results
421-600wait / shieldTimeout-aware waiting; cancel-safe wrapper

Reading

Task.__init__

# CPython: Lib/asyncio/tasks.py:100 Task.__init__
class Task(futures._PyFuture):
def __init__(self, coro, *, loop=None, name=None, context=None,
eager_start=False):
super().__init__(loop=loop)
self._coro = coro
self._context = context or contextvars.copy_context()
self._loop.call_soon(self.__step, context=self._context)
if eager_start:
self.__step() # Run first step synchronously

Task is a Future that drives a coroutine. call_soon(self.__step) schedules the first step. eager_start=True (Python 3.12+) runs the first step immediately without yielding to the event loop, avoiding one round-trip through call_soon.

Task.__step

# CPython: Lib/asyncio/tasks.py:180 Task.__step
def __step(self, exc=None):
coro = self._coro
self.__fut_waiter = None
_enter_task(self._loop, self)
try:
if exc is None:
result = coro.send(None)
else:
result = coro.throw(exc)
except StopIteration as exc:
self.set_result(exc.value)
except CancelledError as exc:
super().cancel(msg=exc.args[0] if exc.args else None)
except Exception as exc:
self.set_exception(exc)
else:
# The coroutine yielded a Future (via await)
if isinstance(result, futures.Future):
result.add_done_callback(self.__step)
self.__fut_waiter = result
else:
self._loop.call_soon(self.__step)
finally:
_leave_task(self._loop, self)
self = None

__step is the coroutine driver. Each await future causes the coroutine to yield future; __step receives it and adds itself as a done callback on that future. When the future completes, __step is called again with the result.

gather

# CPython: Lib/asyncio/tasks.py:380 gather
async def gather(*coros_or_futures, return_exceptions=False):
"""Return a future aggregating results from awaitables."""
children = []
for arg in set(coros_or_futures):
if not isfuture(arg):
arg = ensure_future(arg, loop=loop)
children.append(arg)
outer = loop.create_future()
nfinished = 0
results = [None] * len(children)

def _done_callback(i, future):
nonlocal nfinished
if future.cancelled():
outer.cancel()
return
exc = future.exception()
if exc is not None and not return_exceptions:
outer.set_exception(exc)
for child in children:
child.cancel()
return
results[i] = future.exception() if return_exceptions and exc else future.result()
nfinished += 1
if nfinished == len(children):
outer.set_result(results)

for i, child in enumerate(children):
child.add_done_callback(functools.partial(_done_callback, i))
return await outer

gather starts all tasks, then waits for all of them. return_exceptions=True collects exceptions as results instead of propagating the first one. If any task is cancelled with return_exceptions=False, the outer future is also cancelled.

shield

# CPython: Lib/asyncio/tasks.py:720 shield
async def shield(arg):
"""Wait for the future, shielding it from cancellation."""
inner = ensure_future(arg)
outer = loop.create_future()

def _inner_done_callback(inner):
if outer.cancelled():
inner.cancel() # If outer cancelled, propagate to inner
return
if inner.cancelled():
outer.cancel()
else:
exc = inner.exception()
if exc is not None:
outer.set_exception(exc)
else:
outer.set_result(inner.result())

def _outer_done_callback(outer):
inner.remove_done_callback(_inner_done_callback)

inner.add_done_callback(_inner_done_callback)
outer.add_done_callback(_outer_done_callback)
return await outer

await shield(task) can be cancelled without cancelling the underlying task. Cancellation of the outer future disconnects from the inner one. If the outer is cancelled before the inner completes, the inner continues running independently.

gopy notes

Task is module/asyncio.Task in module/asyncio/module.go. Task.__step calls objects.CoroutineSend. gather creates module/asyncio.GatherFuture. shield creates a wrapper future backed by a Go channel. The event loop is module/asyncio.EventLoop using Go's net/http and select primitives.