Lib/asyncio/subprocess.py
cpython 3.14 @ ab2d84fe1023/Lib/asyncio/subprocess.py
This file is the thin public layer sitting on top of the transport/protocol stack. It provides two coroutine entry points (create_subprocess_exec and create_subprocess_shell), a protocol implementation that wires up pipes as StreamReader/StreamWriter objects, and a Process wrapper that coordinates communicate(), wait(), and signal delivery.
Map
| Lines | Symbol | Role |
|---|---|---|
| 17-116 | SubprocessStreamProtocol | SubprocessProtocol subclass; allocates a StreamReader for stdout/stderr and a StreamWriter for stdin in connection_made, feeds data to the readers in pipe_data_received, and defers closing the transport until every pipe has reported connection lost and the process has exited |
| 118-203 | Process | Public handle returned to callers; exposes pid, returncode, stdin, stdout, stderr; implements communicate() via tasks.gather over _feed_stdin, _read_stream(1), _read_stream(2), plus wait(), send_signal(), terminate(), kill() |
| 148-166 | Process._feed_stdin | Writes input bytes to stdin, drains, swallows BrokenPipeError/ConnectionResetError, then closes the write end |
| 171-186 | Process._read_stream | Reads one complete stream to EOF using stream.read(), then closes the pipe transport |
| 188-203 | Process.communicate | Launches all three coroutines concurrently with tasks.gather, awaits wait(), returns (stdout, stderr) |
| 206-215 | create_subprocess_shell | Coroutine entry point; calls loop.subprocess_shell with a fresh SubprocessStreamProtocol factory |
| 218-229 | create_subprocess_exec | Coroutine entry point; calls loop.subprocess_exec; otherwise identical structure to create_subprocess_shell |
Reading
SubprocessStreamProtocol.connection_made
The event loop calls connection_made with the subprocess transport as soon as the child is spawned, and the protocol wires up whichever pipe transports exist at that moment. Stdout and stderr become StreamReader instances; stdin becomes a StreamWriter wrapping the fd 0 pipe transport.
```python
# CPython: Lib/asyncio/subprocess.py:40 SubprocessStreamProtocol.connection_made
def connection_made(self, transport):
    self._transport = transport

    stdout_transport = transport.get_pipe_transport(1)
    if stdout_transport is not None:
        self.stdout = streams.StreamReader(limit=self._limit,
                                           loop=self._loop)
        self.stdout.set_transport(stdout_transport)
        self._pipe_fds.append(1)

    stderr_transport = transport.get_pipe_transport(2)
    if stderr_transport is not None:
        self.stderr = streams.StreamReader(limit=self._limit,
                                           loop=self._loop)
        self.stderr.set_transport(stderr_transport)
        self._pipe_fds.append(2)

    stdin_transport = transport.get_pipe_transport(0)
    if stdin_transport is not None:
        self.stdin = streams.StreamWriter(stdin_transport,
                                          protocol=self,
                                          reader=None,
                                          loop=self._loop)
```
SubprocessStreamProtocol._maybe_close_transport
The transport is only closed when both conditions are met: all tracked pipe fds have reported pipe_connection_lost, and process_exited has fired. This prevents races where the process exits before its output pipes are drained.
```python
# CPython: Lib/asyncio/subprocess.py:108 SubprocessStreamProtocol._maybe_close_transport
def _maybe_close_transport(self):
    if len(self._pipe_fds) == 0 and self._process_exited:
        self._transport.close()
        self._transport = None
```
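The gating logic can be modeled in isolation. A standalone toy (names are mine, not asyncio's) showing that the transport is released only after the last pipe closes and process exit has been reported, in either order:

```python
class CloseGate:
    """Toy model of _maybe_close_transport's two-condition gate."""

    def __init__(self, pipe_fds):
        self._pipe_fds = list(pipe_fds)   # mirrors self._pipe_fds
        self._process_exited = False
        self.closed = False

    def pipe_connection_lost(self, fd):
        self._pipe_fds.remove(fd)
        self._maybe_close()

    def process_exited(self):
        self._process_exited = True
        self._maybe_close()

    def _maybe_close(self):
        # Close only when BOTH conditions hold.
        if not self._pipe_fds and self._process_exited:
            self.closed = True

gate = CloseGate([1, 2])
gate.process_exited()            # process exits first...
assert not gate.closed           # ...but stdout/stderr are still open
gate.pipe_connection_lost(1)
gate.pipe_connection_lost(2)
assert gate.closed               # now the transport can be closed
```

Either event arriving last triggers the close, which is exactly why both pipe_connection_lost and process_exited call the same helper.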
Process.communicate
communicate fans out three concurrent coroutines with a single tasks.gather call, then calls wait() to collect the exit code. Any stream attribute that is None (because its pipe was not requested) is replaced by a _noop() coroutine, so gather always receives exactly three awaitables.
```python
# CPython: Lib/asyncio/subprocess.py:188 Process.communicate
async def communicate(self, input=None):
    if self.stdin is not None:
        stdin = self._feed_stdin(input)
    else:
        stdin = self._noop()
    if self.stdout is not None:
        stdout = self._read_stream(1)
    else:
        stdout = self._noop()
    if self.stderr is not None:
        stderr = self._read_stream(2)
    else:
        stderr = self._noop()
    stdin, stdout, stderr = await tasks.gather(stdin, stdout, stderr)
    await self.wait()
    return (stdout, stderr)
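A usage sketch (not from the file) exercising the full path: _feed_stdin writes and closes the child's stdin, _read_stream(1) drains stdout to EOF, and the stderr slot falls through to _noop(), so the second tuple element comes back as None.

```python
import asyncio
import sys

async def main():
    # Child reads all of stdin and echoes it upper-cased; communicate()
    # feeds stdin and drains stdout concurrently, then waits for exit.
    proc = await asyncio.create_subprocess_exec(
        sys.executable, "-c",
        "import sys; sys.stdout.write(sys.stdin.read().upper())",
        stdin=asyncio.subprocess.PIPE,
        stdout=asyncio.subprocess.PIPE,
    )
    out, err = await proc.communicate(input=b"hello")
    return out, err, proc.returncode

out, err, rc = asyncio.run(main())
print(out, err, rc)  # b'HELLO' None 0
```

Because no stderr pipe was requested, the stderr leg of the gather is _noop() and err is None rather than b"".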
create_subprocess_exec entry point
Both entry points follow the same pattern: obtain the running loop, build a SubprocessStreamProtocol factory via a lambda that captures limit and loop, delegate to the loop's subprocess method, then wrap the result in a Process.
```python
# CPython: Lib/asyncio/subprocess.py:218 create_subprocess_exec
async def create_subprocess_exec(program, *args, stdin=None, stdout=None,
                                 stderr=None, limit=streams._DEFAULT_LIMIT,
                                 **kwds):
    loop = events.get_running_loop()
    protocol_factory = lambda: SubprocessStreamProtocol(limit=limit,
                                                        loop=loop)
    transport, protocol = await loop.subprocess_exec(
        protocol_factory,
        program, *args,
        stdin=stdin, stdout=stdout,
        stderr=stderr, **kwds)
    return Process(transport, protocol, loop)
```
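A usage sketch (not from the file): the returned Process exposes the StreamReader that connection_made attached to fd 1, so callers can do line-oriented reads directly instead of going through communicate().

```python
import asyncio
import sys

async def main():
    proc = await asyncio.create_subprocess_exec(
        sys.executable, "-c", "print('alpha'); print('beta')",
        stdout=asyncio.subprocess.PIPE,
    )
    # proc.stdout is the StreamReader wired up in connection_made.
    lines = []
    while True:
        line = await proc.stdout.readline()
        if not line:      # b"" signals EOF on the pipe
            break
        lines.append(line.strip())
    await proc.wait()     # collect the exit status
    return lines, proc.returncode

lines, rc = asyncio.run(main())
print(lines, rc)  # [b'alpha', b'beta'] 0
```

When reading manually like this, wait() must still be awaited so the exit status is reaped; communicate() does this for you.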
gopy notes
This file has no platform-specific code and no C extension calls. A Go port would implement SubprocessStreamProtocol as a struct satisfying the SubprocessProtocol interface, use goroutines instead of tasks.gather for concurrent pipe draining in communicate, and expose Process as a plain struct. The returncode property maps cleanly to an atomic integer updated by the child-watcher callback.