Lib/asyncio/ (part 3)
Source:
cpython 3.14 @ ab2d84fe1023/Lib/asyncio/tasks.py
This annotation covers the task API. See modules_asyncio2_detail for EventLoop, Future, Handle, and the selector event loop internals.
Map
| Lines | Symbol | Role |
|---|---|---|
| 1-80 | Task.__init__ | Wrap a coroutine; schedule __step |
| 81-200 | Task.__step | Drive the coroutine one send() cycle |
| 201-300 | ensure_future | Convert coroutine, future, or awaitable to a Task |
| 301-420 | gather | Run multiple awaitables concurrently; collect results |
| 421-600 | wait / shield | Timeout-aware waiting; cancel-safe wrapper |
Reading
Task.__init__
# CPython: Lib/asyncio/tasks.py:100 Task.__init__
class Task(futures._PyFuture):
def __init__(self, coro, *, loop=None, name=None, context=None,
eager_start=False):
super().__init__(loop=loop)
self._coro = coro
self._context = context or contextvars.copy_context()
self._loop.call_soon(self.__step, context=self._context)
if eager_start:
self.__step() # Run first step synchronously
Task is a Future that drives a coroutine. call_soon(self.__step) schedules the first step. eager_start=True (Python 3.12+) runs the first step immediately without yielding to the event loop, avoiding one round-trip through call_soon.
Task.__step
# CPython: Lib/asyncio/tasks.py:180 Task.__step
def __step(self, exc=None):
coro = self._coro
self.__fut_waiter = None
_enter_task(self._loop, self)
try:
if exc is None:
result = coro.send(None)
else:
result = coro.throw(exc)
except StopIteration as exc:
self.set_result(exc.value)
except CancelledError as exc:
super().cancel(msg=exc.args[0] if exc.args else None)
except Exception as exc:
self.set_exception(exc)
else:
# The coroutine yielded a Future (via await)
if isinstance(result, futures.Future):
result.add_done_callback(self.__step)
self.__fut_waiter = result
else:
self._loop.call_soon(self.__step)
finally:
_leave_task(self._loop, self)
self = None
__step is the coroutine driver. Each await future causes the coroutine to yield future; __step receives it and adds itself as a done callback on that future. When the future completes, __step is called again with the result.
gather
# CPython: Lib/asyncio/tasks.py:380 gather
async def gather(*coros_or_futures, return_exceptions=False):
"""Return a future aggregating results from awaitables."""
children = []
for arg in set(coros_or_futures):
if not isfuture(arg):
arg = ensure_future(arg, loop=loop)
children.append(arg)
outer = loop.create_future()
nfinished = 0
results = [None] * len(children)
def _done_callback(i, future):
nonlocal nfinished
if future.cancelled():
outer.cancel()
return
exc = future.exception()
if exc is not None and not return_exceptions:
outer.set_exception(exc)
for child in children:
child.cancel()
return
results[i] = future.exception() if return_exceptions and exc else future.result()
nfinished += 1
if nfinished == len(children):
outer.set_result(results)
for i, child in enumerate(children):
child.add_done_callback(functools.partial(_done_callback, i))
return await outer
gather starts all tasks, then waits for all of them. return_exceptions=True collects exceptions as results instead of propagating the first one. If any task is cancelled with return_exceptions=False, the outer future is also cancelled.
shield
# CPython: Lib/asyncio/tasks.py:720 shield
async def shield(arg):
"""Wait for the future, shielding it from cancellation."""
inner = ensure_future(arg)
outer = loop.create_future()
def _inner_done_callback(inner):
if outer.cancelled():
inner.cancel() # If outer cancelled, propagate to inner
return
if inner.cancelled():
outer.cancel()
else:
exc = inner.exception()
if exc is not None:
outer.set_exception(exc)
else:
outer.set_result(inner.result())
def _outer_done_callback(outer):
inner.remove_done_callback(_inner_done_callback)
inner.add_done_callback(_inner_done_callback)
outer.add_done_callback(_outer_done_callback)
return await outer
await shield(task) can be cancelled without cancelling the underlying task. Cancellation of the outer future disconnects from the inner one. If the outer is cancelled before the inner completes, the inner continues running independently.
gopy notes
Task is module/asyncio.Task in module/asyncio/module.go. Task.__step calls objects.CoroutineSend. gather creates module/asyncio.GatherFuture. shield creates a wrapper future backed by a Go channel. The event loop is module/asyncio.EventLoop using Go's net/http and select primitives.