SPSC Queue (Zero-Copy Claim/Publish)
Overview
Bounded single-producer, single-consumer (SPSC) queue with pointer-based slot access. The generic ring buffer copies data in and out with memcpy. This queue instead uses a claim → write → publish pattern: the producer writes directly into slot memory, so no intermediate copy is made. The consumer side mirrors it with a peek → read → consume pattern.
Status: Implemented
Full implementation with cached indices for reduced cross-core traffic, zero-copy slot access, and convenience memcpy wrappers.
Memory Model
```text
┌───────────────────────────────────────────────────────────────────┐
│ Claim-Publish Protocol:                                           │
│                                                                   │
│   PRODUCER:                     CONSUMER:                         │
│   ┌─────────────────────┐       ┌─────────────────────┐           │
│   │ 1. claim()          │       │ 1. peek()           │           │
│   │    → get ptr to slot│       │    → get ptr to slot│           │
│   │ 2. write directly   │       │ 2. read directly    │           │
│   │    into slot memory │       │    from slot memory │           │
│   │ 3. publish()        │       │ 3. consume()        │           │
│   │    → advance index  │       │    → advance index  │           │
│   └─────────────────────┘       └─────────────────────┘           │
│                                                                   │
│ Zero-copy: no memcpy in the hot path; write/read directly to slot │
└───────────────────────────────────────────────────────────────────┘
```
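The following is a minimal single-file sketch of the claim → write → publish and peek → read → consume steps. It is not the spsc_queue implementation. The type `mini_spsc_t` and the `mini_*` functions are hypothetical. The sketch omits the cached indices and cache-line padding and assumes free-running 64-bit indices that are masked with `capacity - 1` to select a slot. The release store in publish pairs with the acquire load in peek, so the consumer sees everything written into the slot before publish.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical minimal queue: no cached indices, no cache-line padding. */
typedef struct {
    _Atomic uint64_t write_idx;  /* advanced only by the producer */
    _Atomic uint64_t read_idx;   /* advanced only by the consumer */
    uint64_t mask;               /* capacity - 1 (capacity is a power of 2) */
    size_t slot_size;
    uint8_t *slots;
} mini_spsc_t;

void mini_init(mini_spsc_t *q, void *buf, uint64_t capacity, size_t slot_size)
{
    assert(capacity != 0 && (capacity & (capacity - 1)) == 0);
    atomic_init(&q->write_idx, 0);
    atomic_init(&q->read_idx, 0);
    q->mask = capacity - 1;
    q->slot_size = slot_size;
    q->slots = buf;
}

/* Producer step 1: pointer to the next free slot, or NULL if full. */
void *mini_claim(mini_spsc_t *q)
{
    uint64_t w = atomic_load_explicit(&q->write_idx, memory_order_relaxed);
    /* Acquire pairs with consume(): the consumer is done with the slot. */
    uint64_t r = atomic_load_explicit(&q->read_idx, memory_order_acquire);
    if (w - r == q->mask + 1)
        return NULL;
    return q->slots + (w & q->mask) * q->slot_size;
}

/* Producer step 3: release makes the slot contents visible with the index. */
void mini_publish(mini_spsc_t *q)
{
    uint64_t w = atomic_load_explicit(&q->write_idx, memory_order_relaxed);
    atomic_store_explicit(&q->write_idx, w + 1, memory_order_release);
}

/* Consumer step 1: pointer to the oldest published slot, or NULL if empty. */
const void *mini_peek(mini_spsc_t *q)
{
    uint64_t r = atomic_load_explicit(&q->read_idx, memory_order_relaxed);
    uint64_t w = atomic_load_explicit(&q->write_idx, memory_order_acquire);
    if (r == w)
        return NULL;
    return q->slots + (r & q->mask) * q->slot_size;
}

/* Consumer step 3: release the slot back to the producer. */
void mini_consume(mini_spsc_t *q)
{
    uint64_t r = atomic_load_explicit(&q->read_idx, memory_order_relaxed);
    atomic_store_explicit(&q->read_idx, r + 1, memory_order_release);
}
```

Because claim does not move `write_idx`, calling it twice without publish returns the same slot in this sketch.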
Cached Index Optimization
```text
┌───────────── Cache Line 0 (Producer) ─────────────────┐
│ _Atomic uint64_t write_idx;                           │
│ uint64_t cached_read_idx;  ← local copy of read_idx   │
│ padding[48 bytes]                                     │
└───────────────────────────────────────────────────────┘
┌───────────── Cache Line 1 (Consumer) ─────────────────┐
│ _Atomic uint64_t read_idx;                            │
│ uint64_t cached_write_idx; ← local copy of write_idx  │
│ padding[48 bytes]                                     │
└───────────────────────────────────────────────────────┘
```
Fast path: compare against the local cached copy (no cross-core traffic).
Slow path: refresh the cached copy from the remote atomic index, only when the queue appears full (producer) or empty (consumer).
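Here is a sketch of the producer's fast/slow path. It assumes 64-byte cache lines and free-running indices. `cached_q_t`, the `cq_*` functions and the `slow_path_refreshes` counter are hypothetical. The counter exists only to show how rarely the slow path runs, in the spirit of the "fast-path hits" figure in the test output. The consumer side mirrors this with `cached_write_idx`.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stddef.h>
#include <stdint.h>

#define CACHE_LINE 64  /* assumption: 64-byte cache lines */

/* Hypothetical layout mirroring the diagram: one cache line per side. */
typedef struct {
    _Alignas(CACHE_LINE) _Atomic uint64_t write_idx;
    uint64_t cached_read_idx;        /* producer-local copy of read_idx */
    uint64_t slow_path_refreshes;    /* demo-only counter, producer-owned */
    _Alignas(CACHE_LINE) _Atomic uint64_t read_idx;
    uint64_t cached_write_idx;       /* consumer-local copy (unused here) */
    _Alignas(CACHE_LINE) uint64_t capacity;
    uint64_t mask;
    size_t slot_size;
    uint8_t *slots;
} cached_q_t;

void cq_init(cached_q_t *q, void *buf, uint64_t capacity, size_t slot_size)
{
    assert(capacity != 0 && (capacity & (capacity - 1)) == 0);
    atomic_init(&q->write_idx, 0);
    atomic_init(&q->read_idx, 0);
    q->cached_read_idx = q->cached_write_idx = 0;
    q->slow_path_refreshes = 0;
    q->capacity = capacity;
    q->mask = capacity - 1;
    q->slot_size = slot_size;
    q->slots = buf;
}

void *cq_claim(cached_q_t *q)
{
    uint64_t w = atomic_load_explicit(&q->write_idx, memory_order_relaxed);
    if (w - q->cached_read_idx == q->capacity) {
        /* Looks full: slow path, the only access to the consumer's line. */
        q->slow_path_refreshes++;
        q->cached_read_idx =
            atomic_load_explicit(&q->read_idx, memory_order_acquire);
        if (w - q->cached_read_idx == q->capacity)
            return NULL;             /* really full */
    }
    /* On the fast path we get here without touching read_idx's line. */
    return q->slots + (w & q->mask) * q->slot_size;
}

void cq_publish(cached_q_t *q)
{
    uint64_t w = atomic_load_explicit(&q->write_idx, memory_order_relaxed);
    atomic_store_explicit(&q->write_idx, w + 1, memory_order_release);
}

/* Consumer side reduced to consume(), just enough to drive the demo. */
void cq_consume(cached_q_t *q)
{
    uint64_t r = atomic_load_explicit(&q->read_idx, memory_order_relaxed);
    atomic_store_explicit(&q->read_idx, r + 1, memory_order_release);
}
```

A stale `cached_read_idx` can only make the queue look fuller than it is, never emptier. The worst case is therefore an extra slow-path reload, never an overwritten slot.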
```mermaid
sequenceDiagram
    participant P as Producer
    participant Q as Queue
    participant C as Consumer
    P->>Q: claim() → slot_ptr
    Note over P: Write data directly into slot
    P->>Q: publish() [store_release write_idx+1]
    C->>Q: peek() → slot_ptr
    Note over C: Read data directly from slot
    C->>Q: consume() [store_release read_idx+1]
```
API Reference
Data Structure
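```c
#include <stdalign.h>   /* alignas */
#include <stdatomic.h>  /* _Atomic */
#include <stddef.h>     /* size_t */
#include <stdint.h>     /* uint8_t, uint64_t */

/* SPSC_CACHE_LINE is defined by the header; 64 bytes matches the
 * 48-byte padding in the diagram above. */
typedef struct {
    /* Producer cache line */
    alignas(SPSC_CACHE_LINE) _Atomic uint64_t write_idx;
    uint64_t cached_read_idx;   /* producer's copy of read_idx */
    uint8_t _pad0[SPSC_CACHE_LINE - sizeof(_Atomic uint64_t) - sizeof(uint64_t)];

    /* Consumer cache line */
    alignas(SPSC_CACHE_LINE) _Atomic uint64_t read_idx;
    uint64_t cached_write_idx;  /* consumer's copy of write_idx */
    uint8_t _pad1[SPSC_CACHE_LINE - sizeof(_Atomic uint64_t) - sizeof(uint64_t)];

    /* Immutable metadata, set by spsc_queue_init() */
    uint64_t capacity;          /* number of slots (power of 2) */
    uint64_t mask;
    size_t slot_size;           /* bytes per slot */
    uint8_t *slots;             /* capacity * slot_size bytes */
} spsc_queue_t;
```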
Initialization
spsc_queue_init
| Parameter | Type | Description |
|---|---|---|
| q | spsc_queue_t * | Queue to initialize |
| buffer | void * | Pre-allocated slot memory (capacity × slot_size bytes) |
| capacity | uint64_t | Number of slots (must be a power of 2) |
| slot_size | size_t | Size of each slot in bytes |
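The power-of-two requirement is what makes a mask usable. This assumes the queue maps free-running indices to slots with `index & mask`. The `capacity` and `mask` fields suggest this, but the mapping is an assumption here. Under that assumption, `index & mask` equals `index % capacity` only when capacity is a power of two. The helpers below (`is_pow2`, `make_mask`, `slot_of`, `occupancy`) are hypothetical.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* True when capacity is a non-zero power of two (exactly one bit set). */
bool is_pow2(uint64_t capacity)
{
    return capacity != 0 && (capacity & (capacity - 1)) == 0;
}

/* Mask for a power-of-two capacity; rejects anything else. */
uint64_t make_mask(uint64_t capacity)
{
    assert(is_pow2(capacity));
    return capacity - 1;
}

/* Free-running index -> slot number (same as idx % capacity here). */
uint64_t slot_of(uint64_t idx, uint64_t mask)
{
    return idx & mask;
}

/* Items in flight. Unsigned subtraction is modulo 2^64, so the result
 * stays correct even after write_idx wraps past UINT64_MAX. */
uint64_t occupancy(uint64_t write_idx, uint64_t read_idx)
{
    return write_idx - read_idx;
}
```

With 64-bit counters the wrap-around case is mostly theoretical: at 250 Mops/s, 2^64 operations take over 2,000 years.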
Zero-Copy Producer API
spsc_queue_claim
Get a writable pointer to the next free slot. Returns NULL if the queue is full.
Does NOT advance the write index; you must call spsc_queue_publish() after writing.
Fast path: compares against cached_read_idx, so no atomic load touches the consumer's cache line. Slow path: reloads cached_read_idx from the consumer's read_idx, and only when the fast path indicates that the queue may be full.
spsc_queue_publish
Advance the write index with memory_order_release, making the written data visible to the consumer.
Zero-Copy Consumer API
spsc_queue_peek
Get a read-only pointer to the oldest published slot. Returns NULL if the queue is empty.
Does NOT advance the read index; you must call spsc_queue_consume() after reading.
spsc_queue_consume
Advance the read index with memory_order_release, freeing the slot for the producer.
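On the consumer side, the acquire/release pairing is what makes reading the slot in place safe. The sketch below shows peek() with a cached write index, plus consume(). It assumes free-running indices. `cons_view_t` and the `cv_*` functions are hypothetical, and the cache-line padding is omitted for brevity.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical consumer-side view (cache-line padding omitted). */
typedef struct {
    _Atomic uint64_t write_idx;   /* advanced by the producer (release) */
    _Atomic uint64_t read_idx;    /* advanced by the consumer (release) */
    uint64_t cached_write_idx;    /* consumer-local copy of write_idx */
    uint64_t mask;                /* capacity - 1 */
    size_t slot_size;
    uint8_t *slots;
} cons_view_t;

void cv_init(cons_view_t *q, void *buf, uint64_t capacity, size_t slot_size)
{
    assert(capacity != 0 && (capacity & (capacity - 1)) == 0);
    atomic_init(&q->write_idx, 0);
    atomic_init(&q->read_idx, 0);
    q->cached_write_idx = 0;
    q->mask = capacity - 1;
    q->slot_size = slot_size;
    q->slots = buf;
}

/* Oldest published slot, or NULL if the queue is empty. */
const void *cv_peek(cons_view_t *q)
{
    uint64_t r = atomic_load_explicit(&q->read_idx, memory_order_relaxed);
    if (r == q->cached_write_idx) {
        /* Looks empty: refresh. This acquire pairs with the producer's
         * release store in publish(), so the slot data is visible. */
        q->cached_write_idx =
            atomic_load_explicit(&q->write_idx, memory_order_acquire);
        if (r == q->cached_write_idx)
            return NULL;
    }
    return q->slots + (r & q->mask) * q->slot_size;
}

/* Hand the slot back. The release store orders our reads of the slot
 * before the producer can observe the slot as free. */
void cv_consume(cons_view_t *q)
{
    uint64_t r = atomic_load_explicit(&q->read_idx, memory_order_relaxed);
    atomic_store_explicit(&q->read_idx, r + 1, memory_order_release);
}
```

On the fast path, peek() performs no acquire load at all. That is still safe because an earlier acquire load already observed a `write_idx` past the slot being read.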
Convenience API (with memcpy)
```c
bool spsc_queue_push(spsc_queue_t *q, const void *data);  /* claim + memcpy + publish */
bool spsc_queue_pop(spsc_queue_t *q, void *data);         /* peek + memcpy + consume */
```
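The convenience wrappers can be layered on the zero-copy calls exactly as the comments describe. This sketch uses a hypothetical stand-in queue (`demo_q_t` and `demo_*` are illustrative names). Returning false when the queue is full or empty is an assumption based on the `bool` return type.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical stand-in queue (no cached indices, no padding). */
typedef struct {
    _Atomic uint64_t write_idx;
    _Atomic uint64_t read_idx;
    uint64_t mask;       /* capacity - 1 */
    size_t slot_size;
    uint8_t *slots;
} demo_q_t;

void demo_init(demo_q_t *q, void *buf, uint64_t capacity, size_t slot_size)
{
    assert(capacity != 0 && (capacity & (capacity - 1)) == 0);
    atomic_init(&q->write_idx, 0);
    atomic_init(&q->read_idx, 0);
    q->mask = capacity - 1;
    q->slot_size = slot_size;
    q->slots = buf;
}

void *demo_claim(demo_q_t *q)
{
    uint64_t w = atomic_load_explicit(&q->write_idx, memory_order_relaxed);
    uint64_t r = atomic_load_explicit(&q->read_idx, memory_order_acquire);
    if (w - r == q->mask + 1)
        return NULL;                              /* full */
    return q->slots + (w & q->mask) * q->slot_size;
}

void demo_publish(demo_q_t *q)
{
    uint64_t w = atomic_load_explicit(&q->write_idx, memory_order_relaxed);
    atomic_store_explicit(&q->write_idx, w + 1, memory_order_release);
}

const void *demo_peek(demo_q_t *q)
{
    uint64_t r = atomic_load_explicit(&q->read_idx, memory_order_relaxed);
    uint64_t w = atomic_load_explicit(&q->write_idx, memory_order_acquire);
    if (r == w)
        return NULL;                              /* empty */
    return q->slots + (r & q->mask) * q->slot_size;
}

void demo_consume(demo_q_t *q)
{
    uint64_t r = atomic_load_explicit(&q->read_idx, memory_order_relaxed);
    atomic_store_explicit(&q->read_idx, r + 1, memory_order_release);
}

/* push = claim + memcpy + publish */
bool demo_push(demo_q_t *q, const void *data)
{
    void *slot = demo_claim(q);
    if (slot == NULL)
        return false;
    memcpy(slot, data, q->slot_size);
    demo_publish(q);
    return true;
}

/* pop = peek + memcpy + consume */
bool demo_pop(demo_q_t *q, void *data)
{
    const void *slot = demo_peek(q);
    if (slot == NULL)
        return false;
    memcpy(data, slot, q->slot_size);
    demo_consume(q);
    return true;
}
```

Each wrapper costs one `slot_size` memcpy, which is exactly the copy that the claim/peek API lets callers avoid.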
Query Operations
```c
uint64_t spsc_queue_size(const spsc_queue_t *q);
bool spsc_queue_empty(const spsc_queue_t *q);
bool spsc_queue_full(const spsc_queue_t *q);
```
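Here is a sketch of how the three queries can be derived from the two indices, assuming free-running counters. `idx_view_t` and the `idx_*` helpers are hypothetical. Called from a thread other than the producer or consumer, the result is only a snapshot that may be stale by the time it is used.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical index pair: only what the query helpers need. */
typedef struct {
    _Atomic uint64_t write_idx;
    _Atomic uint64_t read_idx;
    uint64_t capacity;
} idx_view_t;

void idx_init(idx_view_t *q, uint64_t capacity)
{
    assert(capacity != 0 && (capacity & (capacity - 1)) == 0);
    atomic_init(&q->write_idx, 0);
    atomic_init(&q->read_idx, 0);
    q->capacity = capacity;
}

uint64_t idx_size(idx_view_t *q)
{
    /* Load read_idx first. The consumer only advances read_idx after
     * observing write_idx past it, and write_idx only grows, so the later
     * write_idx load cannot be smaller and w - r cannot underflow. */
    uint64_t r = atomic_load_explicit(&q->read_idx, memory_order_acquire);
    uint64_t w = atomic_load_explicit(&q->write_idx, memory_order_acquire);
    return w - r;
}

bool idx_empty(idx_view_t *q) { return idx_size(q) == 0; }
bool idx_full(idx_view_t *q)  { return idx_size(q) == q->capacity; }
```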
Usage Examples
Zero-Copy Message Passing
```c
#include "spsc_queue.h"
#include <inttypes.h>   /* PRIu64 */
#include <pthread.h>
#include <stdalign.h>   /* alignas */
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>     /* exit, EXIT_FAILURE */
#include <string.h>     /* memcpy */

/* Spin-wait hint: PAUSE on x86, YIELD on AArch64, no-op elsewhere. */
#if defined(__x86_64__) || defined(__i386__)
#define cpu_relax() __builtin_ia32_pause()
#elif defined(__aarch64__)
#define cpu_relax() __asm__ __volatile__("yield")
#else
#define cpu_relax() ((void)0)
#endif

typedef struct {
    uint64_t timestamp;
    uint32_t id;
    uint32_t length;
    uint8_t data[48];   /* Fixed-size message: 64 bytes total */
} message_t;

#define CAPACITY 1024
#define NUM_MESSAGES 1000000

static alignas(64) uint8_t slots[CAPACITY * sizeof(message_t)];
static spsc_queue_t queue;

static void *producer(void *arg)
{
    (void)arg;
    for (uint64_t i = 0; i < NUM_MESSAGES; i++) {
        message_t *slot;
        /* Spin until a slot is free (claim returns NULL while full) */
        while ((slot = (message_t *)spsc_queue_claim(&queue)) == NULL)
            cpu_relax();

        /* Zero-copy: write directly into the slot */
        slot->timestamp = i;
        slot->id = (uint32_t)(i & 0xFFFF);
        slot->length = 8;
        uint64_t payload = i * 42;
        memcpy(slot->data, &payload, sizeof payload);  /* no strict-aliasing UB */

        spsc_queue_publish(&queue);
    }
    return NULL;
}

static void *consumer(void *arg)
{
    (void)arg;
    uint64_t count = 0;
    while (count < NUM_MESSAGES) {
        const message_t *msg = (const message_t *)spsc_queue_peek(&queue);
        if (!msg) {
            cpu_relax();   /* queue empty */
            continue;
        }
        /* Zero-copy: read directly from the slot */
        if (msg->id != (uint32_t)(count & 0xFFFF)) {
            fprintf(stderr, "Sequence error at %" PRIu64 "\n", count);
            exit(EXIT_FAILURE);  /* otherwise the producer blocks forever */
        }
        count++;
        spsc_queue_consume(&queue);
    }
    printf("Consumer processed %" PRIu64 " messages\n", count);
    return NULL;
}

int main(void)
{
    spsc_queue_init(&queue, slots, CAPACITY, sizeof(message_t));

    pthread_t p, c;
    pthread_create(&p, NULL, producer, NULL);
    pthread_create(&c, NULL, consumer, NULL);
    pthread_join(p, NULL);
    pthread_join(c, NULL);
    return 0;
}
```
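Batch Processing Pattern
```c
/* Drain up to max_batch messages per wake-up to amortize the wake-up cost.
 * message_t is the type from the example above; handle_message() stands
 * for the application's own handler. */
int process_batch(spsc_queue_t *q, int max_batch)
{
    int processed = 0;
    while (processed < max_batch) {
        const message_t *msg = (const message_t *)spsc_queue_peek(q);
        if (!msg)
            break;               /* queue drained */
        handle_message(msg);     /* read in place; slot stays ours until consume */
        spsc_queue_consume(q);   /* hand the slot back to the producer */
        processed++;
    }
    return processed;
}
```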
Build & Run
```bash
# Compile. -march=native targets the build machine's CPU, so the binary may not run elsewhere.
gcc -Wall -Wextra -Werror -O3 -std=c11 -march=native -pthread \
    -o spsc_queue_demo spsc_queue.c

# Run with the process pinned to cores 2 and 3 (ideally isolated cores)
taskset -c 2,3 ./spsc_queue_demo
```
Performance
| Platform | Operation | Latency | Throughput |
|---|---|---|---|
| x86-64 (Xeon) | claim+publish+peek+consume | ~4 ns | 250 Mops/s |
| x86-64 (Ryzen) | claim+publish+peek+consume | ~3.5 ns | 285 Mops/s |
| ARM64 (Cortex-A72) | claim+publish+peek+consume | ~12 ns | 83 Mops/s |
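The throughput column is consistent with the latency column: each throughput figure is the reciprocal of the per-operation time. Converting ns to Mops/s:

$$
\text{throughput}\ [\text{Mops/s}] = \frac{10^{3}}{t_{\text{op}}\ [\text{ns}]},\qquad
\frac{10^{3}}{4\ \text{ns}} = 250\ \text{Mops/s},\qquad
\frac{10^{3}}{12\ \text{ns}} \approx 83.3\ \text{Mops/s}
$$

The same relation links the 4.1 ns/op and 243.9 Mops/s figures in the test output.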
Why Is It Faster Than the Ring Buffer?
| Factor | Ring Buffer | SPSC Queue |
|---|---|---|
| Data copies | 2× memcpy (push + pop) | 0 (pointer access to the slot) |
| Memory touched per message | Slot plus the caller's source and destination buffers | Slot only (its cache line still moves from the producer's core to the consumer's) |
| Cross-core atomic loads | Every push/pop | Only when the cached index is stale |
Test Output
```text
$ taskset -c 2,3 ./build/spsc_queue_bench
[spsc_queue] Init: capacity=1024, slot_size=64
[spsc_queue] Zero-copy correctness: 1000000 messages verified ✓
[spsc_queue] Cached index efficiency: 99.2% fast-path hits
[spsc_queue] Benchmark (10M ops):
  SPSC Queue: 4.1 ns/op (243.9 Mops/s)
[spsc_queue] vs ring_buffer: 1.9x faster (zero-copy advantage)
[PASS] All spsc_queue tests passed
```