Lib/asyncio/windows_events.py
cpython 3.14 @ ab2d84fe1023/Lib/asyncio/windows_events.py
This file provides the Windows proactor event loop backed by I/O Completion Ports. The central class is IocpProactor, which wraps CreateIoCompletionPort and GetQueuedCompletionStatus to drive all async I/O. _OverlappedFuture ties each in-flight OVERLAPPED operation to a Python Future. Named-pipe servers are supported through PipeServer and ProactorEventLoop.start_serving_pipe. The default policy on Windows resolves to _WindowsProactorEventLoopPolicy.
Map
| Lines | Symbol | Role |
|---|---|---|
| 49-95 | _OverlappedFuture | Future subclass that holds a reference to a _overlapped.Overlapped struct; cancellation calls ov.cancel() to abort the pending Windows I/O operation |
| 97-169 | _BaseWaitHandleFuture | Base for futures backed by RegisterWaitForSingleObject; handles unregistration via UnregisterWait / UnregisterWaitEx |
| 171-193 | _WaitCancelFuture | Sentinel future used to synchronize cancellation of a _WaitHandleFuture; must never be cancelled itself |
| 195-244 | _WaitHandleFuture | Full wait-handle future; creates a Win32 event and uses UnregisterWaitEx to ensure clean teardown before the OVERLAPPED memory is freed |
| 247-303 | PipeServer | Bound named-pipe server (analogous to a listening socket); manages a chain of pipe instances via CreateNamedPipe so there is always one free handle for incoming clients |
| 306-308 | _WindowsSelectorEventLoop | Thin subclass of BaseSelectorEventLoop for Windows; no added methods |
| 310-414 | ProactorEventLoop | IOCP-backed event loop; creates an IocpProactor by default; adds create_pipe_connection, start_serving_pipe, and _make_subprocess_transport |
| 417-873 | IocpProactor | Core proactor; owns the IOCP handle; implements recv, send, accept, connect, sendfile, accept_pipe, connect_pipe, wait_for_handle; drives all I/O in _poll |
| 444-453 | IocpProactor.select | Public polling interface; delegates to _poll, swaps out _results, and returns ready futures to the event loop |
| 705-714 | IocpProactor._register_with_iocp | Associates a socket or pipe handle with the IOCP handle via a second CreateIoCompletionPort call |
| 762-817 | IocpProactor._poll | Calls GetQueuedCompletionStatus in a loop with a millisecond timeout; dispatches completions to their registered callbacks; drains _unregistered at the end |
| 894-903 | _WindowsProactorEventLoopPolicy | Default Windows policy; sets _loop_factory = ProactorEventLoop; aliased as _DefaultEventLoopPolicy (the module-level EventLoop alias points at ProactorEventLoop itself) |
Reading
IocpProactor construction and IOCP handle creation
The proactor creates the completion port at __init__ time with INVALID_HANDLE_VALUE as the file handle, meaning it starts as a standalone port not yet associated with any I/O object. Handles are attached later in _register_with_iocp.
# CPython: Lib/asyncio/windows_events.py:420 IocpProactor.__init__
def __init__(self, concurrency=INFINITE):
    self._loop = None
    self._results = []
    self._iocp = _overlapped.CreateIoCompletionPort(
        _overlapped.INVALID_HANDLE_VALUE, NULL, 0, concurrency)
    self._cache = {}
    self._registered = weakref.WeakSet()
    self._unregistered = []
    self._stopped_serving = weakref.WeakSet()
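As a usage illustration, the sketch below builds a ProactorEventLoop around an explicitly constructed IocpProactor. The helper name make_loop and the fallback to asyncio.new_event_loop() on other platforms are assumptions added here so the snippet runs anywhere; they are not part of windows_events.py.

```python
import asyncio
import sys


def make_loop():
    """Return a proactor-backed loop on Windows, the platform default elsewhere."""
    if sys.platform == "win32":
        # IocpProactor() creates its completion port in the constructor.
        proactor = asyncio.IocpProactor()
        return asyncio.ProactorEventLoop(proactor)
    # Assumption for portability: off Windows, use whatever loop asyncio provides.
    return asyncio.new_event_loop()


async def main():
    # Yield to the loop once so at least one polling pass happens.
    await asyncio.sleep(0)
    return "done"


if __name__ == "__main__":
    loop = make_loop()
    try:
        print(loop.run_until_complete(main()))
    finally:
        loop.close()
```

Passing the proactor explicitly is only needed when it has to be customized; ProactorEventLoop() with no argument creates an IocpProactor itself.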
IocpProactor._poll: the main completion loop
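_poll converts the Python timeout from seconds to milliseconds, rounding up with math.ceil so the wait never ends early, and rejects negative or too-large values. The first GetQueuedCompletionStatus call may block for up to that long; once a completion arrives, ms is reset to 0 so the loop drains whatever else is already queued without blocking and exits when the call returns None. Each status tuple carries an address that keys into _cache to retrieve the future, OVERLAPPED, target object, and finish callback.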
# CPython: Lib/asyncio/windows_events.py:762 IocpProactor._poll
def _poll(self, timeout=None):
    if timeout is None:
        ms = INFINITE
    elif timeout < 0:
        raise ValueError("negative timeout")
    else:
        # 1 ms resolution: round up so we wait at least `timeout` seconds.
        ms = math.ceil(timeout * 1e3)
        if ms >= INFINITE:
            raise ValueError("timeout too big")

    while True:
        status = _overlapped.GetQueuedCompletionStatus(self._iocp, ms)
        if status is None:
            break
        # Only the first call may block; later calls just drain the queue.
        ms = 0

        err, transferred, key, address = status
        try:
            f, ov, obj, callback = self._cache.pop(address)
        except KeyError:
            if self._loop.get_debug():
                self._loop.call_exception_handler({
                    'message': ('GetQueuedCompletionStatus() returned an '
                                'unexpected event'),
                    'status': ('err=%s transferred=%s key=%#x address=%#x'
                               % (err, transferred, key, address)),
                })

            # key is either zero or a pipe handle that must be closed
            # to avoid a leak.
            if key not in (0, _overlapped.INVALID_HANDLE_VALUE):
                _winapi.CloseHandle(key)
            continue

        if obj in self._stopped_serving:
            f.cancel()
        # Skip the callback if the result was already read or the
        # overlapped operation was cancelled.
        elif not f.done():
            try:
                value = callback(transferred, key, ov)
            except OSError as e:
                f.set_exception(e)
                self._results.append(f)
            else:
                f.set_result(value)
                self._results.append(f)
            finally:
                f = None

    # Remove unregistered futures.
    for ov in self._unregistered:
        self._cache.pop(ov.address, None)
    self._unregistered.clear()
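The dispatch pattern above can be tried without Windows using a toy model. In this sketch a deque stands in for the completion port and concurrent.futures.Future stands in for the asyncio future; ToyProactor, register, and timeout_to_ms are hypothetical names introduced for illustration, and only the rounding rule and the cache lookup mirror the excerpt.

```python
import math
from collections import deque
from concurrent.futures import Future

INFINITE = 0xFFFFFFFF  # sentinel meaning "wait forever"


def timeout_to_ms(timeout):
    """Convert seconds to whole milliseconds, rounding up as _poll does."""
    if timeout is None:
        return INFINITE
    if timeout < 0:
        raise ValueError("negative timeout")
    ms = math.ceil(timeout * 1e3)
    if ms >= INFINITE:
        raise ValueError("timeout too big")
    return ms


class ToyProactor:
    """Hypothetical stand-in: a deque plays the role of the completion port."""

    def __init__(self):
        self.cache = {}             # address -> (future, finish callback)
        self.completions = deque()  # queued (transferred, address) pairs
        self.results = []           # futures completed by the last poll

    def register(self, address, callback):
        fut = Future()
        self.cache[address] = (fut, callback)
        return fut

    def poll(self):
        # Drain every queued completion, like the ms = 0 phase of _poll.
        while self.completions:
            transferred, address = self.completions.popleft()
            try:
                fut, callback = self.cache.pop(address)
            except KeyError:
                continue  # unexpected completion: nothing registered for it
            if fut.done():
                continue  # result already delivered or operation cancelled
            try:
                value = callback(transferred)
            except OSError as exc:
                fut.set_exception(exc)
            else:
                fut.set_result(value)
            self.results.append(fut)
```

Keying the cache by address is what lets a completion packet, which carries only raw numbers, find its way back to the Python objects waiting on it.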
_OverlappedFuture cancellation
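When a pending operation's future is cancelled (for example because the awaiting task was cancelled), _OverlappedFuture.cancel first calls _cancel_overlapped, which invokes ov.cancel() on the underlying Overlapped object, and only then delegates to the base Future.cancel. This aborts the kernel I/O request, not just the Python future. An OSError raised by ov.cancel() is reported through the loop's exception handler rather than propagated, and the _ov reference is dropped either way.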
# CPython: Lib/asyncio/windows_events.py:68 _OverlappedFuture._cancel_overlapped
def _cancel_overlapped(self):
    if self._ov is None:
        return
    try:
        self._ov.cancel()
    except OSError as exc:
        context = {
            'message': 'Cancelling an overlapped future failed',
            'exception': exc,
            'future': self,
        }
        if self._source_traceback:
            context['source_traceback'] = self._source_traceback
        self._loop.call_exception_handler(context)
    self._ov = None

# CPython: Lib/asyncio/windows_events.py:84 _OverlappedFuture.cancel
def cancel(self, msg=None):
    self._cancel_overlapped()
    return super().cancel(msg=msg)
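The same "cancel the resource, then cancel the future" ordering can be sketched on any platform. FakeOverlapped and ResourceFuture below are hypothetical names; the fake object only records that cancel() was called, where the real Overlapped object would ask Windows to abort the pending I/O.

```python
import asyncio


class FakeOverlapped:
    """Hypothetical stand-in for _overlapped.Overlapped."""

    def __init__(self):
        self.cancelled = False

    def cancel(self):
        # The real object would request cancellation of the pending I/O here.
        self.cancelled = True


class ResourceFuture(asyncio.Future):
    """Future that aborts its underlying operation before cancelling itself."""

    def __init__(self, ov, *, loop=None):
        super().__init__(loop=loop)
        self._ov = ov

    def _cancel_resource(self):
        if self._ov is None:
            return
        try:
            self._ov.cancel()
        except OSError as exc:
            # Report the failure instead of raising out of cancel().
            self._loop.call_exception_handler({
                'message': 'Cancelling the underlying operation failed',
                'exception': exc,
                'future': self,
            })
        # Drop the reference whether or not cancellation succeeded.
        self._ov = None

    def cancel(self, msg=None):
        self._cancel_resource()
        return super().cancel(msg=msg)
```

Cancelling the resource first matters because a cancelled future no longer has anyone waiting for its result, so an operation left running would complete into nothing.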
Named-pipe server: PipeServer and start_serving_pipe
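PipeServer keeps a free CreateNamedPipe handle ready so that, while the server is open, a connecting client never fails with ERROR_FILE_NOT_FOUND. Only the first instance is created with FILE_FLAG_FIRST_PIPE_INSTANCE, which makes creation fail if another pipe with the same name already exists. start_serving_pipe loops via loop_accept_pipe, calling accept_pipe (which issues ConnectNamedPipe as an overlapped operation) and immediately obtaining a fresh handle for the next client.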
# CPython: Lib/asyncio/windows_events.py:270 PipeServer._server_pipe_handle
def _server_pipe_handle(self, first):
    if self.closed():
        return None
    flags = _winapi.PIPE_ACCESS_DUPLEX | _winapi.FILE_FLAG_OVERLAPPED
    if first:
        flags |= _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE
    h = _winapi.CreateNamedPipe(
        self._address, flags,
        _winapi.PIPE_TYPE_MESSAGE | _winapi.PIPE_READMODE_MESSAGE |
        _winapi.PIPE_WAIT,
        _winapi.PIPE_UNLIMITED_INSTANCES,
        windows_utils.BUFSIZE, windows_utils.BUFSIZE,
        _winapi.NMPWAIT_WAIT_FOREVER, _winapi.NULL)
    pipe = windows_utils.PipeHandle(h)
    self._free_instances.add(pipe)
    return pipe
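The "always one spare instance" idea can be modelled without Win32 calls. In the sketch below, ToyPipeServer and its make_handle factory are hypothetical; the factory replaces CreateNamedPipe, and only the swap in get_unconnected_pipe is meant to reflect how the server hands out the current instance while creating its replacement.

```python
import itertools


class ToyPipeServer:
    """Hypothetical model of a server that always holds one spare instance."""

    def __init__(self, address, make_handle):
        self._address = address
        self._make_handle = make_handle  # stand-in for CreateNamedPipe
        self._free_instances = set()
        self._pipe = self._new_instance(first=True)

    def closed(self):
        return self._address is None

    def _new_instance(self, first):
        if self.closed():
            return None
        handle = self._make_handle(self._address, first)
        self._free_instances.add(handle)
        return handle

    def get_unconnected_pipe(self):
        # Create the replacement while handing out the previous instance,
        # so at least one instance exists for the address at all times.
        previous, self._pipe = self._pipe, self._new_instance(first=False)
        return previous

    def close(self):
        self._address = None
        self._free_instances.clear()
        self._pipe = None


if __name__ == "__main__":
    counter = itertools.count(1)
    server = ToyPipeServer(r"\\.\pipe\demo",
                           lambda addr, first: (next(counter), first))
    print(server.get_unconnected_pipe())
    server.close()
```

The `first` flag is passed through so the factory can treat the initial instance differently, which is where the real code adds FILE_FLAG_FIRST_PIPE_INSTANCE.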
gopy notes
A Go port of this file centers on golang.org/x/sys/windows for IOCP primitives (CreateIoCompletionPort, GetQueuedCompletionStatusEx). _OverlappedFuture maps to a struct holding a *windows.Overlapped and a channel or callback for completion. IocpProactor._poll becomes a goroutine or a method called from the run-loop tick. Named-pipe support uses windows.CreateNamedPipe with the same FILE_FLAG_OVERLAPPED flag. The selector-based _WindowsSelectorEventLoop has no practical value in gopy and would be omitted; ProactorEventLoop and _WindowsProactorEventLoopPolicy are the only targets.