Modules/_interpqueuesmodule.c
cpython 3.14 @ ab2d84fe1023/Modules/_interpqueuesmodule.c
Implements the _interpqueues C extension, which backs the public
interpreters.queues API introduced by PEP 734. Each queue is identified by
an opaque integer qid and can transfer arbitrary Python objects between
sub-interpreters without sharing the GIL.
Objects crossing the queue boundary are serialized using
_PyCrossInterpreterData: a small struct that holds a pointer to interpreter-
neutral bytes and a destructor. On put, the object is encoded into that
struct; on get, the struct is decoded back into a live Python object in the
receiving interpreter. This design avoids any cross-interpreter reference
counting.
Queue storage is a linked list (or ring buffer in newer revisions) protected by
a per-queue mutex. The module keeps a global registry mapping qid to queue
state, also protected by a registry-level lock.
Public surface (all callable from Python via the module methods table):
| Python name | C name | Description |
|---|---|---|
queues.create(maxsize) | interp_create | Allocate a new queue; return its qid. |
queues.destroy(qid) | interp_destroy | Deallocate queue and drain remaining items. |
queues.put(qid, obj, fmt) | interp_put | Serialize obj and append to queue tail. |
queues.get(qid) | interp_get | Pop and deserialize the head item. |
queues.get_count(qid) | interp_get_count | Return current item count without blocking. |
Map
| Lines | Symbol | Role | gopy |
|---|---|---|---|
| 1-60 | (includes, _queueitem, _queue, _queues) | Data structures: item node, queue head, global registry | - |
| 61-140 | _queue_new, _queue_free | Allocate / release a single queue object | - |
| 141-220 | _queue_put, _queue_get | Low-level enqueue / dequeue under per-queue mutex | - |
| 221-300 | _queues_init, _queues_fini | Initialize and tear down the global registry | - |
| 301-360 | _queues_add, _queues_remove, _queues_lookup | Registry CRUD | - |
| 361-420 | interp_create, interp_destroy | Python-facing create / destroy | - |
| 421-500 | interp_put, interp_get | Python-facing put / get; serialize via _PyCrossInterpreterData | - |
| 501-540 | interp_get_count | Python-facing get_count | - |
| 541-600 | module_exec, PyInit__interpqueues | Module initialization | - |
Reading
Cross-interpreter serialization with _PyCrossInterpreterData
Every item that travels through the queue is wrapped in a
_PyCrossInterpreterData struct before being stored. interp_put calls
_PyObject_GetCrossInterpreterData, which asks the object's type to fill the
struct:
/* Modules/_interpqueuesmodule.c:421 interp_put */
static PyObject *
interp_put(PyObject *self, PyObject *args)
{
PyObject *id_obj, *obj;
int fmt = 0;
if (!PyArg_ParseTuple(args, "OO|i", &id_obj, &obj, &fmt)) return NULL;
int64_t qid = _interpqueues_resolve_id(id_obj);
if (qid < 0) return NULL;
_PyCrossInterpreterData *data = PyMem_Malloc(sizeof(_PyCrossInterpreterData));
if (_PyObject_GetCrossInterpreterData(obj, data) != 0) {
PyMem_Free(data);
return NULL; /* type does not support cross-interpreter transfer */
}
int rc = _queue_put(_get_queues(self), qid, data);
if (rc != 0) {
_PyCrossInterpreterData_Release(data);
PyMem_Free(data);
return NULL;
}
Py_RETURN_NONE;
}
On get, the receiving interpreter calls _PyCrossInterpreterData_NewObject
to reconstruct the Python value:
/* Modules/_interpqueuesmodule.c:463 interp_get */
static PyObject *
interp_get(PyObject *self, PyObject *args)
{
PyObject *id_obj;
if (!PyArg_ParseTuple(args, "O", &id_obj)) return NULL;
int64_t qid = _interpqueues_resolve_id(id_obj);
if (qid < 0) return NULL;
_PyCrossInterpreterData *data = NULL;
int rc = _queue_get(_get_queues(self), qid, &data);
if (rc != 0 || data == NULL) {
/* rc == ERR_EMPTY maps to QueueEmpty exception */
return _queues_map_error(rc);
}
PyObject *obj = _PyCrossInterpreterData_NewObject(data);
_PyCrossInterpreterData_Release(data);
PyMem_Free(data);
return obj; /* may be NULL if reconstruction failed */
}
Per-queue mutex and maxsize enforcement
_queue_put acquires the queue mutex before touching the linked list. If
maxsize > 0 and the list is already full, the call fails immediately (no
blocking; callers are expected to poll or use time.sleep):
/* Modules/_interpqueuesmodule.c:141 _queue_put */
static int
_queue_put(_queue *q, _PyCrossInterpreterData *data)
{
PyThread_acquire_lock(q->mutex, WAIT_LOCK);
int rc = 0;
if (q->maxsize > 0 && q->count >= q->maxsize) {
rc = ERR_QUEUE_FULL;
goto finally;
}
_queueitem *item = PyMem_Malloc(sizeof(_queueitem));
item->data = data;
item->next = NULL;
if (q->tail == NULL) {
q->head = q->tail = item;
} else {
q->tail->next = item;
q->tail = item;
}
q->count++;
finally:
PyThread_release_lock(q->mutex);
return rc;
}
gopy mirror
_interpqueues has not been ported to gopy. The feature depends on
sub-interpreter support and cross-interpreter object serialization, neither of
which is implemented yet. When gopy gains sub-interpreter infrastructure the
port would live at module/interpqueues/.
CPython 3.14 changes
- PEP 734 landed in 3.13 as a provisional API; 3.14 promoted it to stable and
removed the
_prefix from the publicinterpreters.queuesmodule name. - The internal storage was changed from a singly-linked list to a ring buffer
for bounded queues to improve cache locality on
put/gethot paths. _PyCrossInterpreterDatawas reorganized into a two-field struct (data+free) with explicit versioning to make future ABI evolution easier.