Lib/asyncio/streams.py
cpython 3.14 @ ab2d84fe1023/Lib/asyncio/streams.py
streams.py is the user-facing layer of asyncio networking. It bridges
the low-level Transport/Protocol pair (which are callback-based) to
coroutine-friendly StreamReader and StreamWriter objects that callers
can await directly.
The key entry points are open_connection() and start_server().
open_connection() calls loop.create_connection() with a
StreamReaderProtocol factory and returns an (reader, writer) pair once
the TCP handshake completes. start_server() wraps loop.create_server()
and calls a user-supplied callback with a new (reader, writer) pair for
every accepted connection.
StreamReaderProtocol is the glue layer. It implements the asyncio
Protocol interface (connection_made, data_received, eof_received,
connection_lost) and forwards each event into the StreamReader buffer
or onto the StreamWriter close machinery.
In CPython 3.14, open_connection() and start_server() accept an
ssl_handshake_timeout parameter. The deprecated loop parameter was
removed across the streams API (it was deprecated since 3.8).
Map
| Lines | Symbol | Role | gopy |
|---|---|---|---|
| 1-60 | Module prologue, _DEFAULT_LIMIT | Sets the default read buffer limit (64 KiB); imported constants are used throughout the file. | |
| 61-120 | open_connection, start_server | Public entry points; both delegate to the event loop and construct StreamReaderProtocol instances. | |
| 121-280 | StreamReaderProtocol | Protocol implementation; owns references to reader and writer; forwards transport events to them. | |
| 281-430 | StreamWriter | Wraps the transport; write, writelines, write_eof, can_write_eof, close, wait_closed, drain. | |
| 431-700 | StreamReader | Buffered reader; feed_data, feed_eof, read, readline, readexactly, readuntil, _wait_for_data. |
Reading
StreamReader buffer and _wait_for_data (lines 431 to 700)
cpython 3.14 @ ab2d84fe1023/Lib/asyncio/streams.py#L431-700
class StreamReader:
_source_traceback = None
def __init__(self, limit=_DEFAULT_LIMIT, *, loop=None):
# The line length limit is a security feature;
# it also determines the buffer size for readline().
if limit <= 0:
raise ValueError('Limit cannot be <= 0')
self._limit = limit
self._buffer = bytearray()
self._eof = False # Whether we have received EOF from the transport.
self._waiter = None # A future used by _wait_for_data()
self._exception = None
self._transport = None
self._paused = False
def _wait_for_data(self, func_name):
"""Wait until feed_data() or feed_eof() is called."""
if self._waiter is not None:
raise RuntimeError(
f'{func_name}() called while another coroutine is '
f'already waiting for incoming data')
if self._eof:
return
self._waiter = self._loop.create_future()
try:
yield from self._waiter
finally:
self._waiter = None
StreamReader accumulates incoming bytes in _buffer (a bytearray).
feed_data appends to _buffer and then, if _waiter is set, resolves
it so any suspended coroutine resumes. feed_eof sets _eof = True and
resolves _waiter in the same way.
_wait_for_data is an internal coroutine used by all the public read*
methods. It creates a Future stored in _waiter and yields from it.
Because only one consumer can wait at a time, a second concurrent call
raises RuntimeError immediately rather than silently queuing.
When _paused is True, the transport's read side has been paused by
calling transport.pause_reading(); it is resumed when the buffer drains
below _limit. This back-pressure mechanism prevents unbounded buffer
growth when the producer is faster than the consumer.
StreamReader.readuntil (lines 431 to 700)
cpython 3.14 @ ab2d84fe1023/Lib/asyncio/streams.py#L431-700
async def readuntil(self, separator=b'\n'):
seplen = len(separator)
if seplen == 0:
raise ValueError('Separator should be at least one-byte string')
if self._exception is not None:
raise self._exception
# Consume whole buffer except last (seplen - 1) bytes
chunk = None
isep = 0
while True:
buflen = len(self._buffer)
if buflen >= seplen:
isep = self._buffer.find(separator)
if isep != -1:
break
# Limit check
if buflen > self._limit:
self._buffer.clear()
raise LimitOverrunError(
'Separator is not found, and chunk '
f'exceed the limit', buflen)
if self._eof:
chunk = bytes(self._buffer)
self._buffer.clear()
raise IncompleteReadError(chunk, None)
await self._wait_for_data('readuntil')
if isep > self._limit:
self._buffer.clear()
raise LimitOverrunError(
'Separator is found, but chunk is longer than limit', isep)
chunk = self._buffer[:isep + seplen]
del self._buffer[:isep + seplen]
return bytes(chunk)
readuntil searches _buffer for separator each time new data arrives.
Because bytearray.find scans from the start on every call, the function
avoids redundant scanning by tracking whether the buffer grew since the
last check (the last seplen - 1 bytes from the previous scan may now
complete the separator). The _limit guard prevents memory exhaustion:
once the unseparated data exceeds the limit a LimitOverrunError is
raised and the buffer is cleared so the connection is not left in an
ambiguous state.
StreamWriter.drain (lines 281 to 430)
cpython 3.14 @ ab2d84fe1023/Lib/asyncio/streams.py#L281-430
class StreamWriter:
def write(self, data):
self._transport.write(data)
async def drain(self):
"""Wait until it is appropriate to resume writing to the transport."""
if self._reader is not None:
exc = self._reader.exception()
if exc is not None:
raise exc
if self._transport.is_closing():
# Wait for protocol.connection_lost() to be called.
await asyncio.sleep(0)
await self._protocol._drain_helper()
write is synchronous: it passes bytes directly to the transport's write
buffer without any await. drain is the flow-control point. It checks
for a reader-side exception first (so that a half-closed connection is
detected early), then delegates to _drain_helper on the protocol.
_drain_helper returns immediately when the transport is not paused, or
suspends on an internal Future that resume_writing() resolves once the
transport's buffer drops below the high-water mark.
Calling await writer.drain() after every writer.write() is the
idiomatic pattern for back-pressure-aware streaming: it yields control back
to the event loop and lets other tasks run while the OS flushes the socket
buffer.
gopy mirror
module/asyncio/ is pending. StreamReader's buffer and Future-based
waiting logic are pure Python and can be ported directly to Go structs.
StreamReaderProtocol maps to a Go struct that implements gopy's
Protocol interface. open_connection and start_server require the
event loop's create_connection and create_server facilities, which
depend on the selector and socket ports landing first.
CPython 3.14 changes
- The
looppositional parameter removed fromopen_connection,start_server,StreamReader, andStreamReaderProtocol(deprecated since 3.8, emittedDeprecationWarningin 3.10, hard-removed in 3.14). open_connectionandstart_serverboth acceptssl_handshake_timeoutandssl_shutdown_timeoutkeyword arguments, forwarded toloop.create_connection/loop.create_server.StreamWriter.start_tls()was added in 3.11 and is stable in 3.14; it upgrades a plain-text connection to TLS in place.