asyncio/streams.py

streams.py is the high-level byte-stream layer in asyncio. It wraps the low-level transport/protocol machinery behind two friendly classes: StreamReader (consumer side) and StreamWriter (producer side). User code calls open_connection() and gets back a (reader, writer) pair without ever touching a protocol object directly.

Map

Lines     Symbol                 Role
1–60      module top             imports, __all__, FlowControl helper
61–120    open_connection()      creates TCP socket, wires reader/writer
121–160   start_server()         accept-loop factory
161–240   StreamReaderProtocol   internal Protocol that feeds StreamReader
241–400   StreamReader           buffered consumer: read, readline, readexactly
401–530   StreamWriter           producer: write, writelines, drain, close
531–600   _wakeup_waiter()       resolves the paused coroutine
601–700   Unix socket helpers    open_unix_connection, start_unix_server

Reading

open_connection bootstraps the pair

open_connection is the main entry point. It calls loop.create_connection with a freshly constructed StreamReaderProtocol, then wraps the resulting transport in a StreamWriter.

# CPython: Lib/asyncio/streams.py:61 open_connection
async def open_connection(host=None, port=None, *,
                          limit=_DEFAULT_LIMIT, **kwds):
    loop = events.get_running_loop()
    reader = StreamReader(limit=limit, loop=loop)
    protocol = StreamReaderProtocol(reader, loop=loop)
    transport, _ = await loop.create_connection(
        lambda: protocol, host, port, **kwds)
    writer = StreamWriter(transport, protocol, reader, loop)
    return reader, writer

The limit parameter controls the high-water mark for back-pressure. Once the internal buffer exceeds it, pause_reading is called on the transport.
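A quick sketch of the bootstrap in action: `handle` and `main` below are illustrative names (not part of streams.py), and the port is OS-assigned by passing 0 to start_server, so nothing else is assumed.

```python
import asyncio

# Round trip over loopback: start_server spawns handle() per client,
# open_connection returns the (reader, writer) pair described above.
async def handle(reader, writer):
    data = await reader.readline()     # one line from the client
    writer.write(data.upper())         # echo it back, uppercased
    await writer.drain()
    writer.close()
    await writer.wait_closed()

async def main():
    server = await asyncio.start_server(handle, '127.0.0.1', 0)
    port = server.sockets[0].getsockname()[1]
    reader, writer = await asyncio.open_connection('127.0.0.1', port)
    writer.write(b'hello\n')
    await writer.drain()
    reply = await reader.readline()
    writer.close()
    await writer.wait_closed()
    server.close()
    await server.wait_closed()
    return reply

reply = asyncio.run(main())
```

Neither side ever touches a transport or protocol object; the pair hides both.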

StreamReader.read waits for data

read(n) returns up to n bytes. If the buffer is empty and EOF has not arrived, it suspends the caller by storing a Future in _waiter and awaiting it.

# CPython: Lib/asyncio/streams.py:279 StreamReader.read
async def read(self, n=-1):
    if n == 0:
        return b''
    if n < 0:
        # read until EOF
        ...
    if not self._buffer and not self._eof:
        await self._wait_for_data('read')
    ...
    data = bytes(self._buffer[:n])
    del self._buffer[:n]
    return data
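The buffer semantics can be exercised without a socket, because StreamReader exposes the same feed_data/feed_eof hooks the protocol uses. A minimal sketch (`demo` is an illustrative name):

```python
import asyncio

# Drive a StreamReader by hand: feed_data() fills the internal
# bytearray, feed_eof() marks end-of-stream.
async def demo():
    reader = asyncio.StreamReader()
    reader.feed_data(b'abcdef')
    reader.feed_eof()
    first = await reader.read(4)   # up to 4 bytes from the buffer
    rest = await reader.read()     # n < 0: read until EOF
    return first, rest

first, rest = asyncio.run(demo())
```

Note that read(4) returns at most 4 bytes but may return fewer if less is buffered; only readexactly(n) guarantees the full count.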

_wakeup_waiter resolves the suspended coroutine

Every time the protocol delivers new data or signals EOF, it calls _wakeup_waiter. The method sets the stored Future's result, which resumes the await inside read or readline.

# CPython: Lib/asyncio/streams.py:531 StreamReader._wakeup_waiter
def _wakeup_waiter(self):
    waiter = self._waiter
    if waiter is not None:
        self._waiter = None
        if not waiter.done():
            waiter.set_result(None)
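The suspend/resume handshake can be observed directly: start a read() on an empty buffer, then feed data from another task, exactly as the protocol would. A small sketch (`demo` is an illustrative name):

```python
import asyncio

# A read() on an empty buffer parks on _waiter; feed_data() calls
# _wakeup_waiter, which resolves that Future and resumes the read.
async def demo():
    reader = asyncio.StreamReader()
    task = asyncio.create_task(reader.read(5))  # suspends: buffer empty
    await asyncio.sleep(0)       # let the read park on its Future
    reader.feed_data(b'wake!')   # protocol side: data arrived
    return await task

data = asyncio.run(demo())
```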

StreamWriter.drain applies back-pressure

drain() yields control back to the event loop until the transport's write buffer drops below the low-water mark. The protocol's pause_writing / resume_writing callbacks manage the drain-waiter Futures that _drain_helper parks on: pause_writing marks the protocol paused, and resume_writing resolves any waiters.

# CPython: Lib/asyncio/streams.py:447 StreamWriter.drain
async def drain(self):
    if self._reader is not None:
        exc = self._reader.exception()
        if exc is not None:
            raise exc
    if self._transport.is_closing():
        # Yield once so connection_lost() can run and surface
        # the real error on the next operation.
        await sleep(0)
    await self._protocol._drain_helper()
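The canonical write loop pairs every write() with an awaited drain(), so a slow consumer transparently throttles the producer. A sketch over loopback (`pump`, `sink`, and `main` are illustrative names):

```python
import asyncio

# Producer: write in chunks, awaiting drain() after each write so the
# coroutine suspends whenever pause_writing() has fired and resumes
# on resume_writing().
async def pump(writer, payload, chunk=64 * 1024):
    for i in range(0, len(payload), chunk):
        writer.write(payload[i:i + chunk])
        await writer.drain()          # may park until the buffer empties
    writer.close()
    await writer.wait_closed()

async def main():
    received = bytearray()
    done = asyncio.Event()

    async def sink(reader, writer):
        while data := await reader.read(4096):
            received.extend(data)     # consumer drains at its own pace
        writer.close()
        done.set()

    server = await asyncio.start_server(sink, '127.0.0.1', 0)
    port = server.sockets[0].getsockname()[1]
    _, writer = await asyncio.open_connection('127.0.0.1', port)
    await pump(writer, b'x' * 300_000)
    await done.wait()                 # wait until the sink saw EOF
    server.close()
    await server.wait_closed()
    return len(received)

total = asyncio.run(main())
```

Skipping drain() does not lose data, but lets the transport buffer grow without bound if the peer reads slowly.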

Notes

  • StreamReader._buffer is a bytearray, allowing efficient prefix deletion with del buf[:n].
  • The _waiter pattern (a single Future stored per reader) means only one coroutine may wait on a given StreamReader at a time. A second concurrent read raises RuntimeError.
  • FlowControlMixin._drain_helper parks each caller on its own drain-waiter Future rather than sharing a single one, so multiple writers can all await drain() simultaneously; resume_writing resolves every pending waiter.
  • open_connection uses loop.create_connection; the Unix variant uses loop.create_unix_connection. Both paths return the same (reader, writer) shape.
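The single-waiter restriction from the second bullet is easy to trip deliberately. A sketch (`demo` is an illustrative name):

```python
import asyncio

# Two coroutines reading the same StreamReader: the second read()
# finds _waiter already occupied and raises RuntimeError.
async def demo():
    reader = asyncio.StreamReader()
    first = asyncio.create_task(reader.read(1))
    await asyncio.sleep(0)        # first read parks on _waiter
    try:
        await reader.read(1)      # second concurrent read
        msg = ''
    except RuntimeError as exc:
        msg = str(exc)
    first.cancel()
    try:
        await first
    except asyncio.CancelledError:
        pass
    return msg

msg = asyncio.run(demo())
```

If two consumers genuinely need the same stream, put one reading task in front and fan the data out through queues.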

CPython 3.14 changes

  • The deprecated loop parameter was removed from open_connection, start_server, and the Stream* constructors (deprecation began in 3.8, enforced in 3.10, parameter dropped in 3.14).
  • StreamWriter gained __aenter__ / __aexit__ for use as an async context manager, so callers can rely on close() being awaited automatically.
  • Internal use of asyncio.coroutine (removed in 3.11) is gone; all paths use async def.