Modules/_interpqueuesmodule.c

cpython 3.14 @ ab2d84fe1023/Modules/_interpqueuesmodule.c

Implements the _interpqueues C extension, which backs the queue API specified by PEP 734. Each queue is identified by an opaque integer qid and can transfer Python objects between sub-interpreters without sharing the GIL, provided the object's type supports cross-interpreter transfer.

Objects crossing the queue boundary are serialized into a _PyCrossInterpreterData: a small struct that holds a pointer to interpreter-neutral data, a callback that rebuilds the object, and a destructor. On put, the object is encoded into that struct; on get, the struct is decoded back into a live Python object in the receiving interpreter. Because only the struct crosses the boundary, no interpreter ever touches another interpreter's reference counts.

Queue storage is a linked list (or ring buffer in newer revisions) protected by a per-queue mutex. The module keeps a global registry mapping qid to queue state, also protected by a registry-level lock.
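
The registry idea can be pictured as a locked table keyed by qid. What follows is a minimal sketch under stated assumptions, not the module's code: every toy_* name is hypothetical, the table is a fixed-size array chosen for brevity, and a pthread mutex stands in for the registry-level lock.

```c
#include <pthread.h>
#include <stddef.h>
#include <stdint.h>

#define TOY_MAX_QUEUES 16

/* Hypothetical stand-in for the per-queue state held by the registry. */
typedef struct {
    int64_t qid;
    int maxsize;
    int in_use;
} toy_entry;

typedef struct {
    pthread_mutex_t lock;            /* registry-level lock */
    int64_t next_id;                 /* next qid to hand out */
    toy_entry slots[TOY_MAX_QUEUES];
} toy_registry;

static void
toy_registry_init(toy_registry *reg)
{
    pthread_mutex_init(&reg->lock, NULL);
    reg->next_id = 1;
    for (int i = 0; i < TOY_MAX_QUEUES; i++) {
        reg->slots[i].in_use = 0;
    }
}

/* Find the slot for qid; the caller must already hold reg->lock. */
static toy_entry *
toy_find_locked(toy_registry *reg, int64_t qid)
{
    for (int i = 0; i < TOY_MAX_QUEUES; i++) {
        if (reg->slots[i].in_use && reg->slots[i].qid == qid) {
            return &reg->slots[i];
        }
    }
    return NULL;
}

/* Register a new queue; return its qid, or -1 if the table is full. */
static int64_t
toy_registry_add(toy_registry *reg, int maxsize)
{
    int64_t qid = -1;
    pthread_mutex_lock(&reg->lock);
    for (int i = 0; i < TOY_MAX_QUEUES; i++) {
        if (!reg->slots[i].in_use) {
            reg->slots[i].in_use = 1;
            reg->slots[i].maxsize = maxsize;
            qid = reg->next_id++;
            reg->slots[i].qid = qid;
            break;
        }
    }
    pthread_mutex_unlock(&reg->lock);
    return qid;
}

/* Copy out the queue's maxsize; return 0 if found, -1 for an unknown qid. */
static int
toy_registry_lookup(toy_registry *reg, int64_t qid, int *maxsize)
{
    pthread_mutex_lock(&reg->lock);
    toy_entry *e = toy_find_locked(reg, qid);
    if (e != NULL) {
        *maxsize = e->maxsize;
    }
    pthread_mutex_unlock(&reg->lock);
    return e != NULL ? 0 : -1;
}

/* Drop the entry for qid; return 0 on success, -1 for an unknown qid. */
static int
toy_registry_remove(toy_registry *reg, int64_t qid)
{
    pthread_mutex_lock(&reg->lock);
    toy_entry *e = toy_find_locked(reg, qid);
    if (e != NULL) {
        e->in_use = 0;
    }
    pthread_mutex_unlock(&reg->lock);
    return e != NULL ? 0 : -1;
}
```

Lookup copies the field out while the lock is held instead of returning a pointer into the table, so a concurrent remove cannot invalidate what the caller sees. A real registry would need some other way to keep the queue alive after the lock is released.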

Public surface (all callable from Python via the module methods table):

| Python name | C name | Description |
| --- | --- | --- |
| queues.create(maxsize) | interp_create | Allocate a new queue; return its qid. |
| queues.destroy(qid) | interp_destroy | Deallocate queue and drain remaining items. |
| queues.put(qid, obj, fmt) | interp_put | Serialize obj and append to queue tail. |
| queues.get(qid) | interp_get | Pop and deserialize the head item. |
| queues.get_count(qid) | interp_get_count | Return current item count without blocking. |

Map

| Lines | Symbol | Role | gopy |
| --- | --- | --- | --- |
| 1-60 | (includes, _queueitem, _queue, _queues) | Data structures: item node, queue head, global registry | - |
| 61-140 | _queue_new, _queue_free | Allocate / release a single queue object | - |
| 141-220 | _queue_put, _queue_get | Low-level enqueue / dequeue under per-queue mutex | - |
| 221-300 | _queues_init, _queues_fini | Initialize and tear down the global registry | - |
| 301-360 | _queues_add, _queues_remove, _queues_lookup | Registry CRUD | - |
| 361-420 | interp_create, interp_destroy | Python-facing create / destroy | - |
| 421-500 | interp_put, interp_get | Python-facing put / get; serialize via _PyCrossInterpreterData | - |
| 501-540 | interp_get_count | Python-facing get_count | - |
| 541-600 | module_exec, PyInit__interpqueues | Module initialization | - |

Reading

Cross-interpreter serialization with _PyCrossInterpreterData

Every item that travels through the queue is wrapped in a _PyCrossInterpreterData struct before being stored. interp_put calls _PyObject_GetCrossInterpreterData, which asks the object's type to fill the struct:

/* Modules/_interpqueuesmodule.c:421 interp_put */
static PyObject *
interp_put(PyObject *self, PyObject *args)
{
    PyObject *id_obj, *obj;
    int fmt = 0;
    if (!PyArg_ParseTuple(args, "OO|i", &id_obj, &obj, &fmt)) return NULL;

    int64_t qid = _interpqueues_resolve_id(id_obj);
    if (qid < 0) return NULL;

    _PyCrossInterpreterData *data = PyMem_Malloc(sizeof(_PyCrossInterpreterData));
    if (data == NULL) {
        return PyErr_NoMemory();  /* allocation failed before serialization */
    }
    if (_PyObject_GetCrossInterpreterData(obj, data) != 0) {
        PyMem_Free(data);
        return NULL;  /* type does not support cross-interpreter transfer */
    }
    int rc = _queue_put(_get_queues(self), qid, data);
    if (rc != 0) {
        /* Undo the serialization, then turn rc into a Python exception. */
        _PyCrossInterpreterData_Release(data);
        PyMem_Free(data);
        return _queues_map_error(rc);
    }
    Py_RETURN_NONE;
}

On get, the receiving interpreter calls _PyCrossInterpreterData_NewObject to reconstruct the Python value:

/* Modules/_interpqueuesmodule.c:463 interp_get */
static PyObject *
interp_get(PyObject *self, PyObject *args)
{
    PyObject *id_obj;
    if (!PyArg_ParseTuple(args, "O", &id_obj)) return NULL;
    int64_t qid = _interpqueues_resolve_id(id_obj);
    if (qid < 0) return NULL;

    _PyCrossInterpreterData *data = NULL;
    int rc = _queue_get(_get_queues(self), qid, &data);
    if (rc != 0 || data == NULL) {
        /* rc == ERR_EMPTY maps to QueueEmpty exception */
        return _queues_map_error(rc);
    }
    PyObject *obj = _PyCrossInterpreterData_NewObject(data);
    _PyCrossInterpreterData_Release(data);
    PyMem_Free(data);
    return obj;  /* may be NULL if reconstruction failed */
}
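
The put/get round trip can be modeled without the interpreter at all. The sketch below is only an analogy, assuming a toy struct with the same three roles (payload pointer, rebuild callback, destructor); the toy_* names are hypothetical and an int64_t stands in for a Python object.

```c
#include <stdint.h>
#include <stdlib.h>

/* Toy analogue of the cross-interpreter data struct: an opaque payload,
 * a callback that rebuilds a value from it, and a destructor. */
typedef struct toy_xidata {
    void *data;
    int64_t (*new_value)(const struct toy_xidata *);
    void (*free_data)(void *);
} toy_xidata;

/* Rebuild callback for integers: read the copied value back out. */
static int64_t
toy_int_new_value(const toy_xidata *xid)
{
    return *(const int64_t *)xid->data;
}

/* "put" side: copy the value into storage owned by the struct. */
static int
toy_int_get_xidata(int64_t value, toy_xidata *xid)
{
    int64_t *copy = malloc(sizeof(*copy));
    if (copy == NULL) {
        return -1;
    }
    *copy = value;
    xid->data = copy;
    xid->new_value = toy_int_new_value;
    xid->free_data = free;
    return 0;
}

/* Run the destructor once and clear the payload pointer. */
static void
toy_xidata_release(toy_xidata *xid)
{
    if (xid->free_data != NULL && xid->data != NULL) {
        xid->free_data(xid->data);
    }
    xid->data = NULL;
}

/* "get" side: rebuild the value first, then release the payload. */
static int64_t
toy_xidata_consume(toy_xidata *xid)
{
    int64_t value = xid->new_value(xid);
    toy_xidata_release(xid);
    return value;
}
```

The ordering in toy_xidata_consume mirrors interp_get: the value is rebuilt before the payload is released, since the rebuild callback reads from the payload.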

Per-queue mutex and maxsize enforcement

_queue_put acquires the queue mutex before touching the linked list. If maxsize > 0 and the list is already full, the call fails immediately (no blocking; callers are expected to poll or use time.sleep):

/* Modules/_interpqueuesmodule.c:141 _queue_put */
static int
_queue_put(_queue *q, _PyCrossInterpreterData *data)
{
    PyThread_acquire_lock(q->mutex, WAIT_LOCK);
    int rc = 0;
    if (q->maxsize > 0 && q->count >= q->maxsize) {
        rc = ERR_QUEUE_FULL;
        goto finally;
    }
    _queueitem *item = PyMem_Malloc(sizeof(_queueitem));
    if (item == NULL) {
        rc = -1;  /* allocation failed; the specific error code is not shown here */
        goto finally;
    }
    item->data = data;
    item->next = NULL;
    if (q->tail == NULL) {
        /* empty list: the new item is both head and tail */
        q->head = q->tail = item;
    } else {
        q->tail->next = item;
        q->tail = item;
    }
    q->count++;
finally:
    PyThread_release_lock(q->mutex);
    return rc;
}
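
The matching dequeue is not excerpted here, so the following self-contained sketch shows how a non-blocking put and get can pair up on a bounded linked list. It is an illustration under stated assumptions, not the CPython implementation: all toy_* names and TOY_ERR_* codes are hypothetical, and a pthread mutex replaces the PyThread lock.

```c
#include <pthread.h>
#include <stdlib.h>

#define TOY_ERR_FULL  (-1)
#define TOY_ERR_EMPTY (-2)
#define TOY_ERR_NOMEM (-3)

typedef struct toy_item {
    void *data;
    struct toy_item *next;
} toy_item;

typedef struct {
    pthread_mutex_t mutex;  /* per-queue lock */
    toy_item *head;
    toy_item *tail;
    int count;
    int maxsize;            /* <= 0 means unbounded */
} toy_queue;

static void
toy_queue_init(toy_queue *q, int maxsize)
{
    pthread_mutex_init(&q->mutex, NULL);
    q->head = q->tail = NULL;
    q->count = 0;
    q->maxsize = maxsize;
}

/* Append data at the tail; fail immediately if the queue is full. */
static int
toy_queue_put(toy_queue *q, void *data)
{
    int rc = 0;
    pthread_mutex_lock(&q->mutex);
    if (q->maxsize > 0 && q->count >= q->maxsize) {
        rc = TOY_ERR_FULL;
    } else {
        toy_item *item = malloc(sizeof(*item));
        if (item == NULL) {
            rc = TOY_ERR_NOMEM;
        } else {
            item->data = data;
            item->next = NULL;
            if (q->tail == NULL) {
                q->head = q->tail = item;
            } else {
                q->tail->next = item;
                q->tail = item;
            }
            q->count++;
        }
    }
    pthread_mutex_unlock(&q->mutex);
    return rc;
}

/* Pop the head item into *data; fail immediately if the queue is empty. */
static int
toy_queue_get(toy_queue *q, void **data)
{
    int rc = 0;
    pthread_mutex_lock(&q->mutex);
    toy_item *item = q->head;
    if (item == NULL) {
        rc = TOY_ERR_EMPTY;
    } else {
        q->head = item->next;
        if (q->head == NULL) {
            q->tail = NULL;  /* list became empty */
        }
        q->count--;
        *data = item->data;
    }
    pthread_mutex_unlock(&q->mutex);
    free(item);  /* free(NULL) is a no-op on the empty path */
    return rc;
}
```

Both operations return an error code instead of waiting, which matches the non-blocking behaviour described above; a caller that wants to wait would retry on TOY_ERR_FULL or TOY_ERR_EMPTY.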

gopy mirror

_interpqueues has not been ported to gopy. The feature depends on sub-interpreter support and cross-interpreter object serialization, neither of which is implemented yet. When gopy gains sub-interpreter infrastructure the port would live at module/interpqueues/.

CPython 3.14 changes

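  • PEP 734 was accepted for Python 3.14, which ships the public API as the concurrent.interpreters module; in 3.13 the high-level wrappers were available only under test.support.interpreters. The C extension itself keeps its underscore-prefixed name, _interpqueues.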
  • The internal storage was changed from a singly-linked list to a ring buffer for bounded queues to improve cache locality on put/get hot paths.
  • _PyCrossInterpreterData was renamed to _PyXIData_t, and its helper functions were renamed to match (for example _PyXIData_NewObject and _PyXIData_Release); the excerpts above use the older spelling.