Lib/asyncio/streams.py

cpython 3.14 @ ab2d84fe1023/Lib/asyncio/streams.py

streams.py is the user-facing layer of asyncio networking. It bridges the low-level Transport/Protocol pair (which are callback-based) to coroutine-friendly StreamReader and StreamWriter objects that callers can await directly.

The key entry points are open_connection() and start_server(). open_connection() calls loop.create_connection() with a StreamReaderProtocol factory and returns a (reader, writer) pair once the TCP handshake completes. start_server() wraps loop.create_server() and calls a user-supplied callback with a new (reader, writer) pair for every accepted connection.
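
A minimal echo round-trip shows both entry points together (handle_echo and main are illustrative names, not part of the module):

```python
import asyncio

async def handle_echo(reader, writer):
    # Server callback: start_server() invokes this with a fresh
    # (StreamReader, StreamWriter) pair per accepted connection.
    data = await reader.readline()
    writer.write(data)
    await writer.drain()
    writer.close()
    await writer.wait_closed()

async def main():
    # Port 0 asks the OS for a free ephemeral port.
    server = await asyncio.start_server(handle_echo, '127.0.0.1', 0)
    port = server.sockets[0].getsockname()[1]

    reader, writer = await asyncio.open_connection('127.0.0.1', port)
    writer.write(b'ping\n')
    await writer.drain()
    reply = await reader.readline()

    writer.close()
    await writer.wait_closed()
    server.close()
    await server.wait_closed()
    return reply

reply = asyncio.run(main())
```

Note that the server callback receives the same reader/writer types the client gets back, so both sides of a connection are programmed with one API.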

StreamReaderProtocol is the glue layer. It implements the asyncio Protocol interface (connection_made, data_received, eof_received, connection_lost) and forwards each event into the StreamReader buffer or onto the StreamWriter close machinery.
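
The wiring that open_connection() performs can be sketched by hand. This is a simplified reconstruction of the pattern, not a verbatim excerpt (the real function also threads through limit and ssl arguments):

```python
import asyncio

async def manual_open_connection(host, port):
    # Roughly what asyncio.open_connection() does: build the reader,
    # wrap it in the glue protocol, hand a protocol factory to the loop,
    # then wrap the resulting transport in a StreamWriter.
    loop = asyncio.get_running_loop()
    reader = asyncio.StreamReader()
    protocol = asyncio.StreamReaderProtocol(reader)
    transport, _ = await loop.create_connection(lambda: protocol, host, port)
    writer = asyncio.StreamWriter(transport, protocol, reader, loop)
    return reader, writer

async def main():
    async def handle(reader, writer):
        writer.write(await reader.readline())
        await writer.drain()
        writer.close()

    # A throwaway echo server to exercise the manual wiring.
    server = await asyncio.start_server(handle, '127.0.0.1', 0)
    port = server.sockets[0].getsockname()[1]

    reader, writer = await manual_open_connection('127.0.0.1', port)
    writer.write(b'hi\n')
    await writer.drain()
    reply = await reader.readline()

    writer.close()
    await writer.wait_closed()
    server.close()
    await server.wait_closed()
    return reply

reply = asyncio.run(main())
```

The protocol's connection_made() is what attaches the transport to the reader, which is why the reader can be constructed before any connection exists.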

In CPython 3.14, open_connection() and start_server() accept ssl_handshake_timeout and ssl_shutdown_timeout parameters. The deprecated loop parameter is long gone from the streams API (deprecated in 3.8, removed in 3.10).

Map

Lines      Symbol                           Role
1-60       Module prologue, _DEFAULT_LIMIT  Sets the default read buffer limit (64 KiB); imported constants are used throughout the file.
61-120     open_connection, start_server    Public entry points; both delegate to the event loop and construct StreamReaderProtocol instances.
121-280    StreamReaderProtocol             Protocol implementation; owns references to reader and writer; forwards transport events to them.
281-430    StreamWriter                     Wraps the transport; write, writelines, write_eof, can_write_eof, close, wait_closed, drain.
431-700    StreamReader                     Buffered reader; feed_data, feed_eof, read, readline, readexactly, readuntil, _wait_for_data.

Reading

StreamReader buffer and _wait_for_data (lines 431 to 700)

cpython 3.14 @ ab2d84fe1023/Lib/asyncio/streams.py#L431-700

class StreamReader:

    _source_traceback = None

    def __init__(self, limit=_DEFAULT_LIMIT, *, loop=None):
        # The line length limit is a security feature;
        # it also determines the buffer size for readline().
        if limit <= 0:
            raise ValueError('Limit cannot be <= 0')

        self._limit = limit
        if loop is None:
            self._loop = events.get_event_loop()
        else:
            self._loop = loop
        self._buffer = bytearray()
        self._eof = False    # Whether we have received EOF from the transport.
        self._waiter = None  # A future used by _wait_for_data()
        self._exception = None
        self._transport = None
        self._paused = False

    async def _wait_for_data(self, func_name):
        """Wait until feed_data() or feed_eof() is called."""
        if self._waiter is not None:
            raise RuntimeError(
                f'{func_name}() called while another coroutine is '
                f'already waiting for incoming data')

        if self._eof:
            return

        self._waiter = self._loop.create_future()
        try:
            await self._waiter
        finally:
            self._waiter = None

StreamReader accumulates incoming bytes in _buffer (a bytearray). feed_data appends to _buffer and then, if _waiter is set, resolves it so any suspended coroutine resumes. feed_eof sets _eof = True and resolves _waiter in the same way.

_wait_for_data is an internal coroutine used by all the public read* methods. It creates a Future, stores it in _waiter, and suspends until feed_data() or feed_eof() resolves it. Because only one consumer can wait at a time, a second concurrent call raises RuntimeError immediately rather than silently queuing.

When _paused is True, the transport's read side has been paused by calling transport.pause_reading(); it is resumed when the buffer drains below _limit. This back-pressure mechanism prevents unbounded buffer growth when the producer is faster than the consumer.
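
The feed/resolve handshake and the single-waiter rule can be observed with a bare StreamReader, no transport attached (demo is an illustrative name):

```python
import asyncio

async def demo():
    reader = asyncio.StreamReader()

    # The first consumer suspends inside _wait_for_data() ...
    t1 = asyncio.ensure_future(reader.readline())
    # ... so a second concurrent consumer hits the RuntimeError guard.
    t2 = asyncio.ensure_future(reader.readline())
    await asyncio.sleep(0)  # let both tasks run up to their first await

    reader.feed_data(b'hello\n')  # resolves the _waiter future
    line = await t1

    try:
        await t2
        second_error = None
    except RuntimeError as exc:
        second_error = exc
    return line, second_error

line, second_error = asyncio.run(demo())
```

The RuntimeError is deliberate: queuing multiple readers would make it ambiguous which one receives which bytes, so the design forces one consumer per stream.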

StreamReader.readuntil (lines 431 to 700)

cpython 3.14 @ ab2d84fe1023/Lib/asyncio/streams.py#L431-700

    async def readuntil(self, separator=b'\n'):
        seplen = len(separator)
        if seplen == 0:
            raise ValueError('Separator should be at least one-byte string')

        if self._exception is not None:
            raise self._exception

        # Consume whole buffer except last (seplen - 1) bytes
        chunk = None
        isep = 0

        while True:
            buflen = len(self._buffer)

            if buflen >= seplen:
                isep = self._buffer.find(separator)

                if isep != -1:
                    break

            # Limit check
            if buflen > self._limit:
                self._buffer.clear()
                raise LimitOverrunError(
                    'Separator is not found, and chunk '
                    'exceed the limit', buflen)

            if self._eof:
                chunk = bytes(self._buffer)
                self._buffer.clear()
                raise IncompleteReadError(chunk, None)

            await self._wait_for_data('readuntil')

        if isep > self._limit:
            self._buffer.clear()
            raise LimitOverrunError(
                'Separator is found, but chunk is longer than limit', isep)

        chunk = self._buffer[:isep + seplen]
        del self._buffer[:isep + seplen]
        return bytes(chunk)

readuntil re-searches _buffer for separator each time new data arrives. In this excerpt, bytearray.find scans from the start on every pass; the full CPython implementation tracks an offset so only the new bytes are examined, backing up by seplen - 1 bytes because the tail of the previous scan may now complete the separator. The _limit guard prevents memory exhaustion: once the unseparated data exceeds the limit, a LimitOverrunError is raised and the buffer is cleared so the connection is not left in an ambiguous state.
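
Both outcomes, normal separator extraction and the overrun guard, are easy to trigger with a small limit (demo is an illustrative name):

```python
import asyncio

async def demo():
    # A small limit makes the overrun guard easy to trigger.
    reader = asyncio.StreamReader(limit=16)
    reader.feed_data(b'alpha;beta;')

    first = await reader.readuntil(b';')   # separator is included in the chunk
    second = await reader.readuntil(b';')

    # 32 unseparated bytes exceed the 16-byte limit.
    reader.feed_data(b'x' * 32)
    try:
        await reader.readuntil(b';')
        overrun = None
    except asyncio.LimitOverrunError as exc:
        overrun = exc
    return first, second, overrun

first, second, overrun = asyncio.run(demo())
```

Note that the returned chunk includes the separator itself; callers that want only the payload must strip it.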

StreamWriter.drain (lines 281 to 430)

cpython 3.14 @ ab2d84fe1023/Lib/asyncio/streams.py#L281-430

class StreamWriter:

    def write(self, data):
        self._transport.write(data)

    async def drain(self):
        """Wait until it is appropriate to resume writing to the transport."""
        if self._reader is not None:
            exc = self._reader.exception()
            if exc is not None:
                raise exc
        if self._transport.is_closing():
            # Wait for protocol.connection_lost() to be called.
            await asyncio.sleep(0)
        await self._protocol._drain_helper()

write is synchronous: it passes bytes directly to the transport's write buffer without any await. drain is the flow-control point. It checks for a reader-side exception first (so that a half-closed connection is detected early), then delegates to _drain_helper on the protocol. _drain_helper returns immediately when the transport is not paused, or suspends on an internal Future that resume_writing() resolves once the transport's buffer drops below the high-water mark.

Calling await writer.drain() after every writer.write() is the idiomatic pattern for back-pressure-aware streaming: it yields control back to the event loop and lets other tasks run while the OS flushes the socket buffer.
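
The write-then-drain loop over a real loopback connection looks like this (main, handle, and CHUNK are illustrative names):

```python
import asyncio

CHUNK = b'x' * 1024

async def main():
    received = bytearray()
    done = asyncio.Event()

    async def handle(reader, writer):
        # Consume until EOF; a slow consumer here would eventually make
        # drain() on the client side suspend.
        while data := await reader.read(4096):
            received.extend(data)
        writer.close()
        done.set()

    server = await asyncio.start_server(handle, '127.0.0.1', 0)
    port = server.sockets[0].getsockname()[1]

    reader, writer = await asyncio.open_connection('127.0.0.1', port)
    for _ in range(100):
        writer.write(CHUNK)   # synchronous copy into the transport buffer
        await writer.drain()  # cooperate with transport flow control
    writer.write_eof()
    await done.wait()

    writer.close()
    await writer.wait_closed()
    server.close()
    await server.wait_closed()
    return len(received)

total = asyncio.run(main())
```

Skipping drain() still works for small payloads, but a fast producer can then grow the transport's internal buffer without bound; drain() is what converts the transport's pause_writing()/resume_writing() callbacks into coroutine suspension.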

gopy mirror

module/asyncio/ is pending. StreamReader's buffer and Future-based waiting logic are pure Python and can be ported directly to Go structs. StreamReaderProtocol maps to a Go struct that implements gopy's Protocol interface. open_connection and start_server require the event loop's create_connection and create_server facilities, which depend on the selector and socket ports landing first.

CPython 3.14 changes

  • The loop parameter was removed from open_connection, start_server, StreamReader, and StreamReaderProtocol (deprecated with a DeprecationWarning since 3.8, removed in 3.10); 3.14 retains the loop-free API.
  • open_connection and start_server both accept ssl_handshake_timeout and ssl_shutdown_timeout keyword arguments, forwarded to loop.create_connection / loop.create_server.
  • StreamWriter.start_tls() was added in 3.11 and is stable in 3.14; it upgrades a plain-text connection to TLS in place.