Skip to main content

Lib/_pyio.py (part 2)

Source:

cpython 3.14 @ ab2d84fe1023/Lib/_pyio.py

This annotation covers the buffered and text I/O layers. See lib_io_detail for RawIOBase, FileIO, and the class hierarchy.

Map

| Lines | Symbol | Role |
|-------|--------|------|
| 1-100 | BufferedReader.read | Fill internal buffer, return requested bytes |
| 101-250 | BufferedWriter.write | Accumulate in buffer; flush when full |
| 251-400 | BufferedRandom | Read-write buffered I/O; position tracking |
| 401-600 | TextIOWrapper.__init__ | Wrap a BufferedIOBase; set encoder/decoder |
| 601-800 | TextIOWrapper.read | Decode bytes to str, handle BOM |
| 801-1000 | TextIOWrapper.readline | Read until newline; handle universal newlines |
| 1001-1200 | TextIOWrapper.write | Encode str to bytes; handle line buffering |
| 1201-2000 | Newline translation | \r\n → \n, \r → \n on input; \n → os.linesep on output |

Reading

BufferedReader.read

# CPython: Lib/_pyio.py:680 BufferedReader.read
def read(self, size=-1):
    """Read and return up to size bytes, filling from raw.

    Args:
        size: Maximum number of bytes to return. ``None`` or ``-1`` reads
            until ``raw`` reports end of data.

    Returns:
        A bytes object. For a non-negative ``size`` at most one raw read
        is issued to top up the buffer, so fewer than ``size`` bytes may
        come back.

    Raises:
        ValueError: If ``size`` is less than -1.
    """
    if size is not None and size < -1:
        raise ValueError("invalid number of bytes to read")
    with self._read_lock:
        if size is None or size == -1:
            # Read until EOF, starting with whatever is still buffered.
            chunks = [self._read_buf[self._read_pos:]]
            self._reset_read_buf()
            while True:
                chunk = self.raw.read(DEFAULT_BUFFER_SIZE)
                # b'' signals EOF; a raw stream in non-blocking mode may
                # return None when no data is available. Stop on either.
                if not chunk:
                    break
                chunks.append(chunk)
            return b''.join(chunks)
        # Serve from the buffer, topping it up with a single raw read when
        # it holds fewer than size unread bytes.
        avail = len(self._read_buf) - self._read_pos
        if avail < size:
            needed = size - avail
            # Ask for at least a full buffer so later small reads are cheap.
            chunk = self.raw.read(max(needed, self._buffer_size))
            # Drop the consumed prefix; a None chunk contributes nothing
            # instead of raising TypeError on bytes + None.
            self._read_buf = self._read_buf[self._read_pos:] + (chunk or b'')
            self._read_pos = 0
        result = self._read_buf[self._read_pos:self._read_pos + size]
        self._read_pos += len(result)
        return result

BufferedWriter.write

# CPython: Lib/_pyio.py:880 BufferedWriter.write
def write(self, b):
    """Append b to the write buffer and return len(b).

    While holding the write lock, the buffer is flushed before the append
    when the buffered size plus len(b) would exceed _buffer_size, and
    flushed again after the append if the buffered size is still above
    that limit.
    """
    with self._write_lock:
        projected = self._write_buf_size + len(b)
        if projected > self._buffer_size:
            # Make room before appending.
            self._flush_unlocked()
        self._write_buf += b
        if self._buffer_size < self._write_buf_size:
            # Still over the limit after appending; push it out now.
            self._flush_unlocked()
        return len(b)

TextIOWrapper.readline

# CPython: Lib/_pyio.py:1230 TextIOWrapper.readline
def readline(self, size=-1):
    """Return the next line of decoded text, including its terminator.

    Args:
        size: Maximum number of characters to return. ``None`` or a
            negative value means no limit.

    Returns:
        The text from the current position through the next line
        terminator, or up to ``size`` characters if the limit is reached
        first. At EOF the unterminated remainder is returned and consumed,
        so later calls return ``''``.

    The terminator searched for is ``self._readnl``. When that is ``None``
    or ``''`` any of ``'\\n'``, ``'\\r\\n'`` or ``'\\r'`` ends a line.

    NOTE(review): assumes _readin_chunk_or_EOF() keeps unconsumed text in
    _decoded_chars when it adds data; that helper is not visible here.
    """
    if size is None:
        size = -1
    while True:
        readnl = self._readnl
        buf = self._decoded_chars
        start = self._decoded_chars_used
        end = -1  # index one past the terminator; -1 while no line end is known
        if buf:
            if readnl:
                nl = buf.find(readnl, start)
                if nl >= 0:
                    # Step over the whole terminator; it can be two
                    # characters long ('\r\n').
                    end = nl + len(readnl)
            else:
                # No explicit terminator: take whichever of '\n', '\r\n'
                # or '\r' comes first.
                lf = buf.find("\n", start)
                cr = buf.find("\r", start)
                if cr >= 0 and (lf < 0 or cr < lf):
                    if cr + 1 < len(buf):
                        end = cr + 2 if buf[cr + 1] == "\n" else cr + 1
                    # Otherwise the '\r' is the last buffered character and
                    # may be half of '\r\n'; fetch more data before deciding.
                elif lf >= 0:
                    end = lf + 1
        # Apply the size limit when enough text is buffered to hit it
        # before (or without) reaching a terminator.
        if size >= 0 and len(buf) - start >= size and (end < 0 or end - start > size):
            end = start + size
        if end >= 0:
            self._decoded_chars_used = end
            return buf[start:end]
        # Need more data from the underlying buffer.
        if not self._readin_chunk_or_EOF():
            buf = self._decoded_chars
            start = self._decoded_chars_used
            tail = buf[start:]
            if size >= 0:
                tail = tail[:size]
            # Consume the tail so the next call does not return it again.
            self._decoded_chars_used = start + len(tail)
            return tail

Newline translation

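# CPython: Lib/_pyio.py:1380 newline parameter
# newline=None (default):
#     read:  universal newlines; \r\n and \r are translated to \n
#     write: \n is translated to os.linesep (Windows: \r\n)
# newline='':
#     read:  lines split on \r\n, \r, or \n; terminators returned unchanged
#     write: no translation
# newline='\n':
#     read:  lines split on \n only; no translation
#     write: no translation
# newline='\r' or '\r\n':
#     read:  lines split on the given string only; no translation
#     write: \n is translated to the given string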

gopy notes

_pyio is the reference Python implementation; the C accelerator _io is normally used. gopy uses Go's native bufio.Reader/bufio.Writer in objects/buffered_io.go. TextIOWrapper.readline with universal newlines is in objects/textiowrapper.go:TextIOWrapperReadLine.