asyncio/streams.py
streams.py is the high-level byte-stream layer in asyncio. It wraps the low-level transport/protocol machinery behind two friendly classes: StreamReader (consumer side) and StreamWriter (producer side). User code calls open_connection() and gets back a (reader, writer) pair without ever touching a protocol object directly.
Map
| Lines | Symbol | Role |
|---|---|---|
| 1–60 | module top | imports, __all__, FlowControl helper |
| 61–120 | open_connection() | creates TCP socket, wires reader/writer |
| 121–160 | start_server() | accept-loop factory |
| 161–240 | StreamReaderProtocol | internal Protocol that feeds StreamReader |
| 241–400 | StreamReader | buffered consumer: read, readline, readexactly |
| 401–530 | StreamWriter | producer: write, writelines, drain, close |
| 531–600 | _wakeup_waiter() | resolves the paused coroutine |
| 601–700 | Unix socket helpers | open_unix_connection, start_unix_server |
Reading
open_connection bootstraps the pair
open_connection is the main entry point. It calls loop.create_connection with a freshly constructed StreamReaderProtocol, then wraps the resulting transport in a StreamWriter.
# CPython: Lib/asyncio/streams.py:61 open_connection
async def open_connection(host=None, port=None, *,
limit=_DEFAULT_LIMIT, **kwds):
loop = events.get_running_loop()
reader = StreamReader(limit=limit, loop=loop)
protocol = StreamReaderProtocol(reader, loop=loop)
transport, _ = await loop.create_connection(
lambda: protocol, host, port, **kwds)
writer = StreamWriter(transport, protocol, reader, loop)
return reader, writer
The limit parameter controls the high-water mark for back-pressure. Once the internal buffer exceeds it, pause_reading is called on the transport.
StreamReader.read waits for data
read(n) returns up to n bytes. If the buffer is empty and EOF has not arrived, it suspends the caller by storing a Future in _waiter and awaiting it.
# CPython: Lib/asyncio/streams.py:279 StreamReader.read
async def read(self, n=-1):
if n == 0:
return b''
if n < 0:
# read until EOF
...
if not self._buffer and not self._eof:
await self._wait_for_data('read')
...
data = bytes(self._buffer[:n])
del self._buffer[:n]
return data
_wakeup_waiter resolves the suspended coroutine
Every time the protocol delivers new data or signals EOF, it calls _wakeup_waiter. The method sets the stored Future's result, which resumes the await inside read or readline.
# CPython: Lib/asyncio/streams.py:531 StreamReader._wakeup_waiter
def _wakeup_waiter(self):
waiter = self._waiter
if waiter is not None:
self._waiter = None
if not waiter.done():
waiter.set_result(None)
StreamWriter.drain applies back-pressure
drain() yields control back to the event loop until the transport's write buffer drops below the low-water mark. The protocol's pause_writing / resume_writing callbacks toggle an internal Event that drain waits on.
# CPython: Lib/asyncio/streams.py:447 StreamWriter.drain
async def drain(self):
if self._transport.is_closing():
await asyncio.sleep(0)
exc = self._protocol._get_close_waiter(self._writer)
...
await self._protocol._drain_helper()
gopy notes
StreamReader._bufferis abytearray, allowing efficient prefix deletion withdel buf[:n].- The
_waiterpattern (a singleFuturestored per reader) means only one coroutine may wait on a givenStreamReaderat a time. A second concurrentreadraisesRuntimeError. FlowControl._drain_helperuses a plainasyncio.Event, not aFuture, so multiple writers can allawait drain()simultaneously.open_connectionusesloop.create_connection; the Unix variant usesloop.create_unix_connection. Both paths return the same(reader, writer)shape.
CPython 3.14 changes
- The deprecated
loopparameter was removed fromopen_connection,start_server, and theStream*constructors (deprecation began in 3.8, enforced in 3.10, parameter dropped in 3.14). StreamWritergained__aenter__/__aexit__for use as an async context manager, so callers can rely onclose()being awaited automatically.- Internal use of
asyncio.coroutine(removed in 3.11) is gone; all paths useasync def.