Skip to main content

Lib/asyncio/ (part 2)

Source:

cpython 3.14 @ ab2d84fe1023/Lib/asyncio/tasks.py

This annotation covers high-level task coordination. See lib_asyncio2_detail for the event loop, run, sleep, and coroutine mechanics.

Map

LinesSymbolRole
1-100asyncio.gatherRun coroutines concurrently; collect results
101-220asyncio.waitWait for a set of tasks with FIRST_COMPLETED / ALL_COMPLETED
221-360asyncio.create_taskSchedule a coroutine as a Task
361-480Task.cancelRequest cancellation via CancelledError injection
481-700asyncio.timeoutContext manager that cancels on deadline

Reading

asyncio.gather

# CPython: Lib/asyncio/tasks.py:780 gather
async def gather(*coros_or_futures, return_exceptions=False):
"""Run awaitables concurrently. Returns a list of results."""
children = []
done = 0
nfuts = 0
outer = get_event_loop().create_future()
...
for arg in set(coros_or_futures):
task = ensure_future(arg, loop=loop)
children.append(task)
task.add_done_callback(_done_callback)
...
return await outer

asyncio.gather(a(), b(), c()) returns a list [result_a, result_b, result_c] in the same order as the arguments. If any coroutine raises, gather cancels the remaining ones (unless return_exceptions=True).

asyncio.wait

# CPython: Lib/asyncio/tasks.py:440 wait
async def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED):
"""Wait for futures; return (done, pending) sets."""
fs = set(fs)
done, pending = set(), set()
...
waiter = loop.create_future()
counter = len(fs)
def _on_completion(f):
nonlocal counter
counter -= 1
done.add(f)
pending.discard(f)
if (return_when == FIRST_COMPLETED or counter == 0):
waiter.set_result(None)
for f in fs:
f.add_done_callback(_on_completion)
...
return done, pending

asyncio.wait does not collect results; it returns (done, pending) sets. FIRST_COMPLETED returns as soon as one task finishes. FIRST_EXCEPTION returns when any task raises (or all finish).

Task.cancel

# CPython: Lib/asyncio/tasks.py:280 Task.cancel
def cancel(self, msg=None):
"""Request cancellation of this task."""
self._log_traceback = False
if self.done():
return False
if self._fut_waiter is not None:
if self._fut_waiter.cancel(msg=msg):
return True
self._must_cancel = True
self._cancel_message = msg
return True

task.cancel() schedules a CancelledError to be thrown into the coroutine at its next await point. The task may catch CancelledError in a try/finally to run cleanup.

asyncio.timeout

# CPython: Lib/asyncio/timeouts.py:80 timeout
@contextmanager
def timeout(delay):
"""Context manager that cancels the current task after delay seconds."""
loop = get_event_loop()
task = current_task()
handle = loop.call_later(delay, task.cancel)
try:
yield
except CancelledError as exc:
if handle.cancelled():
raise TimeoutError from exc
raise
finally:
handle.cancel()

async with asyncio.timeout(5.0): cancels the current task if it doesn't complete within 5 seconds. The TimeoutError is raised instead of CancelledError so callers can distinguish timeouts from explicit cancellation.

gopy notes

asyncio.gather is module/asyncio.Gather in module/asyncio/tasks.go. Tasks are goroutines; gather uses a sync.WaitGroup. asyncio.wait uses Go channels for FIRST_COMPLETED. Task.cancel sends a CancelledError to the coroutine's goroutine via a channel. asyncio.timeout uses time.AfterFunc.