
SPSC Ring Buffer

Overview

Lock-free single-producer single-consumer (SPSC) bounded circular buffer built on C11 atomic operations with explicit memory ordering. Push and pop are wait-free and O(1) per operation; measured latency is under 10 ns on the x86-64 platforms listed under Performance and about 15 ns on ARM64 (Cortex-A72).

Status: Implemented

Full implementation with cache-line-aligned indices, power-of-2 masking, and acquire/release memory ordering.

Memory Model

┌─────────────────────────────────────────────────────────────────────┐
│  Cache Layout (prevents false sharing):                              │
│                                                                      │
│  ┌──────────────────────────────── Cache Line 0 (64B) ────────────┐ │
│  │  _Atomic uint64_t head;     /* Producer writes */                │ │
│  │  uint8_t padding[56];       /* Fill to 64B boundary */           │ │
│  └──────────────────────────────────────────────────────────────────┘ │
│  ┌──────────────────────────────── Cache Line 1 (64B) ────────────┐ │
│  │  _Atomic uint64_t tail;     /* Consumer writes */                │ │
│  │  uint8_t padding[56];       /* Fill to 64B boundary */           │ │
│  └──────────────────────────────────────────────────────────────────┘ │
│                                                                      │
│  Buffer: [slot 0][slot 1][slot 2] ... [slot N-1]                     │
│           ↑                           ↑                              │
│          tail & mask                 head & mask                      │
│                                                                      │
│  Index wrapping: index = sequence & (capacity - 1)                   │
│  (Bitwise AND only works with power-of-2 capacity)                 │
└─────────────────────────────────────────────────────────────────────┘
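
The index-wrapping rule can be sketched in a few lines of C. This is an illustration only; is_power_of_two and slot_index are hypothetical helper names, not part of the documented API.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical helper: true when n is a non-zero power of two
 * (exactly one bit set). */
static bool is_power_of_two(uint64_t n)
{
    return n != 0 && (n & (n - 1)) == 0;
}

/* Hypothetical helper: map a monotonically increasing sequence number
 * onto a slot index. Equals sequence % capacity only when capacity is
 * a power of two. */
static uint64_t slot_index(uint64_t sequence, uint64_t capacity)
{
    return sequence & (capacity - 1);
}
```

With capacity 4096, sequence 4097 maps to slot 1. With a non-power-of-2 capacity such as 6, the mask is binary 101, so sequence 2 maps to slot 0 instead of 2, which is why init rejects such capacities. Because 2^64 is a multiple of every power-of-2 capacity, the mapping also stays continuous if a 64-bit sequence counter ever wraps.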

Memory Ordering Diagram

┌─────────────────────────┐         ┌─────────────────────────┐
│     PRODUCER (Core 0)   │         │     CONSUMER (Core 1)   │
├─────────────────────────┤         ├─────────────────────────┤
│                         │         │                         │
│ 1. Load tail (ACQUIRE)  │         │ 1. Load head (ACQUIRE)  │
│    → sees consumer's    │         │    → sees producer's    │
│      latest progress    │         │      latest progress    │
│                         │         │                         │
│ 2. Check: head-tail     │         │ 2. Check: tail < head?  │
│    < capacity?          │         │    (empty check)        │
│    (full check)         │         │                         │
│                         │         │ 3. memcpy from slot     │
│ 3. memcpy into slot     │         │    (read data)          │
│    (write data)         │         │                         │
│                         │         │ 4. Store tail+1         │
│ 4. Store head+1         │         │    (RELEASE)            │
│    (RELEASE)            │         │    → frees slot for     │
│    → makes data visible │         │      producer           │
│      to consumer        │         │                         │
└─────────────────────────┘         └─────────────────────────┘

API Reference

Data Structure

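#include <stdalign.h>   /* alignas */
#include <stdatomic.h>  /* _Atomic */
#include <stddef.h>     /* size_t */
#include <stdint.h>     /* uint64_t, uint8_t */

typedef struct {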
    /* Producer cache line */
    alignas(CACHE_LINE_SIZE) _Atomic uint64_t head;
    uint8_t _pad_producer[CACHE_LINE_SIZE - sizeof(_Atomic uint64_t)];

    /* Consumer cache line */
    alignas(CACHE_LINE_SIZE) _Atomic uint64_t tail;
    uint8_t _pad_consumer[CACHE_LINE_SIZE - sizeof(_Atomic uint64_t)];

    /* Immutable after init */
    uint64_t capacity;
    uint64_t mask;        /* capacity - 1 */
    size_t   slot_size;   /* bytes per element */
    uint8_t *buffer;      /* pre-allocated slot array */
} ring_buffer_t;
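
One way to guard this layout against accidental changes is a set of compile-time checks. The sketch below repeats the struct so that it compiles on its own; the assertions are a suggestion rather than something the implementation is known to contain, and they assume sizeof(_Atomic uint64_t) is 8, as on mainstream x86-64 and ARM64 toolchains.

```c
#include <assert.h>
#include <stdalign.h>
#include <stdatomic.h>
#include <stddef.h>
#include <stdint.h>

#define CACHE_LINE_SIZE 64

typedef struct {
    alignas(CACHE_LINE_SIZE) _Atomic uint64_t head;
    uint8_t _pad_producer[CACHE_LINE_SIZE - sizeof(_Atomic uint64_t)];
    alignas(CACHE_LINE_SIZE) _Atomic uint64_t tail;
    uint8_t _pad_consumer[CACHE_LINE_SIZE - sizeof(_Atomic uint64_t)];
    uint64_t capacity;
    uint64_t mask;
    size_t   slot_size;
    uint8_t *buffer;
} ring_buffer_t;

/* head starts a cache line of its own. */
_Static_assert(offsetof(ring_buffer_t, head) % CACHE_LINE_SIZE == 0,
               "head must be cache-line aligned");
/* tail sits at least one full line after head, so producer and
 * consumer never write to the same line. */
_Static_assert(offsetof(ring_buffer_t, tail) - offsetof(ring_buffer_t, head)
                   >= CACHE_LINE_SIZE,
               "head and tail must not share a cache line");
/* The read-mostly fields start on a third line. */
_Static_assert(offsetof(ring_buffer_t, capacity) - offsetof(ring_buffer_t, tail)
                   >= CACHE_LINE_SIZE,
               "immutable fields must not share tail's cache line");
/* The struct itself carries the 64-byte alignment requirement. */
_Static_assert(alignof(ring_buffer_t) == CACHE_LINE_SIZE,
               "ring_buffer_t must be cache-line aligned");
```

If someone later removes the padding or reorders the fields, the build fails instead of silently reintroducing false sharing.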

Constants

#define CACHE_LINE_SIZE              64
#define RING_BUFFER_DEFAULT_CAPACITY 4096

Operations

ring_buffer_init

int ring_buffer_init(ring_buffer_t *rb, void *buffer, uint64_t capacity, size_t slot_size);

Initialize the ring buffer with pre-allocated storage.

Parameter   Type       Description
buffer      void *     Pre-allocated memory (capacity × slot_size bytes)
capacity    uint64_t   Number of slots (must be a power of 2)
slot_size   size_t     Size of each element in bytes

Returns: 0 on success, -1 if capacity is not a power of 2.

ring_buffer_push

bool ring_buffer_push(ring_buffer_t *rb, const void *data);

Push an element into the ring (producer side). Copies slot_size bytes from data into the next slot.

Returns: true on success, false if buffer is full.

Memory ordering:

  • Loads tail with memory_order_acquire (sees consumer's progress)
  • Stores head + 1 with memory_order_release (publishes data)

ring_buffer_pop

bool ring_buffer_pop(ring_buffer_t *rb, void *data);

Pop an element from the ring (consumer side). Copies slot_size bytes from the slot into data.

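Returns: true on success, false if buffer is empty.

Memory ordering:

  • Loads head with memory_order_acquire (sees producer's published data)
  • Stores tail + 1 with memory_order_release (frees the slot for the producer)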
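
To make the documented behaviour of init, push, and pop concrete, here is a minimal sketch under stated assumptions. It is not the actual implementation: sketch_ring_t and the sketch_* functions are hypothetical stand-ins, the struct separates the indices with alignas alone instead of explicit padding, and loading each side's own index with memory_order_relaxed is an assumption (only the opposite side's index needs acquire).

```c
#include <assert.h>
#include <stdalign.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

#define SKETCH_CACHE_LINE 64

/* Hypothetical stand-in for ring_buffer_t. */
typedef struct {
    alignas(SKETCH_CACHE_LINE) _Atomic uint64_t head;  /* written by producer */
    alignas(SKETCH_CACHE_LINE) _Atomic uint64_t tail;  /* written by consumer */
    alignas(SKETCH_CACHE_LINE) uint64_t capacity;      /* immutable after init */
    uint64_t mask;                                     /* capacity - 1 */
    size_t   slot_size;
    uint8_t *buffer;
} sketch_ring_t;

static int sketch_init(sketch_ring_t *rb, void *buffer,
                       uint64_t capacity, size_t slot_size)
{
    if (capacity == 0 || (capacity & (capacity - 1)) != 0)
        return -1;                       /* not a power of 2 */
    atomic_init(&rb->head, 0);
    atomic_init(&rb->tail, 0);
    rb->capacity  = capacity;
    rb->mask      = capacity - 1;
    rb->slot_size = slot_size;
    rb->buffer    = buffer;
    return 0;
}

static bool sketch_push(sketch_ring_t *rb, const void *data)
{
    /* Only the producer writes head, so a relaxed load suffices here. */
    uint64_t head = atomic_load_explicit(&rb->head, memory_order_relaxed);
    /* Acquire pairs with the consumer's release store of tail. */
    uint64_t tail = atomic_load_explicit(&rb->tail, memory_order_acquire);
    if (head - tail >= rb->capacity)
        return false;                    /* full */
    memcpy(rb->buffer + (head & rb->mask) * rb->slot_size, data, rb->slot_size);
    /* Release: the slot contents become visible before the new head. */
    atomic_store_explicit(&rb->head, head + 1, memory_order_release);
    return true;
}

static bool sketch_pop(sketch_ring_t *rb, void *data)
{
    /* Only the consumer writes tail, so a relaxed load suffices here. */
    uint64_t tail = atomic_load_explicit(&rb->tail, memory_order_relaxed);
    /* Acquire pairs with the producer's release store of head. */
    uint64_t head = atomic_load_explicit(&rb->head, memory_order_acquire);
    if (tail == head)
        return false;                    /* empty */
    memcpy(data, rb->buffer + (tail & rb->mask) * rb->slot_size, rb->slot_size);
    /* Release: the slot is handed back only after the copy-out is done. */
    atomic_store_explicit(&rb->tail, tail + 1, memory_order_release);
    return true;
}
```

Neither function loops or retries, which is what makes each call wait-free: a full or empty ring is reported to the caller, who decides whether to spin, yield, or drop.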

Query Operations

uint64_t ring_buffer_size(const ring_buffer_t *rb);   /* Approximate occupancy */
bool     ring_buffer_empty(const ring_buffer_t *rb);  /* Consumer perspective */
bool     ring_buffer_full(const ring_buffer_t *rb);   /* Producer perspective */

Approximate Size

ring_buffer_size() is inherently racy in a concurrent context: it reads both head and tail, either of which may be updated between the two reads. Use it for monitoring, not for synchronization decisions.
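
A monitoring-only size query might look like the sketch below. approx_size is a hypothetical helper that takes the two indices directly; the read order and the clamp are assumptions about one reasonable way to bound the race, not a description of how ring_buffer_size() is implemented.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdint.h>

/* Hypothetical helper: approximate occupancy from the two indices. */
static uint64_t approx_size(_Atomic uint64_t *head, _Atomic uint64_t *tail,
                            uint64_t capacity)
{
    /* Read tail first. With release stores and acquire loads on both
     * indices, a head value read afterwards is never behind the tail
     * value already seen, so the subtraction cannot underflow. */
    uint64_t t = atomic_load_explicit(tail, memory_order_acquire);
    uint64_t h = atomic_load_explicit(head, memory_order_acquire);
    uint64_t n = h - t;
    /* Both sides may have advanced after t was read, so n can
     * overshoot; clamp it to the ring capacity. */
    return n > capacity ? capacity : n;
}
```

The result is always within 0..capacity, but it may already be stale when the caller looks at it.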

Usage Example

Producer-Consumer Pattern

#include "ring_buffer.h"
#include <pthread.h>
#include <stdint.h>
#include <stdio.h>

#define CAPACITY 1024
#define SLOT_SIZE sizeof(uint64_t)

static uint8_t storage[CAPACITY * SLOT_SIZE] __attribute__((aligned(64)));
static ring_buffer_t rb;

void *producer(void *arg)
{
    (void)arg;
    for (uint64_t i = 0; i < 1000000; i++) {
        while (!ring_buffer_push(&rb, &i)) {
            /* Spin-wait (or yield) if full; the pause builtin below is x86-only */
            __builtin_ia32_pause();
        }
    }
    return NULL;
}

void *consumer(void *arg)
{
    (void)arg;
    uint64_t value, count = 0, sum = 0;
    while (count < 1000000) {
        if (ring_buffer_pop(&rb, &value)) {
            sum += value;
            count++;
        } else {
            __builtin_ia32_pause();
        }
    }
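    /* uint64_t is not unsigned long on every platform, so cast for printf */
    printf("Consumer done: %llu items, sum=%llu\n",
           (unsigned long long)count, (unsigned long long)sum);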
    return NULL;
}

int main(void)
{
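    if (ring_buffer_init(&rb, storage, CAPACITY, SLOT_SIZE) != 0) {
        fprintf(stderr, "ring_buffer_init failed: capacity must be a power of 2\n");
        return 1;
    }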

    pthread_t prod, cons;
    pthread_create(&prod, NULL, producer, NULL);
    pthread_create(&cons, NULL, consumer, NULL);

    pthread_join(prod, NULL);
    pthread_join(cons, NULL);

    return 0;
}

Benchmark Harness

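#define _GNU_SOURCE   /* cpu_set_t, CPU_SET, sched_setaffinity; must precede all includes */
#include "ring_buffer.h"
#include <stdint.h>
#include <stdio.h>
#include <time.h>
#include <pthread.h>
#include <sched.h>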

#define ITERATIONS 10000000
#define CAPACITY   4096

static uint8_t storage[CAPACITY * 8] __attribute__((aligned(64)));
static ring_buffer_t rb;
static _Atomic int start_flag = 0;  /* atomic, not volatile: volatile gives no inter-thread ordering */

static void pin_cpu(int cpu) {
    cpu_set_t set;
    CPU_ZERO(&set);
    CPU_SET(cpu, &set);
    sched_setaffinity(0, sizeof(set), &set);
}

void *bench_producer(void *arg) {
    pin_cpu(2);
    (void)arg;
    while (!start_flag) __builtin_ia32_pause();

    uint64_t val = 0;
    for (int i = 0; i < ITERATIONS; i++) {
        while (!ring_buffer_push(&rb, &val)) __builtin_ia32_pause();
        val++;
    }
    return NULL;
}

void *bench_consumer(void *arg) {
    pin_cpu(3);
    (void)arg;
    while (!start_flag) __builtin_ia32_pause();

    uint64_t val;
    struct timespec t0, t1;
    clock_gettime(CLOCK_MONOTONIC, &t0);

    for (int i = 0; i < ITERATIONS; i++) {
        while (!ring_buffer_pop(&rb, &val)) __builtin_ia32_pause();
    }

    clock_gettime(CLOCK_MONOTONIC, &t1);
    double ns = (t1.tv_sec - t0.tv_sec) * 1e9 + (t1.tv_nsec - t0.tv_nsec);
    printf("Ring buffer: %.1f ns/op (%.0f Mops/s)\n",
           ns / ITERATIONS, ITERATIONS / (ns / 1e9) / 1e6);
    return NULL;
}

int main(void) {
    ring_buffer_init(&rb, storage, CAPACITY, sizeof(uint64_t));

    pthread_t p, c;
    pthread_create(&p, NULL, bench_producer, NULL);
    pthread_create(&c, NULL, bench_consumer, NULL);

    start_flag = 1;  /* Release both threads */

    pthread_join(p, NULL);
    pthread_join(c, NULL);
    return 0;
}

Build & Run

# Compile with optimizations
gcc -Wall -Wextra -Werror -O3 -std=c11 -pthread -o ring_buffer_demo ring_buffer.c

# Run benchmark on isolated cores
taskset -c 2,3 ./ring_buffer_demo

Performance

Platform              Operation    Latency   Throughput
x86-64 (Xeon)         push + pop   ~8 ns     125 Mops/s
x86-64 (Ryzen)        push + pop   ~6 ns     166 Mops/s
ARM64 (Cortex-A72)    push + pop   ~15 ns    66 Mops/s

Benchmark Conditions

  • Producer and consumer pinned to separate cores (isolcpus)
  • nohz_full to eliminate timer interrupts
  • rcu_nocbs to move RCU callbacks off isolated cores
  • Slot size: 8 bytes (uint64_t)
  • Ring capacity: 4096 slots

Test Output

$ taskset -c 2,3 ./build/ring_buffer_bench
[ring_buffer] Init: capacity=4096, slot_size=8, mask=0xFFF
[ring_buffer] Push/pop correctness: 1000000 items verified ✓
[ring_buffer] Benchmark (10M ops):
  Ring buffer: 7.8 ns/op (128.2 Mops/s)
[ring_buffer] Full/empty boundary: verified ✓
[PASS] All ring_buffer tests passed