Lib/asyncio/tasks.py (part 8)
Source:
cpython 3.14 @ ab2d84fe1023/Lib/asyncio/tasks.py
This annotation covers task scheduling and composition. See lib_asyncio7_detail for BaseEventLoop, run_until_complete, and call_soon.
Map
| Lines | Symbol | Role |
|---|---|---|
| 1-80 | Task.__init__ | Schedule coroutine as a task |
| 81-180 | Task.__step | Advance the coroutine one step |
| 181-280 | ensure_future | Wrap a coroutine or future |
| 281-380 | gather | Run awaitables concurrently |
| 381-500 | shield | Protect a coroutine from cancellation |
Reading
Task.__init__
# CPython: Lib/asyncio/tasks.py:92 Task.__init__
def __init__(self, coro, *, loop=None, name=None, context=None, eager_start=False):
super().__init__(loop=loop)
self._coro = coro
self._context = context or contextvars.copy_context()
self._loop.call_soon(self.__step, context=self._context)
if eager_start and self._loop.is_running():
self.__step()
asyncio.create_task(coro) schedules Task.__step via call_soon. The task's context (a copy of the current contextvars context) is passed so that context variables set before create_task are visible inside the coroutine. eager_start=True runs the first step synchronously.
Task.__step
# CPython: Lib/asyncio/tasks.py:220 __step
def __step(self, exc=None):
coro = self._coro
self._fut_waiter = None
self.__class__._current_tasks[self._loop] = self
try:
if exc is None:
result = coro.send(None)
else:
result = coro.throw(type(exc), exc, exc.__traceback__)
except StopIteration as exc:
super().set_result(exc.value)
except CancelledError as exc:
super().cancel(msg=exc.args[0] if exc.args else None)
except (KeyboardInterrupt, SystemExit) as exc:
super().set_exception(exc)
raise
except BaseException as exc:
super().set_exception(exc)
else:
# result is a Future-like (yielded from inside an await expression)
if isinstance(result, futures.Future):
result.add_done_callback(self.__step_run_and_handle_result)
self._fut_waiter = result
else:
self._loop.call_soon(self.__step)
finally:
self.__class__._current_tasks.pop(self._loop, None)
self = None # prevent reference cycle
__step drives the coroutine forward by calling send(None). When the coroutine yields a Future, __step attaches itself as a done callback so it will resume when the future completes. StopIteration means the coroutine returned — the task's result is set.
gather
# CPython: Lib/asyncio/tasks.py:720 gather
async def gather(*coros_or_futures, return_exceptions=False):
children = []
for arg in set(coros_or_futures):
task = ensure_future(arg, loop=loop)
children.append(task)
outer = loop.create_future()
nfinished = 0
results = [None] * len(children)
def _done_callback(i, fut):
nonlocal nfinished
nfinished += 1
if not return_exceptions and fut.exception() is not None:
if not outer.done():
outer.set_exception(fut.exception())
return
results[i] = fut.result() if not fut.exception() else fut.exception()
if nfinished == len(children):
outer.set_result(results)
for i, child in enumerate(children):
child.add_done_callback(partial(_done_callback, i))
return await outer
asyncio.gather(coro1, coro2, coro3) runs all three concurrently and returns a list of results when all complete. By default, the first exception cancels all others and re-raises. return_exceptions=True treats exceptions as results.
gopy notes
asyncio.Task is module/asyncio.Task in module/asyncio/tasks.go. __step drives the coroutine via vm.GenSend. The _current_tasks dict is a goroutine-local map. gather is module/asyncio.Gather; uses Go channels to coordinate completions. ensure_future wraps a vm.Coroutine in an objects.Task.