Lib/asyncio/tasks.py

Source:

cpython 3.14 @ ab2d84fe1023/Lib/asyncio/tasks.py

tasks.py builds the coroutine driver on top of Future. Task is the only concrete scheduler unit in asyncio: every await chain bottoms out in a Task.__step call queued on the event loop's _ready deque. The module also provides gather, shield, wait, current_task, and all_tasks.

Map

| Lines | Symbol | Role |
|---|---|---|
| 1-40 | module prologue | imports, current_task, all_tasks weak-set |
| 41-120 | Task.__init__ | calls call_soon(__step), registers in _all_tasks |
| 121-200 | Task.cancel | sets _must_cancel, injects CancelledError into live coroutine |
| 201-310 | Task.__step, Task.__step_run_and_handle_result | send/throw into coroutine, handle StopIteration and future yields |
| 311-370 | Task.__wakeup | done-callback added to awaited futures, calls __step |
| 371-430 | ensure_future | coerces coroutines and awaitables to Task or Future |
| 431-540 | gather, _GatheringFuture | fan-out over an iterable, returns aggregate future |
| 541-640 | wait, wait_for | FIRST_COMPLETED / FIRST_EXCEPTION / ALL_COMPLETED strategies |
| 641-700 | shield | wraps a future to intercept cancel calls |

Reading

Task.__init__ and the first step

Task.__init__ stores the coroutine and, with the default eager_start=False, enqueues __step via call_soon. The coroutine does not run during __init__; its first step runs on a later loop iteration. This ordering guarantee is load-bearing: the code after create_task(coro) keeps running, up to its next await, before the coroutine body starts.

# CPython: Lib/asyncio/tasks.py:98 Task.__init__
def __init__(self, coro, *, loop=None, name=None, context=None,
             eager_start=False):
    super().__init__(loop=loop)
    ...
    self._coro = coro
    self._fut_waiter = None
    self._must_cancel = False
    self.__log_traceback = False
    # Queue the first step; the coroutine body has not run yet.
    self._loop.call_soon(self.__step, context=self._context)
    _register_task(self)
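
The ordering guarantee can be observed from user code. The following is a minimal sketch, assuming the default (non-eager) task factory; the names `child`, `main`, and `events` are illustrative and do not appear in tasks.py.

```python
import asyncio


async def main():
    events = []

    async def child():
        # Runs only once the loop executes the queued first step.
        events.append("child started")

    task = asyncio.create_task(child())
    # Still inside main's current step: the child has not started yet.
    events.append("after create_task")
    await task
    return events


order = asyncio.run(main())
print(order)  # ['after create_task', 'child started']
```

The child body runs only after `main` reaches an await and hands control back to the loop.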

Task.__step: the coroutine driver

__step is the method that drives the coroutine: every resumption of a task goes through it. It calls coro.send(None), or coro.throw(exc) when an exception such as CancelledError has to be delivered. Three outcomes are possible: the coroutine yields a Future (suspension), raises StopIteration (normal completion), or raises any other exception (error completion).

When the coroutine suspends by yielding a future, __step adds __wakeup as a done-callback on that future. When the future completes, __wakeup calls __step again, and coro.send(None) resumes the coroutine from the yield point with the future's result already available.

# CPython: Lib/asyncio/tasks.py:230 Task.__step_run_and_handle_result
def __step_run_and_handle_result(self, exc):
    coro = self._coro
    # Clear the waiter before resuming the coroutine.
    self._fut_waiter = None
    try:
        if exc is None:
            result = coro.send(None)
        else:
            result = coro.throw(exc)
    except StopIteration as exc:
        # Normal completion, unless a cancel request is still pending.
        if self._must_cancel:
            self._must_cancel = False
            super().cancel(msg=self._cancel_message)
        else:
            super().set_result(exc.value)
    except exceptions.CancelledError as exc:
        super().cancel(msg=exc.args[0] if exc.args else None)
    except (KeyboardInterrupt, SystemExit) as exc:
        super().set_exception(exc)
        raise
    except BaseException as exc:
        super().set_exception(exc)
    else:
        # The coroutine yielded a future; register wakeup.
        result.add_done_callback(self.__wakeup)
        self._fut_waiter = result
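
The send/suspend/wakeup cycle can be reproduced outside asyncio. The sketch below is a toy driver, not CPython's implementation: `ToyFuture`, `step`, `ready`, and `finished` are hypothetical names, and cancellation and `throw` are left out for brevity.

```python
from collections import deque


class ToyFuture:
    """Hypothetical stand-in for asyncio.Future, just enough to be awaited."""

    def __init__(self):
        self._done = False
        self._result = None
        self._callbacks = []

    def add_done_callback(self, callback):
        self._callbacks.append(callback)

    def set_result(self, value):
        self._done = True
        self._result = value
        for callback in self._callbacks:
            callback(self)

    def __await__(self):
        if not self._done:
            yield self        # suspend: hand this future to the driver
        return self._result   # resumed, or already done: produce the value


ready = deque()   # plays the role of the loop's _ready deque
finished = {}     # records how the coroutine ended


def step(coro):
    """Advance coro once and handle the three outcomes described above."""
    try:
        yielded = coro.send(None)
    except StopIteration as stop:
        finished["result"] = stop.value       # normal completion
    except BaseException as exc:
        finished["exception"] = exc           # error completion
    else:
        # Suspension: queue another step once the yielded future completes.
        yielded.add_done_callback(lambda fut: ready.append(lambda: step(coro)))


async def add_one(fut):
    value = await fut
    return value + 1


fut = ToyFuture()
coro = add_one(fut)
ready.append(lambda: step(coro))
ready.popleft()()      # first step: the coroutine suspends on fut
fut.set_result(41)     # completing the future queues the next step
while ready:
    ready.popleft()()
print(finished)        # {'result': 42}
```

The second `send(None)` carries no value of its own; the awaited result reaches the coroutine because `ToyFuture.__await__` returns it after the yield.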

gather with _GatheringFuture

gather wraps each argument in a Task (if not already a future), then returns a single outer _GatheringFuture that resolves to a list of results in the original argument order, regardless of completion order. The inner _done_callback increments a counter; when the counter reaches the child count, it collects results in original order and calls outer.set_result.

# CPython: Lib/asyncio/tasks.py:478 gather
def gather(*coros_or_futures, return_exceptions=False):
    children = []
    nchildren = 0
    nfinished = 0
    outer = loop.create_future()
    ...
    def _done_callback(fut):
        nonlocal nfinished
        nfinished += 1
        if not return_exceptions and fut.cancelled():
            outer.cancel(msg=fut._cancel_message)
            return
        if nfinished == nchildren:
            # Collect in argument order, not completion order.
            results = []
            for fut in children:
                ...
                results.append(fut.result())
            outer.set_result(results)

    # Children must stay in argument order so the results line up.
    for arg in coros_or_futures:
        task = ensure_future(arg, loop=loop)
        task.add_done_callback(_done_callback)
        children.append(task)
    nchildren = len(children)

_GatheringFuture overrides cancel so that cancelling the outer future also cancels all children that have not finished yet.
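
The argument-order guarantee is easy to check from the public API. This is a small sketch; `tracked`, `completion_order`, and the delays are illustrative choices.

```python
import asyncio


async def main():
    completion_order = []

    async def tracked(value, delay):
        await asyncio.sleep(delay)
        completion_order.append(value)   # record when each child finishes
        return value

    # The slow child is passed first but finishes last.
    results = await asyncio.gather(tracked("slow", 0.02), tracked("fast", 0))
    return results, completion_order


results, completion_order = asyncio.run(main())
print(results)            # ['slow', 'fast']
print(completion_order)   # ['fast', 'slow']
```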

shield: cancellation firewall

shield(aw) returns a new future that mirrors the inner future's result but absorbs cancel() calls. If the outer future is cancelled, the inner task keeps running. If the inner task finishes, the outer future is resolved unless it was already cancelled.

# CPython: Lib/asyncio/tasks.py:689 shield
def shield(aw):
    inner = ensure_future(aw)
    if inner.done():
        return inner
    outer = inner._loop.create_future()

    def _inner_done_callback(inner):
        if outer.cancelled():
            if not inner.cancelled():
                inner.exception()  # suppress logged warning
            return
        if inner.cancelled():
            outer.cancel()
        else:
            exc = inner.exception()
            if exc is not None:
                outer.set_exception(exc)
            else:
                outer.set_result(inner.result())

    def _outer_done_callback(outer):
        if not inner.done():
            inner.remove_done_callback(_inner_done_callback)

    inner.add_done_callback(_inner_done_callback)
    outer.add_done_callback(_outer_done_callback)
    return outer
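
The firewall behaviour can be seen by cancelling the outer future while keeping a reference to the inner task. This is a minimal sketch; `work`, `log`, and the sleep durations are illustrative.

```python
import asyncio


async def work(log):
    await asyncio.sleep(0.01)
    log.append("inner finished")
    return "done"


async def main():
    log = []
    inner = asyncio.ensure_future(work(log))   # keep a reference to the inner task
    outer = asyncio.shield(inner)
    await asyncio.sleep(0)                     # let the inner task start
    outer.cancel()                             # cancels only the outer future
    try:
        await outer
    except asyncio.CancelledError:
        log.append("outer cancelled")
    result = await inner                       # the inner task still completes
    return log, result, inner.cancelled()


log, result, inner_cancelled = asyncio.run(main())
print(log)              # ['outer cancelled', 'inner finished']
print(result)           # done
print(inner_cancelled)  # False
```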

current_task and all_tasks

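current_task returns the Task currently executing on the given loop (the running loop by default) by reading a thread-local set by Task.__step. all_tasks returns a new set holding the tasks from _all_tasks that belong to that loop and are not yet done. _all_tasks is a module-level WeakSet that every Task.__init__ registers into; because it holds only weak references, a task drops out of it once the task object is garbage-collected.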

# CPython: Lib/asyncio/tasks.py:28 current_task
def current_task(loop=None):
    if loop is None:
        loop = events.get_running_loop()
    return _get_running_loop_task(loop)

# CPython: Lib/asyncio/tasks.py:36 all_tasks
def all_tasks(loop=None):
    if loop is None:
        loop = events.get_running_loop()
    return {t for t in list(_all_tasks)
            if futures._get_loop(t) is loop and not t.done()}
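
Both helpers can be exercised through the public API. The sketch below shows that all_tasks includes a scheduled but unfinished task and filters it out once it is done; the variable names are illustrative.

```python
import asyncio


async def main():
    main_task = asyncio.current_task()   # the task that asyncio.run created for main()
    child = asyncio.create_task(asyncio.sleep(0.01), name="child")
    pending = asyncio.all_tasks()        # snapshot while the child is still pending
    await child
    after = asyncio.all_tasks()          # the finished child is filtered out
    return (main_task in pending, child in pending,
            main_task in after, child in after)


membership = asyncio.run(main())
print(membership)  # (True, True, True, False)
```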

gopy notes

Status: not yet ported.

Planned package path: module/asyncio/.

Task.__step is the central porting target. The Go equivalent calls coro.Resume() and switches on the yielded value type. _fut_waiter must be cleared at the top of each step before calling send or throw, matching CPython exactly. _must_cancel and _cancel_message pair together, so both fields belong on the task struct. gather creates a _GatheringFuture subclass in CPython; the Go port can use a plain struct embedding Future with a counter field. _all_tasks is a WeakSet; the Go equivalent is a set (a map from task pointer to empty struct) protected by a mutex, with explicit deregistration in the finalizer.