author    Dimitri Staessens <dimitri@ouroboros.rocks>  2026-01-20 22:25:41 +0100
committer Sander Vrijders <sander@ouroboros.rocks>     2026-01-26 07:50:33 +0100
commit    0ca48453a067c7862f0bb6b85f152da826f59af7 (patch)
tree      5daf26d84777ec6ad1c266601b66e59f9dcc88ca /src/lib/ssm
parent    1775201647a10923b9f73addf2304c3124350836 (diff)
download  ouroboros-0ca48453a067c7862f0bb6b85f152da826f59af7.tar.gz
          ouroboros-0ca48453a067c7862f0bb6b85f152da826f59af7.zip
lib: Replace rdrbuff with a proper slab allocator
This is a first step towards the Secure Shared Memory (SSM)
infrastructure for Ouroboros, which will allow proper resource
separation for non-privileged processes.

This replaces the rdrbuff (random-deletion ring buffer) PoC allocator
with a sharded slab allocator for the packet buffer pool, to avoid the
head-of-line blocking behaviour of the rdrb and to reduce lock
contention in multi-process scenarios. Each size class contains
multiple independent shards, allowing parallel allocations without
blocking.

- Configurable shard count per size class (default: 4, set via
  SSM_POOL_SHARDS in CMake). The configured number of blocks is spread
  over the shards. As an example:
      SSM_POOL_512_BLOCKS = 768 blocks total
  These 768 blocks are shared among 4 shards (not 768 × 4 = 3072
  blocks).

- Lazy block distribution: all blocks initially reside in shard 0 and
  naturally migrate to process-local shards upon first allocation and
  subsequent free operations.

- Fallback with work stealing: processes attempt allocation from their
  local shard (pid % SSM_POOL_SHARDS) first, then steal from other
  shards if the local shard is exhausted, eliminating fragmentation
  while maintaining low contention.

- Round-robin condvar signaling: blocking allocations cycle through
  all shard condition variables to ensure fairness.

- Blocks are freed to the allocator's shard: allocator_pid determines
  the target shard, enabling natural load balancing as process
  allocation patterns stabilize over time.

Maintains the existing robust mutex semantics, including EOWNERDEAD
handling for dead-process recovery.

Internal structures are exposed in ssm.h for testing purposes. Adds
tests (pool_test.c, pool_sharding_test.c, etc.) verifying lazy
distribution, migration, fallback stealing, and multi-process
behaviour.

Updates the ring buffer (rbuff) to use relaxed/acquire/release
ordering on atomic indices. The ring buffer still requires the
(robust) mutex to ensure cross-structure synchronization between pool
buffer writes and ring buffer index publication.

Signed-off-by: Dimitri Staessens <dimitri@ouroboros.rocks>
Signed-off-by: Sander Vrijders <sander@ouroboros.rocks>
Diffstat (limited to 'src/lib/ssm')
-rw-r--r--  src/lib/ssm/flow_set.c                   372
-rw-r--r--  src/lib/ssm/pool.c                       882
-rw-r--r--  src/lib/ssm/rbuff.c                      449
-rw-r--r--  src/lib/ssm/ssm.h.in                     146
-rw-r--r--  src/lib/ssm/tests/CMakeLists.txt          33
-rw-r--r--  src/lib/ssm/tests/flow_set_test.c        255
-rw-r--r--  src/lib/ssm/tests/pool_sharding_test.c   505
-rw-r--r--  src/lib/ssm/tests/pool_test.c           1038
-rw-r--r--  src/lib/ssm/tests/rbuff_test.c           675
9 files changed, 4355 insertions, 0 deletions
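
For orientation, a minimal allocate/write/free sketch against the public
pool API introduced below (send_packet() and its arguments are a
hypothetical caller, not part of this commit; error handling trimmed):

    #include <ouroboros/ssm_pool.h>

    #include <string.h>

    static int send_packet(struct ssm_pool * pool,
                           const uint8_t *   buf,
                           size_t            len)
    {
            struct ssm_pk_buff * spb;
            uint8_t *            data;
            ssize_t              off;

            /* Picks the smallest size class that fits len plus the
             * configured head/tailspace; tries the caller's local
             * shard (pid % SSM_POOL_SHARDS) first, then steals. */
            off = ssm_pool_alloc(pool, len, &data, &spb);
            if (off < 0)
                    return (int) off; /* -EAGAIN or -EMSGSIZE */

            memcpy(data, buf, len);

            /* The returned offset, not the pointer, is what crosses
             * process boundaries (e.g. via ssm_rbuff_write). */

            /* Drop the reference: the block returns to the
             * allocator's shard and a blocked allocator is woken. */
            return ssm_pool_remove(pool, (size_t) off);
    }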
diff --git a/src/lib/ssm/flow_set.c b/src/lib/ssm/flow_set.c
new file mode 100644
index 00000000..ab24d357
--- /dev/null
+++ b/src/lib/ssm/flow_set.c
@@ -0,0 +1,372 @@
+/*
+ * Ouroboros - Copyright (C) 2016 - 2026
+ *
+ * Management of flow_sets for fqueue
+ *
+ * Dimitri Staessens <dimitri@ouroboros.rocks>
+ * Sander Vrijders <sander@ouroboros.rocks>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public License
+ * version 2.1 as published by the Free Software Foundation.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., http://www.fsf.org/about/contact/.
+ */
+
+#define _POSIX_C_SOURCE 200809L
+
+#include "config.h"
+#include "ssm.h"
+
+#include <ouroboros/errno.h>
+#include <ouroboros/lockfile.h>
+#include <ouroboros/pthread.h>
+#include <ouroboros/ssm_flow_set.h>
+#include <ouroboros/time.h>
+
+#include <assert.h>
+#include <fcntl.h>
+#include <signal.h>
+#include <stdlib.h>
+#include <stdio.h>
+#include <string.h>
+#include <unistd.h>
+#include <sys/mman.h>
+#include <sys/stat.h>
+
+/*
+ * pthread_cond_timedwait has a WONTFIX bug as of glibc 2.25 where it
+ * doesn't test pthread cancellation when passed an expired timeout
+ * with the clock set to CLOCK_MONOTONIC.
+ */
+#if ((defined(__linux__) || (defined(__MACH__) && !defined(__APPLE__))) \
+ && (defined(__GLIBC__) && ((__GLIBC__ * 1000 + __GLIBC_MINOR__) >= 2025)) \
+ && (PTHREAD_COND_CLOCK == CLOCK_MONOTONIC))
+#define HAVE_CANCEL_BUG
+#endif
+
+#define FN_MAX_CHARS 255
+#define FS_PROT (PROT_READ | PROT_WRITE)
+
+#define QUEUESIZE ((SSM_RBUFF_SIZE) * sizeof(struct flowevent))
+
+#define SSM_FSET_FILE_SIZE (SYS_MAX_FLOWS * sizeof(ssize_t) \
+ + PROG_MAX_FQUEUES * sizeof(size_t) \
+ + PROG_MAX_FQUEUES * sizeof(pthread_cond_t) \
+ + PROG_MAX_FQUEUES * QUEUESIZE \
+ + sizeof(pthread_mutex_t))
+
+#define fqueue_ptr(fs, idx) ((fs)->fqueues + (SSM_RBUFF_SIZE) * (idx))
+
+struct ssm_flow_set {
+ ssize_t * mtable;
+ size_t * heads;
+ pthread_cond_t * conds;
+ struct flowevent * fqueues;
+ pthread_mutex_t * lock;
+
+ pid_t pid;
+};
+
+static struct ssm_flow_set * flow_set_create(pid_t pid,
+ int oflags)
+{
+ struct ssm_flow_set * set;
+ ssize_t * shm_base;
+ char fn[FN_MAX_CHARS];
+ int fd;
+
+ sprintf(fn, SSM_FLOW_SET_PREFIX "%d", pid);
+
+ set = malloc(sizeof(*set));
+ if (set == NULL)
+ goto fail_malloc;
+
+ fd = shm_open(fn, oflags, 0666);
+ if (fd == -1)
+ goto fail_shm_open;
+
+ if ((oflags & O_CREAT) && ftruncate(fd, SSM_FSET_FILE_SIZE) < 0)
+ goto fail_truncate;
+
+ shm_base = mmap(NULL, SSM_FSET_FILE_SIZE, FS_PROT, MAP_SHARED, fd, 0);
+ if (shm_base == MAP_FAILED)
+ goto fail_mmap;
+
+ close(fd);
+
+ set->mtable = shm_base;
+ set->heads = (size_t *) (set->mtable + SYS_MAX_FLOWS);
+ set->conds = (pthread_cond_t *)(set->heads + PROG_MAX_FQUEUES);
+ set->fqueues = (struct flowevent *) (set->conds + PROG_MAX_FQUEUES);
+ set->lock = (pthread_mutex_t *)
+ (set->fqueues + PROG_MAX_FQUEUES * (SSM_RBUFF_SIZE));
+
+ return set;
+
+ fail_mmap:
+ if (oflags & O_CREAT)
+ shm_unlink(fn);
+ fail_truncate:
+ close(fd);
+ fail_shm_open:
+ free(set);
+ fail_malloc:
+ return NULL;
+}
+
+struct ssm_flow_set * ssm_flow_set_create(pid_t pid)
+{
+ struct ssm_flow_set * set;
+ pthread_mutexattr_t mattr;
+ pthread_condattr_t cattr;
+ mode_t mask;
+ int i;
+
+ mask = umask(0);
+
+ set = flow_set_create(pid, O_CREAT | O_RDWR);
+
+ umask(mask);
+
+ if (set == NULL)
+ goto fail_set;
+
+ set->pid = pid;
+
+ if (pthread_mutexattr_init(&mattr))
+ goto fail_mutexattr_init;
+
+#ifdef HAVE_ROBUST_MUTEX
+ if (pthread_mutexattr_setrobust(&mattr, PTHREAD_MUTEX_ROBUST))
+ goto fail_mattr_set;
+#endif
+ if (pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED))
+ goto fail_mattr_set;
+
+ if (pthread_mutex_init(set->lock, &mattr))
+ goto fail_mattr_set;
+
+ if (pthread_condattr_init(&cattr))
+ goto fail_condattr_init;
+
+ if (pthread_condattr_setpshared(&cattr, PTHREAD_PROCESS_SHARED))
+ goto fail_condattr_set;
+
+#ifndef __APPLE__
+ if (pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK))
+ goto fail_condattr_set;
+#endif
+ for (i = 0; i < PROG_MAX_FQUEUES; ++i) {
+ set->heads[i] = 0;
+ if (pthread_cond_init(&set->conds[i], &cattr))
+ goto fail_init;
+ }
+
+ for (i = 0; i < SYS_MAX_FLOWS; ++i)
+ set->mtable[i] = -1;
+
+ pthread_mutexattr_destroy(&mattr);
+ pthread_condattr_destroy(&cattr);
+
+ return set;
+
+ fail_init:
+ while (i-- > 0)
+ pthread_cond_destroy(&set->conds[i]);
+ fail_condattr_set:
+ pthread_condattr_destroy(&cattr);
+ fail_condattr_init:
+ pthread_mutex_destroy(set->lock);
+ fail_mattr_set:
+ pthread_mutexattr_destroy(&mattr);
+ fail_mutexattr_init:
+ ssm_flow_set_destroy(set);
+ fail_set:
+ return NULL;
+}
+
+struct ssm_flow_set * ssm_flow_set_open(pid_t pid)
+{
+ return flow_set_create(pid, O_RDWR);
+}
+
+void ssm_flow_set_destroy(struct ssm_flow_set * set)
+{
+ char fn[FN_MAX_CHARS];
+
+ assert(set);
+
+ sprintf(fn, SSM_FLOW_SET_PREFIX "%d", set->pid);
+
+ ssm_flow_set_close(set);
+
+ shm_unlink(fn);
+}
+
+void ssm_flow_set_close(struct ssm_flow_set * set)
+{
+ assert(set);
+
+ munmap(set->mtable, SSM_FSET_FILE_SIZE);
+ free(set);
+}
+
+void ssm_flow_set_zero(struct ssm_flow_set * set,
+ size_t idx)
+{
+ ssize_t i = 0;
+
+ assert(set);
+ assert(idx < PROG_MAX_FQUEUES);
+
+ pthread_mutex_lock(set->lock);
+
+ for (i = 0; i < SYS_MAX_FLOWS; ++i)
+ if (set->mtable[i] == (ssize_t) idx)
+ set->mtable[i] = -1;
+
+ set->heads[idx] = 0;
+
+ pthread_mutex_unlock(set->lock);
+}
+
+int ssm_flow_set_add(struct ssm_flow_set * set,
+ size_t idx,
+ int flow_id)
+{
+ assert(set);
+ assert(!(flow_id < 0) && flow_id < SYS_MAX_FLOWS);
+ assert(idx < PROG_MAX_FQUEUES);
+
+ pthread_mutex_lock(set->lock);
+
+ if (set->mtable[flow_id] != -1) {
+ pthread_mutex_unlock(set->lock);
+ return -EPERM;
+ }
+
+ set->mtable[flow_id] = idx;
+
+ pthread_mutex_unlock(set->lock);
+
+ return 0;
+}
+
+void ssm_flow_set_del(struct ssm_flow_set * set,
+ size_t idx,
+ int flow_id)
+{
+ assert(set);
+ assert(!(flow_id < 0) && flow_id < SYS_MAX_FLOWS);
+ assert(idx < PROG_MAX_FQUEUES);
+
+ pthread_mutex_lock(set->lock);
+
+ if (set->mtable[flow_id] == (ssize_t) idx)
+ set->mtable[flow_id] = -1;
+
+ pthread_mutex_unlock(set->lock);
+}
+
+int ssm_flow_set_has(struct ssm_flow_set * set,
+ size_t idx,
+ int flow_id)
+{
+ int ret = 0;
+
+ assert(set);
+ assert(!(flow_id < 0) && flow_id < SYS_MAX_FLOWS);
+ assert(idx < PROG_MAX_FQUEUES);
+
+ pthread_mutex_lock(set->lock);
+
+ if (set->mtable[flow_id] == (ssize_t) idx)
+ ret = 1;
+
+ pthread_mutex_unlock(set->lock);
+
+ return ret;
+}
+
+void ssm_flow_set_notify(struct ssm_flow_set * set,
+ int flow_id,
+ int event)
+{
+ struct flowevent * e;
+
+ assert(set);
+ assert(!(flow_id < 0) && flow_id < SYS_MAX_FLOWS);
+
+ pthread_mutex_lock(set->lock);
+
+ if (set->mtable[flow_id] == -1) {
+ pthread_mutex_unlock(set->lock);
+ return;
+ }
+
+ e = fqueue_ptr(set, set->mtable[flow_id]) +
+ set->heads[set->mtable[flow_id]];
+
+ e->flow_id = flow_id;
+ e->event = event;
+
+ ++set->heads[set->mtable[flow_id]];
+
+ pthread_cond_signal(&set->conds[set->mtable[flow_id]]);
+
+ pthread_mutex_unlock(set->lock);
+}
+
+ssize_t ssm_flow_set_wait(const struct ssm_flow_set * set,
+ size_t idx,
+ struct flowevent * fqueue,
+ const struct timespec * abstime)
+{
+ ssize_t ret = 0;
+
+ assert(set);
+ assert(idx < PROG_MAX_FQUEUES);
+ assert(fqueue);
+
+#ifndef HAVE_ROBUST_MUTEX
+ pthread_mutex_lock(set->lock);
+#else
+ if (pthread_mutex_lock(set->lock) == EOWNERDEAD)
+ pthread_mutex_consistent(set->lock);
+#endif
+
+ pthread_cleanup_push(__cleanup_mutex_unlock, set->lock);
+
+ while (set->heads[idx] == 0 && ret != -ETIMEDOUT) {
+ ret = -__timedwait(set->conds + idx, set->lock, abstime);
+#ifdef HAVE_CANCEL_BUG
+ if (ret == -ETIMEDOUT)
+ pthread_testcancel();
+#endif
+#ifdef HAVE_ROBUST_MUTEX
+ if (ret == -EOWNERDEAD)
+ pthread_mutex_consistent(set->lock);
+#endif
+ }
+
+ if (ret != -ETIMEDOUT) {
+ memcpy(fqueue,
+ fqueue_ptr(set, idx),
+ set->heads[idx] * sizeof(*fqueue));
+ ret = set->heads[idx];
+ set->heads[idx] = 0;
+ }
+
+ pthread_cleanup_pop(true);
+
+ assert(ret);
+
+ return ret;
+}
diff --git a/src/lib/ssm/pool.c b/src/lib/ssm/pool.c
new file mode 100644
index 00000000..b8cfe3a1
--- /dev/null
+++ b/src/lib/ssm/pool.c
@@ -0,0 +1,882 @@
+/*
+ * Ouroboros - Copyright (C) 2016 - 2026
+ *
+ * Secure Shared Memory Infrastructure (SSMI) Packet Buffer
+ *
+ * Dimitri Staessens <dimitri@ouroboros.rocks>
+ * Sander Vrijders <sander@ouroboros.rocks>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public License
+ * version 2.1 as published by the Free Software Foundation.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., http://www.fsf.org/about/contact/.
+ */
+
+#define _POSIX_C_SOURCE 200809L
+
+#include "config.h"
+
+#include <ouroboros/errno.h>
+#include <ouroboros/pthread.h>
+#include <ouroboros/ssm_pool.h>
+
+#include "ssm.h"
+
+#include <assert.h>
+#include <fcntl.h>
+#include <signal.h>
+#include <stdbool.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+#include <sys/mman.h>
+#include <sys/stat.h>
+
+/* Size class configuration from CMake */
+static const struct ssm_size_class_cfg ssm_sc_cfg[SSM_POOL_MAX_CLASSES] = {
+ { (1 << 8), SSM_POOL_256_BLOCKS },
+ { (1 << 9), SSM_POOL_512_BLOCKS },
+ { (1 << 10), SSM_POOL_1K_BLOCKS },
+ { (1 << 11), SSM_POOL_2K_BLOCKS },
+ { (1 << 12), SSM_POOL_4K_BLOCKS },
+ { (1 << 14), SSM_POOL_16K_BLOCKS },
+ { (1 << 16), SSM_POOL_64K_BLOCKS },
+ { (1 << 18), SSM_POOL_256K_BLOCKS },
+ { (1 << 20), SSM_POOL_1M_BLOCKS },
+};
+
+#define PTR_TO_OFFSET(pool_base, ptr) \
+ ((uintptr_t)(ptr) - (uintptr_t)(pool_base))
+
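+/* Offset 0 doubles as the NULL/end-of-list sentinel in the free lists. */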
+#define OFFSET_TO_PTR(pool_base, offset) \
+ (((offset) == 0) ? NULL : (void *)((uintptr_t)(pool_base) + (offset)))
+
+#define GET_SHARD_FOR_PID(pid) ((int)((pid) % SSM_POOL_SHARDS))
+
+#define LOAD_RELAXED(ptr) \
+ (__atomic_load_n(ptr, __ATOMIC_RELAXED))
+
+#define LOAD_ACQUIRE(ptr) \
+ (__atomic_load_n(ptr, __ATOMIC_ACQUIRE))
+
+#define STORE_RELEASE(ptr, val) \
+ (__atomic_store_n(ptr, val, __ATOMIC_RELEASE))
+
+#define LOAD(ptr) \
+ (__atomic_load_n(ptr, __ATOMIC_SEQ_CST))
+
+#define STORE(ptr, val) \
+ (__atomic_store_n(ptr, val, __ATOMIC_SEQ_CST))
+
+#define FETCH_ADD(ptr, val) \
+ (__atomic_fetch_add(ptr, val, __ATOMIC_SEQ_CST))
+
+#define FETCH_SUB(ptr, val) \
+ (__atomic_fetch_sub(ptr, val, __ATOMIC_SEQ_CST))
+
+#define CAS(ptr, expected, desired) \
+ (__atomic_compare_exchange_n(ptr, expected, desired, false, \
+ __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST))
+
+#define SSM_FILE_SIZE (SSM_POOL_TOTAL_SIZE + sizeof(struct _ssm_pool_hdr))
+
+struct ssm_pool {
+ uint8_t * shm_base; /* start of blocks */
+ struct _ssm_pool_hdr * hdr; /* shared memory header */
+ void * pool_base; /* base of the memory pool */
+};
+
+static __inline__
+struct ssm_pk_buff * list_remove_head(struct _ssm_list_head * head,
+ void * base)
+{
+ uint32_t off;
+ uint32_t next_off;
+ struct ssm_pk_buff * blk;
+
+ assert(head != NULL);
+ assert(base != NULL);
+
+ off = LOAD(&head->head_offset);
+ if (off == 0)
+ return NULL;
+
+ /* Validate offset is within pool bounds */
+ if (off >= SSM_POOL_TOTAL_SIZE)
+ return NULL;
+
+ blk = OFFSET_TO_PTR(base, off);
+ next_off = LOAD(&blk->next_offset);
+
+ STORE(&head->head_offset, next_off);
+ STORE(&head->count, LOAD(&head->count) - 1);
+
+ return blk;
+}
+
+static __inline__ void list_add_head(struct _ssm_list_head * head,
+ struct ssm_pk_buff * blk,
+ void * base)
+{
+ uint32_t off;
+ uint32_t old;
+
+ assert(head != NULL);
+ assert(blk != NULL);
+ assert(base != NULL);
+
+ off = (uint32_t) PTR_TO_OFFSET(base, blk);
+ old = LOAD(&head->head_offset);
+
+ STORE(&blk->next_offset, old);
+ STORE(&head->head_offset, off);
+ STORE(&head->count, LOAD(&head->count) + 1);
+}
+
+static __inline__ int select_size_class(size_t len)
+{
+ size_t sz;
+ int i;
+
+ /* Total space needed: header + headspace + data + tailspace */
+ sz = sizeof(struct ssm_pk_buff) + SSM_PK_BUFF_HEADSPACE + len
+ + SSM_PK_BUFF_TAILSPACE;
+
+ for (i = 0; i < SSM_POOL_MAX_CLASSES; i++) {
+ if (ssm_sc_cfg[i].blocks > 0 && sz <= ssm_sc_cfg[i].size)
+ return i;
+ }
+
+ return -1;
+}
+
+static __inline__ int find_size_class_for_offset(struct ssm_pool * pool,
+ size_t offset)
+{
+ int c;
+
+ assert(pool != NULL);
+
+ for (c = 0; c < SSM_POOL_MAX_CLASSES; c++) {
+ struct _ssm_size_class * sc = &pool->hdr->size_classes[c];
+
+ if (sc->object_size == 0)
+ continue;
+
+ if (offset >= sc->pool_start &&
+ offset < sc->pool_start + sc->pool_size)
+ return c;
+ }
+
+ return -1;
+}
+
+static void init_size_classes(struct ssm_pool * pool)
+{
+ struct _ssm_size_class * sc;
+ struct _ssm_shard * shard;
+ pthread_mutexattr_t mattr;
+ pthread_condattr_t cattr;
+ uint8_t * region;
+ size_t offset;
+ int c;
+ int s;
+ size_t i;
+
+ assert(pool != NULL);
+
+ /* Check if already initialized */
+ if (LOAD(&pool->hdr->initialized) != 0)
+ return;
+
+ pthread_mutexattr_init(&mattr);
+ pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED);
+#ifdef HAVE_ROBUST_MUTEX
+ pthread_mutexattr_setrobust(&mattr, PTHREAD_MUTEX_ROBUST);
+#endif
+ pthread_mutexattr_setprotocol(&mattr, PTHREAD_PRIO_INHERIT);
+
+ pthread_condattr_init(&cattr);
+ pthread_condattr_setpshared(&cattr, PTHREAD_PROCESS_SHARED);
+#ifndef __APPLE__
+ pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK);
+#endif
+ offset = 0;
+
+ for (c = 0; c < SSM_POOL_MAX_CLASSES; c++) {
+ if (ssm_sc_cfg[c].blocks == 0)
+ continue;
+
+ sc = &pool->hdr->size_classes[c];
+
+ sc->object_size = ssm_sc_cfg[c].size;
+ sc->pool_start = offset;
+ sc->pool_size = ssm_sc_cfg[c].size * ssm_sc_cfg[c].blocks;
+ sc->object_count = ssm_sc_cfg[c].blocks;
+
+ /* Initialize all shards */
+ for (s = 0; s < SSM_POOL_SHARDS; s++) {
+ shard = &sc->shards[s];
+
+ STORE(&shard->free_list.head_offset, 0);
+ STORE(&shard->free_list.count, 0);
+ STORE(&shard->free_count, 0);
+
+ pthread_mutex_init(&shard->mtx, &mattr);
+ pthread_cond_init(&shard->cond, &cattr);
+ }
+
+ /* Lazy distribution: put all blocks in shard 0 initially */
+ region = pool->shm_base + offset;
+
+ for (i = 0; i < sc->object_count; ++i) {
+ struct ssm_pk_buff * blk;
+
+ blk = (struct ssm_pk_buff *)
+ (region + i * sc->object_size);
+
+ STORE(&blk->refcount, 0);
+ blk->allocator_pid = 0;
+ STORE(&blk->next_offset, 0);
+
+ list_add_head(&sc->shards[0].free_list, blk,
+ pool->pool_base);
+ FETCH_ADD(&sc->shards[0].free_count, 1);
+ }
+
+ offset += sc->pool_size;
+ }
+
+ /* Mark as initialized - acts as memory barrier */
+ STORE(&pool->hdr->initialized, 1);
+
+ pthread_mutexattr_destroy(&mattr);
+ pthread_condattr_destroy(&cattr);
+}
+
+/*
+ * Reclaim all blocks allocated by a specific pid in a size class.
+ * Called with shard mutex held.
+ */
+static size_t reclaim_pid_from_sc(struct _ssm_size_class * sc,
+ struct _ssm_shard * shard,
+ void * pool_base,
+ pid_t pid)
+{
+ uint8_t * region;
+ size_t i;
+ size_t recovered = 0;
+ struct ssm_pk_buff * blk;
+
+ region = (uint8_t *) pool_base + sc->pool_start;
+
+ for (i = 0; i < sc->object_count; ++i) {
+ blk = (struct ssm_pk_buff *)(region + i * sc->object_size);
+
+ if (blk->allocator_pid == pid && LOAD(&blk->refcount) > 0) {
+ STORE(&blk->refcount, 0);
+ blk->allocator_pid = 0;
+ list_add_head(&shard->free_list, blk, pool_base);
+ FETCH_ADD(&shard->free_count, 1);
+ recovered++;
+ }
+ }
+
+ return recovered;
+}
+
+void ssm_pool_reclaim_orphans(struct ssm_pool * pool,
+ pid_t pid)
+{
+ size_t sc_idx;
+
+ if (pool == NULL || pid <= 0)
+ return;
+
+ for (sc_idx = 0; sc_idx < SSM_POOL_MAX_CLASSES; sc_idx++) {
+ struct _ssm_size_class * sc;
+ struct _ssm_shard * shard;
+
+ sc = &pool->hdr->size_classes[sc_idx];
+ if (sc->object_count == 0)
+ continue;
+
+ /* Reclaim to shard 0 for simplicity */
+ shard = &sc->shards[0];
+ robust_mutex_lock(&shard->mtx);
+ reclaim_pid_from_sc(sc, shard, pool->pool_base, pid);
+ pthread_mutex_unlock(&shard->mtx);
+ }
+}
+
+static __inline__
+struct ssm_pk_buff * try_alloc_from_shard(struct _ssm_shard * shard,
+ void * base)
+{
+ struct ssm_pk_buff * blk;
+
+ robust_mutex_lock(&shard->mtx);
+
+ if (LOAD(&shard->free_count) > 0) {
+ blk = list_remove_head(&shard->free_list, base);
+ if (blk != NULL) {
+ FETCH_SUB(&shard->free_count, 1);
+ return blk; /* Caller must unlock */
+ }
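+ /* Count said non-empty but the list ended at the 0 sentinel: resync. */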
+ FETCH_SUB(&shard->free_count, 1);
+ }
+
+ pthread_mutex_unlock(&shard->mtx);
+ return NULL;
+}
+
+static __inline__ ssize_t init_block(struct ssm_pool * pool,
+ struct _ssm_size_class * sc,
+ struct _ssm_shard * shard,
+ struct ssm_pk_buff * blk,
+ size_t len,
+ uint8_t ** ptr,
+ struct ssm_pk_buff ** spb)
+{
+ STORE(&blk->refcount, 1);
+ blk->allocator_pid = getpid();
+ blk->size = (uint32_t) (sc->object_size -
+ sizeof(struct ssm_pk_buff));
+ blk->pk_head = SSM_PK_BUFF_HEADSPACE;
+ blk->pk_tail = blk->pk_head + (uint32_t) len;
+ blk->off = (uint32_t) PTR_TO_OFFSET(pool->pool_base, blk);
+
+ pthread_mutex_unlock(&shard->mtx);
+
+ *spb = blk;
+ if (ptr != NULL)
+ *ptr = blk->data + blk->pk_head;
+
+ return blk->off;
+}
+
+/* Non-blocking allocation from size class */
+static ssize_t alloc_from_sc(struct ssm_pool * pool,
+ int idx,
+ size_t len,
+ uint8_t ** ptr,
+ struct ssm_pk_buff ** spb)
+{
+ struct _ssm_size_class * sc;
+ struct ssm_pk_buff * blk;
+ int local;
+ int s;
+
+ assert(pool != NULL);
+ assert(idx >= 0 && idx < SSM_POOL_MAX_CLASSES);
+ assert(spb != NULL);
+
+ sc = &pool->hdr->size_classes[idx];
+ local = GET_SHARD_FOR_PID(getpid());
+
+ for (s = 0; s < SSM_POOL_SHARDS; s++) {
+ struct _ssm_shard * shard;
+ int s_idx;
+
+ /* Local shard first, then steal from the others */
+ s_idx = (local + s) % SSM_POOL_SHARDS;
+ shard = &sc->shards[s_idx];
+
+ blk = try_alloc_from_shard(shard, pool->pool_base);
+ if (blk != NULL)
+ return init_block(pool, sc, shard, blk, len, ptr, spb);
+ }
+
+ return -EAGAIN;
+}
+
+/* Blocking allocation from size class */
+static ssize_t alloc_from_sc_b(struct ssm_pool * pool,
+ int idx,
+ size_t len,
+ uint8_t ** ptr,
+ struct ssm_pk_buff ** spb,
+ const struct timespec * abstime)
+{
+ struct _ssm_size_class * sc;
+ struct _ssm_shard * shard;
+ struct ssm_pk_buff * blk = NULL;
+ int local;
+ int s;
+ int ret = 0;
+
+ assert(pool != NULL);
+ assert(idx >= 0 && idx < SSM_POOL_MAX_CLASSES);
+ assert(spb != NULL);
+
+ sc = &pool->hdr->size_classes[idx];
+ local = GET_SHARD_FOR_PID(getpid());
+
+ while (blk == NULL && ret != ETIMEDOUT) {
+ /* Try non-blocking allocation from any shard */
+ for (s = 0; s < SSM_POOL_SHARDS && blk == NULL; s++) {
+ shard = &sc->shards[(local + s) % SSM_POOL_SHARDS];
+ blk = try_alloc_from_shard(shard, pool->pool_base);
+ }
+
+ if (blk != NULL)
+ break;
+
+ /* Nothing available, wait for signal */
+ shard = &sc->shards[local];
+ robust_mutex_lock(&shard->mtx);
+ ret = robust_wait(&shard->cond, &shard->mtx, abstime);
+ pthread_mutex_unlock(&shard->mtx);
+ }
+
+ if (ret == ETIMEDOUT)
+ return -ETIMEDOUT;
+
+ return init_block(pool, sc, shard, blk, len, ptr, spb);
+}
+
+/* Global Shared Packet Pool */
+static char * gspp_filename(void)
+{
+ char * str;
+ char * test_suffix;
+
+ test_suffix = getenv("OUROBOROS_TEST_POOL_SUFFIX");
+ if (test_suffix != NULL) {
+ str = malloc(strlen(SSM_POOL_NAME) + strlen(test_suffix) + 1);
+ if (str == NULL)
+ return NULL;
+ sprintf(str, "%s%s", SSM_POOL_NAME, test_suffix);
+ } else {
+ str = malloc(strlen(SSM_POOL_NAME) + 1);
+ if (str == NULL)
+ return NULL;
+ sprintf(str, "%s", SSM_POOL_NAME);
+ }
+
+ return str;
+}
+
+void ssm_pool_close(struct ssm_pool * pool)
+{
+ assert(pool != NULL);
+
+ munmap(pool->shm_base, SSM_FILE_SIZE);
+ free(pool);
+}
+
+void ssm_pool_destroy(struct ssm_pool * pool)
+{
+ char * fn;
+
+ assert(pool != NULL);
+
+ /* Not the owner while the owner still lives: unmap, don't unlink */
+ if (getpid() != pool->hdr->pid && kill(pool->hdr->pid, 0) == 0) {
+ ssm_pool_close(pool);
+ return;
+ }
+
+ ssm_pool_close(pool);
+
+ fn = gspp_filename();
+ if (fn == NULL)
+ return;
+
+ shm_unlink(fn);
+ free(fn);
+}
+
+#define MM_FLAGS (PROT_READ | PROT_WRITE)
+
+static struct ssm_pool * pool_create(int flags)
+{
+ struct ssm_pool * pool;
+ int fd;
+ uint8_t * shm_base;
+ char * fn;
+
+ fn = gspp_filename();
+ if (fn == NULL)
+ goto fail_fn;
+
+ pool = malloc(sizeof *pool);
+ if (pool == NULL)
+ goto fail_pool;
+
+ fd = shm_open(fn, flags, 0666);
+ if (fd == -1)
+ goto fail_open;
+
+ if ((flags & O_CREAT) && ftruncate(fd, SSM_FILE_SIZE) < 0)
+ goto fail_truncate;
+
+ shm_base = mmap(NULL, SSM_FILE_SIZE, MM_FLAGS, MAP_SHARED, fd, 0);
+ if (shm_base == MAP_FAILED)
+ goto fail_truncate;
+
+ pool->shm_base = shm_base;
+ pool->pool_base = shm_base;
+ pool->hdr = (struct _ssm_pool_hdr *) (shm_base + SSM_POOL_TOTAL_SIZE);
+
+ if (flags & O_CREAT)
+ pool->hdr->mapped_addr = shm_base;
+
+ close(fd);
+
+ free(fn);
+
+ return pool;
+
+ fail_truncate:
+ close(fd);
+ if (flags & O_CREAT)
+ shm_unlink(fn);
+ fail_open:
+ free(pool);
+ fail_pool:
+ free(fn);
+ fail_fn:
+ return NULL;
+}
+
+struct ssm_pool * ssm_pool_create(void)
+{
+ struct ssm_pool * pool;
+ mode_t mask;
+ pthread_mutexattr_t mattr;
+ pthread_condattr_t cattr;
+
+ mask = umask(0);
+
+ pool = pool_create(O_CREAT | O_EXCL | O_RDWR);
+
+ umask(mask);
+
+ if (pool == NULL)
+ goto fail_pool;
+
+ if (pthread_mutexattr_init(&mattr))
+ goto fail_mattr;
+
+ pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED);
+#ifdef HAVE_ROBUST_MUTEX
+ pthread_mutexattr_setrobust(&mattr, PTHREAD_MUTEX_ROBUST);
+#endif
+ if (pthread_mutex_init(&pool->hdr->mtx, &mattr))
+ goto fail_mutex;
+
+ if (pthread_condattr_init(&cattr))
+ goto fail_cattr;
+
+ pthread_condattr_setpshared(&cattr, PTHREAD_PROCESS_SHARED);
+#ifndef __APPLE__
+ pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK);
+#endif
+ if (pthread_cond_init(&pool->hdr->healthy, &cattr))
+ goto fail_healthy;
+
+ pool->hdr->pid = getpid();
+ /* Will be set by init_size_classes */
+ STORE(&pool->hdr->initialized, 0);
+
+ init_size_classes(pool);
+
+ pthread_mutexattr_destroy(&mattr);
+ pthread_condattr_destroy(&cattr);
+
+ return pool;
+
+ fail_healthy:
+ pthread_condattr_destroy(&cattr);
+ fail_cattr:
+ pthread_mutex_destroy(&pool->hdr->mtx);
+ fail_mutex:
+ pthread_mutexattr_destroy(&mattr);
+ fail_mattr:
+ ssm_pool_destroy(pool);
+ fail_pool:
+ return NULL;
+}
+
+struct ssm_pool * ssm_pool_open(void)
+{
+ struct ssm_pool * pool;
+
+ pool = pool_create(O_RDWR);
+ if (pool != NULL)
+ init_size_classes(pool);
+
+ return pool;
+}
+
+void ssm_pool_purge(void)
+{
+ char * fn;
+
+ fn = gspp_filename();
+ if (fn == NULL)
+ return;
+
+ shm_unlink(fn);
+ free(fn);
+}
+
+int ssm_pool_mlock(struct ssm_pool * pool)
+{
+ assert(pool != NULL);
+
+ return mlock(pool->shm_base, SSM_FILE_SIZE);
+}
+
+ssize_t ssm_pool_alloc(struct ssm_pool * pool,
+ size_t count,
+ uint8_t ** ptr,
+ struct ssm_pk_buff ** spb)
+{
+ int idx;
+
+ assert(pool != NULL);
+ assert(spb != NULL);
+
+ idx = select_size_class(count);
+ if (idx >= 0)
+ return alloc_from_sc(pool, idx, count, ptr, spb);
+
+ return -EMSGSIZE;
+}
+
+ssize_t ssm_pool_alloc_b(struct ssm_pool * pool,
+ size_t count,
+ uint8_t ** ptr,
+ struct ssm_pk_buff ** spb,
+ const struct timespec * abstime)
+{
+ int idx;
+
+ assert(pool != NULL);
+ assert(spb != NULL);
+
+ idx = select_size_class(count);
+ if (idx >= 0)
+ return alloc_from_sc_b(pool, idx, count, ptr, spb, abstime);
+
+ return -EMSGSIZE;
+}
+
+ssize_t ssm_pool_read(uint8_t ** dst,
+ struct ssm_pool * pool,
+ size_t off)
+{
+ struct ssm_pk_buff * blk;
+
+ assert(dst != NULL);
+ assert(pool != NULL);
+
+ if (off == 0 || off >= (size_t) SSM_POOL_TOTAL_SIZE)
+ return -EINVAL;
+
+ blk = OFFSET_TO_PTR(pool->pool_base, off);
+
+ *dst = blk->data + blk->pk_head;
+
+ return (ssize_t) (blk->pk_tail - blk->pk_head);
+}
+
+struct ssm_pk_buff * ssm_pool_get(struct ssm_pool * pool,
+ size_t off)
+{
+ struct ssm_pk_buff * blk;
+
+ assert(pool != NULL);
+
+ if (off == 0 || off >= (size_t) SSM_POOL_TOTAL_SIZE)
+ return NULL;
+
+ blk = OFFSET_TO_PTR(pool->pool_base, off);
+ if (blk == NULL)
+ return NULL;
+
+ if (LOAD(&blk->refcount) == 0)
+ return NULL;
+
+ return blk;
+}
+
+int ssm_pool_remove(struct ssm_pool * pool,
+ size_t off)
+{
+ struct ssm_pk_buff * blk;
+ struct _ssm_size_class * sc;
+ struct _ssm_shard * shard;
+ int sc_idx;
+ int shard_idx;
+ uint16_t old_ref;
+
+ assert(pool != NULL);
+
+ if (off == 0 || off >= SSM_POOL_TOTAL_SIZE)
+ return -EINVAL;
+
+ blk = OFFSET_TO_PTR(pool->pool_base, off);
+ if (blk == NULL)
+ return -EINVAL;
+
+ sc_idx = find_size_class_for_offset(pool, off);
+ if (sc_idx < 0)
+ return -EINVAL;
+
+ sc = &pool->hdr->size_classes[sc_idx];
+
+ /* Free to allocator's shard (lazy distribution in action) */
+ shard_idx = GET_SHARD_FOR_PID(blk->allocator_pid);
+ shard = &sc->shards[shard_idx];
+
+ robust_mutex_lock(&shard->mtx);
+
+ old_ref = FETCH_SUB(&blk->refcount, 1);
+#ifdef CONFIG_OUROBOROS_DEBUG
+ if (old_ref == 0) {
+ /* Underflow - double free attempt */
+ pthread_mutex_unlock(&shard->mtx);
+ abort();
+ }
+#endif
+ if (old_ref > 1) {
+ /* Still referenced */
+ pthread_mutex_unlock(&shard->mtx);
+ return 0;
+ }
+
+ blk->allocator_pid = 0;
+#ifdef CONFIG_OUROBOROS_DEBUG
+ /* Poison fields to detect use-after-free */
+ blk->pk_head = 0xDEAD;
+ blk->pk_tail = 0xBEEF;
+#endif
+ list_add_head(&shard->free_list, blk, pool->pool_base);
+ FETCH_ADD(&shard->free_count, 1);
+
+ pthread_cond_signal(&shard->cond);
+
+ pthread_mutex_unlock(&shard->mtx);
+
+ return 0;
+}
+
+size_t ssm_pk_buff_get_idx(struct ssm_pk_buff * spb)
+{
+ assert(spb != NULL);
+
+ return spb->off;
+}
+
+uint8_t * ssm_pk_buff_head(struct ssm_pk_buff * spb)
+{
+ assert(spb != NULL);
+
+ return spb->data + spb->pk_head;
+}
+
+uint8_t * ssm_pk_buff_tail(struct ssm_pk_buff * spb)
+{
+ assert(spb != NULL);
+
+ return spb->data + spb->pk_tail;
+}
+
+size_t ssm_pk_buff_len(struct ssm_pk_buff * spb)
+{
+ assert(spb != NULL);
+
+ return spb->pk_tail - spb->pk_head;
+}
+
+uint8_t * ssm_pk_buff_head_alloc(struct ssm_pk_buff * spb,
+ size_t size)
+{
+ assert(spb != NULL);
+
+ if (spb->pk_head < size)
+ return NULL;
+
+ spb->pk_head -= size;
+
+ return spb->data + spb->pk_head;
+}
+
+uint8_t * ssm_pk_buff_tail_alloc(struct ssm_pk_buff * spb,
+ size_t size)
+{
+ uint8_t * buf;
+
+ assert(spb != NULL);
+
+ if (spb->pk_tail + size > spb->size)
+ return NULL;
+
+ buf = spb->data + spb->pk_tail;
+
+ spb->pk_tail += size;
+
+ return buf;
+}
+
+uint8_t * ssm_pk_buff_head_release(struct ssm_pk_buff * spb,
+ size_t size)
+{
+ uint8_t * buf;
+
+ assert(spb != NULL);
+ assert(!(size > spb->pk_tail - spb->pk_head));
+
+ buf = spb->data + spb->pk_head;
+
+ spb->pk_head += size;
+
+ return buf;
+}
+
+uint8_t * ssm_pk_buff_tail_release(struct ssm_pk_buff * spb,
+ size_t size)
+{
+ assert(spb != NULL);
+ assert(!(size > spb->pk_tail - spb->pk_head));
+
+ spb->pk_tail -= size;
+
+ return spb->data + spb->pk_tail;
+}
+
+void ssm_pk_buff_truncate(struct ssm_pk_buff * spb,
+ size_t len)
+{
+ assert(spb != NULL);
+ assert(len <= spb->size);
+
+ spb->pk_tail = spb->pk_head + len;
+}
+
+int ssm_pk_buff_wait_ack(struct ssm_pk_buff * spb)
+{
+ assert(spb != NULL);
+
+ FETCH_ADD(&spb->refcount, 1);
+
+ return 0;
+}
+
+int ssm_pk_buff_ack(struct ssm_pk_buff * spb)
+{
+ assert(spb != NULL);
+
+ FETCH_SUB(&spb->refcount, 1);
+
+ return 0;
+}
diff --git a/src/lib/ssm/rbuff.c b/src/lib/ssm/rbuff.c
new file mode 100644
index 00000000..e18f8ba7
--- /dev/null
+++ b/src/lib/ssm/rbuff.c
@@ -0,0 +1,449 @@
+/*
+ * Ouroboros - Copyright (C) 2016 - 2026
+ *
+ * Ring buffer implementations for incoming packets
+ *
+ * Dimitri Staessens <dimitri@ouroboros.rocks>
+ * Sander Vrijders <sander@ouroboros.rocks>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public License
+ * version 2.1 as published by the Free Software Foundation.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., http://www.fsf.org/about/contact/.
+ */
+
+#define _POSIX_C_SOURCE 200809L
+
+#include "config.h"
+#include "ssm.h"
+
+#include <ouroboros/ssm_rbuff.h>
+#include <ouroboros/lockfile.h>
+#include <ouroboros/errno.h>
+#include <ouroboros/fccntl.h>
+#include <ouroboros/pthread.h>
+#include <ouroboros/time.h>
+
+#include <assert.h>
+#include <fcntl.h>
+#include <signal.h>
+#include <stdbool.h>
+#include <stdint.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+#include <sys/mman.h>
+#include <sys/stat.h>
+
+#define FN_MAX_CHARS 255
+
+#define SSM_RBUFF_FILESIZE ((SSM_RBUFF_SIZE) * sizeof(ssize_t) \
+ + 3 * sizeof(size_t) \
+ + sizeof(pthread_mutex_t) \
+ + 2 * sizeof(pthread_cond_t))
+
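+/* Index arithmetic below assumes SSM_RBUFF_SIZE is a power of two. */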
+#define MODB(x) ((x) & (SSM_RBUFF_SIZE - 1))
+
+#define LOAD_RELAXED(ptr) (__atomic_load_n(ptr, __ATOMIC_RELAXED))
+#define LOAD_ACQUIRE(ptr) (__atomic_load_n(ptr, __ATOMIC_ACQUIRE))
+#define STORE_RELEASE(ptr, val) \
+ (__atomic_store_n(ptr, val, __ATOMIC_RELEASE))
+
+#define HEAD(rb) (rb->shm_base[LOAD_RELAXED(rb->head)])
+#define TAIL(rb) (rb->shm_base[LOAD_RELAXED(rb->tail)])
+#define HEAD_IDX(rb) (LOAD_ACQUIRE(rb->head))
+#define TAIL_IDX(rb) (LOAD_ACQUIRE(rb->tail))
+#define ADVANCE_HEAD(rb) \
+ (STORE_RELEASE(rb->head, MODB(LOAD_RELAXED(rb->head) + 1)))
+#define ADVANCE_TAIL(rb) \
+ (STORE_RELEASE(rb->tail, MODB(LOAD_RELAXED(rb->tail) + 1)))
+#define QUEUED(rb) (MODB(HEAD_IDX(rb) - TAIL_IDX(rb)))
+#define IS_FULL(rb) (QUEUED(rb) == (SSM_RBUFF_SIZE - 1))
+#define IS_EMPTY(rb) (HEAD_IDX(rb) == TAIL_IDX(rb))
+
+struct ssm_rbuff {
+ ssize_t * shm_base; /* start of shared memory */
+ size_t * head; /* start of ringbuffer */
+ size_t * tail;
+ size_t * acl; /* access control */
+ pthread_mutex_t * mtx; /* guards indices and cond vars */
+ pthread_cond_t * add; /* signal when new data */
+ pthread_cond_t * del; /* signal when data removed */
+ pid_t pid; /* pid of the owner */
+ int flow_id; /* flow_id of the flow */
+};
+
+#define MM_FLAGS (PROT_READ | PROT_WRITE)
+
+static struct ssm_rbuff * rbuff_create(pid_t pid,
+ int flow_id,
+ int flags)
+{
+ struct ssm_rbuff * rb;
+ int fd;
+ ssize_t * shm_base;
+ char fn[FN_MAX_CHARS];
+
+ sprintf(fn, SSM_RBUFF_PREFIX "%d.%d", pid, flow_id);
+
+ rb = malloc(sizeof(*rb));
+ if (rb == NULL)
+ goto fail_malloc;
+
+ fd = shm_open(fn, flags, 0666);
+ if (fd == -1)
+ goto fail_open;
+
+ if ((flags & O_CREAT) && ftruncate(fd, SSM_RBUFF_FILESIZE) < 0)
+ goto fail_truncate;
+
+ shm_base = mmap(NULL, SSM_RBUFF_FILESIZE, MM_FLAGS, MAP_SHARED, fd, 0);
+ if (shm_base == MAP_FAILED)
+ goto fail_truncate;
+
+ close(fd);
+
+ rb->shm_base = shm_base;
+ rb->head = (size_t *) (rb->shm_base + (SSM_RBUFF_SIZE));
+ rb->tail = (size_t *) (rb->head + 1);
+ rb->acl = (size_t *) (rb->tail + 1);
+ rb->mtx = (pthread_mutex_t *) (rb->acl + 1);
+ rb->add = (pthread_cond_t *) (rb->mtx + 1);
+ rb->del = rb->add + 1;
+ rb->pid = pid;
+ rb->flow_id = flow_id;
+
+ return rb;
+
+ fail_truncate:
+ close(fd);
+ if (flags & O_CREAT)
+ shm_unlink(fn);
+ fail_open:
+ free(rb);
+ fail_malloc:
+ return NULL;
+}
+
+static void rbuff_destroy(struct ssm_rbuff * rb)
+{
+ munmap(rb->shm_base, SSM_RBUFF_FILESIZE);
+
+ free(rb);
+}
+
+struct ssm_rbuff * ssm_rbuff_create(pid_t pid,
+ int flow_id)
+{
+ struct ssm_rbuff * rb;
+ pthread_mutexattr_t mattr;
+ pthread_condattr_t cattr;
+ mode_t mask;
+
+ mask = umask(0);
+
+ rb = rbuff_create(pid, flow_id, O_CREAT | O_EXCL | O_RDWR);
+
+ umask(mask);
+
+ if (rb == NULL)
+ goto fail_rb;
+
+ if (pthread_mutexattr_init(&mattr))
+ goto fail_mattr;
+
+ pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED);
+#ifdef HAVE_ROBUST_MUTEX
+ pthread_mutexattr_setrobust(&mattr, PTHREAD_MUTEX_ROBUST);
+#endif
+ if (pthread_mutex_init(rb->mtx, &mattr))
+ goto fail_mutex;
+
+ if (pthread_condattr_init(&cattr))
+ goto fail_cattr;
+
+ pthread_condattr_setpshared(&cattr, PTHREAD_PROCESS_SHARED);
+#ifndef __APPLE__
+ pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK);
+#endif
+ if (pthread_cond_init(rb->add, &cattr))
+ goto fail_add;
+
+ if (pthread_cond_init(rb->del, &cattr))
+ goto fail_del;
+
+ *rb->acl = ACL_RDWR;
+ *rb->head = 0;
+ *rb->tail = 0;
+
+ rb->pid = pid;
+ rb->flow_id = flow_id;
+
+ pthread_mutexattr_destroy(&mattr);
+ pthread_condattr_destroy(&cattr);
+
+ return rb;
+
+ fail_del:
+ pthread_cond_destroy(rb->add);
+ fail_add:
+ pthread_condattr_destroy(&cattr);
+ fail_cattr:
+ pthread_mutex_destroy(rb->mtx);
+ fail_mutex:
+ pthread_mutexattr_destroy(&mattr);
+ fail_mattr:
+ ssm_rbuff_destroy(rb);
+ fail_rb:
+ return NULL;
+}
+
+void ssm_rbuff_destroy(struct ssm_rbuff * rb)
+{
+ char fn[FN_MAX_CHARS];
+
+ assert(rb != NULL);
+
+ sprintf(fn, SSM_RBUFF_PREFIX "%d.%d", rb->pid, rb->flow_id);
+
+ ssm_rbuff_close(rb);
+
+ shm_unlink(fn);
+}
+
+struct ssm_rbuff * ssm_rbuff_open(pid_t pid,
+ int flow_id)
+{
+ return rbuff_create(pid, flow_id, O_RDWR);
+}
+
+void ssm_rbuff_close(struct ssm_rbuff * rb)
+{
+ assert(rb);
+
+ rbuff_destroy(rb);
+}
+
+int ssm_rbuff_write(struct ssm_rbuff * rb,
+ size_t idx)
+{
+ size_t acl;
+ bool was_empty;
+ int ret = 0;
+
+ assert(rb != NULL);
+
+ acl = __atomic_load_n(rb->acl, __ATOMIC_SEQ_CST);
+ if (acl != ACL_RDWR) {
+ if (acl & ACL_FLOWDOWN) {
+ ret = -EFLOWDOWN;
+ goto fail_acl;
+ }
+ if (acl & ACL_RDONLY) {
+ ret = -ENOTALLOC;
+ goto fail_acl;
+ }
+ }
+
+ robust_mutex_lock(rb->mtx);
+
+ if (IS_FULL(rb)) {
+ ret = -EAGAIN;
+ goto fail_mutex;
+ }
+
+ was_empty = IS_EMPTY(rb);
+
+ HEAD(rb) = (ssize_t) idx;
+ ADVANCE_HEAD(rb);
+
+ if (was_empty)
+ pthread_cond_broadcast(rb->add);
+
+ pthread_mutex_unlock(rb->mtx);
+
+ return 0;
+
+ fail_mutex:
+ pthread_mutex_unlock(rb->mtx);
+ fail_acl:
+ return ret;
+}
+
+int ssm_rbuff_write_b(struct ssm_rbuff * rb,
+ size_t idx,
+ const struct timespec * abstime)
+{
+ size_t acl;
+ int ret = 0;
+ bool was_empty;
+
+ assert(rb != NULL);
+
+ acl = __atomic_load_n(rb->acl, __ATOMIC_SEQ_CST);
+ if (acl != ACL_RDWR) {
+ if (acl & ACL_FLOWDOWN) {
+ ret = -EFLOWDOWN;
+ goto fail_acl;
+ }
+ if (acl & ACL_RDONLY) {
+ ret = -ENOTALLOC;
+ goto fail_acl;
+ }
+ }
+
+ robust_mutex_lock(rb->mtx);
+
+ pthread_cleanup_push(__cleanup_mutex_unlock, rb->mtx);
+
+ while (IS_FULL(rb) && ret != -ETIMEDOUT) {
+ acl = __atomic_load_n(rb->acl, __ATOMIC_SEQ_CST);
+ if (acl & ACL_FLOWDOWN) {
+ ret = -EFLOWDOWN;
+ break;
+ }
+ ret = -robust_wait(rb->del, rb->mtx, abstime);
+ }
+
+ pthread_cleanup_pop(false);
+
+ if (ret != -ETIMEDOUT && ret != -EFLOWDOWN) {
+ was_empty = IS_EMPTY(rb);
+ HEAD(rb) = (ssize_t) idx;
+ ADVANCE_HEAD(rb);
+ if (was_empty)
+ pthread_cond_broadcast(rb->add);
+ }
+
+ pthread_mutex_unlock(rb->mtx);
+
+ fail_acl:
+ return ret;
+}
+
+static int check_rb_acl(struct ssm_rbuff * rb)
+{
+ size_t acl;
+
+ assert(rb != NULL);
+
+ acl = __atomic_load_n(rb->acl, __ATOMIC_SEQ_CST);
+
+ if (acl & ACL_FLOWDOWN)
+ return -EFLOWDOWN;
+
+ if (acl & ACL_FLOWPEER)
+ return -EFLOWPEER;
+
+ return -EAGAIN;
+}
+
+ssize_t ssm_rbuff_read(struct ssm_rbuff * rb)
+{
+ ssize_t ret;
+
+ assert(rb != NULL);
+
+ if (IS_EMPTY(rb))
+ return check_rb_acl(rb);
+
+ robust_mutex_lock(rb->mtx);
+
+ ret = TAIL(rb);
+ ADVANCE_TAIL(rb);
+
+ pthread_cond_broadcast(rb->del);
+
+ pthread_mutex_unlock(rb->mtx);
+
+ return ret;
+}
+
+ssize_t ssm_rbuff_read_b(struct ssm_rbuff * rb,
+ const struct timespec * abstime)
+{
+ ssize_t idx = -1;
+ size_t acl;
+
+ assert(rb != NULL);
+
+ acl = __atomic_load_n(rb->acl, __ATOMIC_SEQ_CST);
+ if (IS_EMPTY(rb) && (acl & ACL_FLOWDOWN))
+ return -EFLOWDOWN;
+
+ robust_mutex_lock(rb->mtx);
+
+ pthread_cleanup_push(__cleanup_mutex_unlock, rb->mtx);
+
+ while (IS_EMPTY(rb) &&
+ idx != -ETIMEDOUT &&
+ check_rb_acl(rb) == -EAGAIN) {
+ idx = -robust_wait(rb->add, rb->mtx, abstime);
+ }
+
+ pthread_cleanup_pop(false);
+
+ if (!IS_EMPTY(rb)) {
+ idx = TAIL(rb);
+ ADVANCE_TAIL(rb);
+ pthread_cond_broadcast(rb->del);
+ } else if (idx != -ETIMEDOUT) {
+ idx = check_rb_acl(rb);
+ }
+
+ pthread_mutex_unlock(rb->mtx);
+
+ assert(idx != -EAGAIN);
+
+ return idx;
+}
+
+void ssm_rbuff_set_acl(struct ssm_rbuff * rb,
+ uint32_t flags)
+{
+ assert(rb != NULL);
+
+ __atomic_store_n(rb->acl, (size_t) flags, __ATOMIC_SEQ_CST);
+}
+
+uint32_t ssm_rbuff_get_acl(struct ssm_rbuff * rb)
+{
+ assert(rb != NULL);
+
+ return (uint32_t) __atomic_load_n(rb->acl, __ATOMIC_SEQ_CST);
+}
+
+void ssm_rbuff_fini(struct ssm_rbuff * rb)
+{
+ assert(rb != NULL);
+
+ robust_mutex_lock(rb->mtx);
+
+ pthread_cleanup_push(__cleanup_mutex_unlock, rb->mtx);
+
+ while (!IS_EMPTY(rb))
+ robust_wait(rb->del, rb->mtx, NULL);
+
+ pthread_cleanup_pop(true);
+}
+
+size_t ssm_rbuff_queued(struct ssm_rbuff * rb)
+{
+ assert(rb != NULL);
+
+ return QUEUED(rb);
+}
+
+int ssm_rbuff_mlock(struct ssm_rbuff * rb)
+{
+ assert(rb != NULL);
+
+ return mlock(rb->shm_base, SSM_RBUFF_FILESIZE);
+}
diff --git a/src/lib/ssm/ssm.h.in b/src/lib/ssm/ssm.h.in
new file mode 100644
index 00000000..d14cd49c
--- /dev/null
+++ b/src/lib/ssm/ssm.h.in
@@ -0,0 +1,146 @@
+/*
+ * Ouroboros - Copyright (C) 2016 - 2026
+ *
+ * Secure Shared Memory configuration
+ *
+ * Dimitri Staessens <dimitri@ouroboros.rocks>
+ * Sander Vrijders <sander@ouroboros.rocks>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public License
+ * version 2.1 as published by the Free Software Foundation.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., http://www.fsf.org/about/contact/.
+ */
+
+#ifndef OUROBOROS_LIB_SSM_H
+#define OUROBOROS_LIB_SSM_H
+
+#include <stddef.h>
+#include <stdint.h>
+#include <stdatomic.h>
+#include <sys/types.h>
+
+/* Pool naming configuration */
+#define SSM_PREFIX "@SSM_PREFIX@"
+#define SSM_GSMP_SUFFIX "@SSM_GSMP_SUFFIX@"
+#define SSM_PPP_SUFFIX "@SSM_PPP_SUFFIX@"
+
+/* Legacy SSM constants */
+#define SSM_RBUFF_PREFIX "@SSM_RBUFF_PREFIX@"
+#define SSM_FLOW_SET_PREFIX "@SSM_FLOW_SET_PREFIX@"
+#define SSM_POOL_NAME "@SSM_POOL_NAME@"
+#define SSM_POOL_BLOCKS @SSM_POOL_BLOCKS@
+#define SSM_RBUFF_SIZE @SSM_RBUFF_SIZE@
+
+/* Packet buffer space reservation */
+#define SSM_PK_BUFF_HEADSPACE @SSM_PK_BUFF_HEADSPACE@
+#define SSM_PK_BUFF_TAILSPACE @SSM_PK_BUFF_TAILSPACE@
+
+/* Pool blocks per size class */
+#define SSM_POOL_256_BLOCKS @SSM_POOL_256_BLOCKS@
+#define SSM_POOL_512_BLOCKS @SSM_POOL_512_BLOCKS@
+#define SSM_POOL_1K_BLOCKS @SSM_POOL_1K_BLOCKS@
+#define SSM_POOL_2K_BLOCKS @SSM_POOL_2K_BLOCKS@
+#define SSM_POOL_4K_BLOCKS @SSM_POOL_4K_BLOCKS@
+#define SSM_POOL_16K_BLOCKS @SSM_POOL_16K_BLOCKS@
+#define SSM_POOL_64K_BLOCKS @SSM_POOL_64K_BLOCKS@
+#define SSM_POOL_256K_BLOCKS @SSM_POOL_256K_BLOCKS@
+#define SSM_POOL_1M_BLOCKS @SSM_POOL_1M_BLOCKS@
+#define SSM_POOL_TOTAL_SIZE @SSM_POOL_TOTAL_SIZE@
+
+/* Size class configuration */
+#define SSM_POOL_MAX_CLASSES 9
+#define SSM_POOL_SHARDS @SSM_POOL_SHARDS@
+
+/* Internal structures - exposed for testing */
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include <errno.h>
+#include <pthread.h>
+
+#include <ouroboros/pthread.h>
+
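+/* Lock a process-shared mutex, recovering it if the previous owner died. */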
+static __inline__ void robust_mutex_lock(pthread_mutex_t * mtx)
+{
+#ifndef HAVE_ROBUST_MUTEX
+ pthread_mutex_lock(mtx);
+#else
+ if (pthread_mutex_lock(mtx) == EOWNERDEAD)
+ pthread_mutex_consistent(mtx);
+#endif
+}
+
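+/* Condvar wait that restores mutex consistency after an owner death. */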
+static __inline__ int robust_wait(pthread_cond_t * cond,
+ pthread_mutex_t * mtx,
+ const struct timespec * abstime)
+{
+ int ret = __timedwait(cond, mtx, abstime);
+#ifdef HAVE_ROBUST_MUTEX
+ if (ret == EOWNERDEAD)
+ pthread_mutex_consistent(mtx);
+#endif
+ return ret;
+}
+
+/* Packet buffer structure used by pool, rbuff, and tests */
+struct ssm_pk_buff {
+ uint32_t next_offset; /* List linkage (pool < 4GB) */
+ uint16_t refcount; /* Reference count (app + rtx) */
+ pid_t allocator_pid; /* For orphan detection */
+ uint32_t size; /* Block size (max 1MB) */
+ uint32_t pk_head; /* Head offset into data */
+ uint32_t pk_tail; /* Tail offset into data */
+ uint32_t off; /* Block offset in pool */
+ uint8_t data[]; /* Packet data */
+};
+
+/* Size class configuration table */
+struct ssm_size_class_cfg {
+ size_t size;
+ size_t blocks;
+};
+
+struct _ssm_list_head {
+ uint32_t head_offset;
+ uint32_t count;
+};
+
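+/* Per-shard free list, guarded by its own process-shared robust mutex. */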
+struct _ssm_shard {
+ pthread_mutex_t mtx;
+ pthread_cond_t cond;
+ struct _ssm_list_head free_list;
+ size_t free_count;
+};
+
+struct _ssm_size_class {
+ struct _ssm_shard shards[SSM_POOL_SHARDS];
+ size_t object_size;
+ size_t pool_start;
+ size_t pool_size;
+ size_t object_count;
+};
+
+struct _ssm_pool_hdr {
+ pthread_mutex_t mtx;
+ pthread_cond_t healthy;
+ pid_t pid;
+ uint32_t initialized;
+ void * mapped_addr;
+ struct _ssm_size_class size_classes[SSM_POOL_MAX_CLASSES];
+};
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* OUROBOROS_LIB_SSM_H */
diff --git a/src/lib/ssm/tests/CMakeLists.txt b/src/lib/ssm/tests/CMakeLists.txt
new file mode 100644
index 00000000..827f8bf8
--- /dev/null
+++ b/src/lib/ssm/tests/CMakeLists.txt
@@ -0,0 +1,33 @@
+get_filename_component(PARENT_PATH ${CMAKE_CURRENT_SOURCE_DIR} DIRECTORY)
+get_filename_component(PARENT_DIR ${PARENT_PATH} NAME)
+
+compute_test_prefix()
+
+create_test_sourcelist(${PARENT_DIR}_tests test_suite.c
+ # Add new tests here
+ pool_test.c
+ pool_sharding_test.c
+ rbuff_test.c
+ flow_set_test.c
+ )
+
+add_executable(${PARENT_DIR}_test ${${PARENT_DIR}_tests})
+
+disable_test_logging_for_target(${PARENT_DIR}_test)
+target_link_libraries(${PARENT_DIR}_test ouroboros-common)
+
+add_dependencies(build_tests ${PARENT_DIR}_test)
+
+set(tests_to_run ${${PARENT_DIR}_tests})
+if(CMAKE_VERSION VERSION_LESS "3.29.0")
+ remove(tests_to_run test_suite.c)
+else ()
+ list(POP_FRONT tests_to_run)
+endif()
+
+foreach (test ${tests_to_run})
+ get_filename_component(test_name ${test} NAME_WE)
+ add_test(${TEST_PREFIX}/${test_name} ${C_TEST_PATH}/${PARENT_DIR}_test ${test_name})
+ set_property(TEST ${TEST_PREFIX}/${test_name}
+ PROPERTY ENVIRONMENT "OUROBOROS_TEST_POOL_SUFFIX=.test")
+endforeach (test)
diff --git a/src/lib/ssm/tests/flow_set_test.c b/src/lib/ssm/tests/flow_set_test.c
new file mode 100644
index 00000000..f9084d3c
--- /dev/null
+++ b/src/lib/ssm/tests/flow_set_test.c
@@ -0,0 +1,255 @@
+/*
+ * Ouroboros - Copyright (C) 2016 - 2026
+ *
+ * Test of the SSM flow set
+ *
+ * Dimitri Staessens <dimitri@ouroboros.rocks>
+ * Sander Vrijders <sander@ouroboros.rocks>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 as
+ * published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., http://www.fsf.org/about/contact/.
+ */
+
+#if defined(__linux__) || defined(__CYGWIN__)
+#define _DEFAULT_SOURCE
+#else
+#define _POSIX_C_SOURCE 200112L
+#endif
+
+#include "config.h"
+#include "ssm.h"
+
+#include <test/test.h>
+#include <ouroboros/ssm_flow_set.h>
+#include <ouroboros/errno.h>
+#include <ouroboros/time.h>
+
+#include <stdio.h>
+#include <unistd.h>
+#include <pthread.h>
+
+static int test_ssm_flow_set_create_destroy(void)
+{
+ struct ssm_flow_set * set;
+ pid_t pid;
+
+ TEST_START();
+
+ pid = getpid();
+
+ set = ssm_flow_set_create(pid);
+ if (set == NULL) {
+ printf("Failed to create flow set.\n");
+ goto fail;
+ }
+
+ ssm_flow_set_destroy(set);
+
+ TEST_SUCCESS();
+ return TEST_RC_SUCCESS;
+fail:
+ TEST_FAIL();
+ return TEST_RC_FAIL;
+}
+
+static int test_ssm_flow_set_add_del_has(void)
+{
+ struct ssm_flow_set * set;
+ pid_t pid;
+ size_t idx = 0;
+ int flow_id = 42;
+
+ TEST_START();
+
+ pid = getpid();
+
+ set = ssm_flow_set_create(pid);
+ if (set == NULL) {
+ printf("Failed to create flow set.\n");
+ goto fail;
+ }
+
+ if (ssm_flow_set_has(set, idx, flow_id)) {
+ printf("Flow should not be in set initially.\n");
+ goto fail_destroy;
+ }
+
+ if (ssm_flow_set_add(set, idx, flow_id) < 0) {
+ printf("Failed to add flow to set.\n");
+ goto fail_destroy;
+ }
+
+ if (!ssm_flow_set_has(set, idx, flow_id)) {
+ printf("Flow should be in set after add.\n");
+ goto fail_destroy;
+ }
+
+ /* Adding same flow again should fail */
+ if (ssm_flow_set_add(set, idx, flow_id) != -EPERM) {
+ printf("Should not be able to add flow twice.\n");
+ goto fail_destroy;
+ }
+
+ ssm_flow_set_del(set, idx, flow_id);
+
+ if (ssm_flow_set_has(set, idx, flow_id)) {
+ printf("Flow should not be in set after delete.\n");
+ goto fail_destroy;
+ }
+
+ ssm_flow_set_destroy(set);
+
+ TEST_SUCCESS();
+ return TEST_RC_SUCCESS;
+fail_destroy:
+ ssm_flow_set_destroy(set);
+fail:
+ TEST_FAIL();
+ return TEST_RC_FAIL;
+}
+
+static int test_ssm_flow_set_zero(void)
+{
+ struct ssm_flow_set * set;
+ pid_t pid;
+ size_t idx = 0;
+ int flow_id1 = 10;
+ int flow_id2 = 20;
+
+ TEST_START();
+
+ pid = getpid();
+
+ set = ssm_flow_set_create(pid);
+ if (set == NULL) {
+ printf("Failed to create flow set.\n");
+ goto fail;
+ }
+
+ if (ssm_flow_set_add(set, idx, flow_id1) < 0) {
+ printf("Failed to add flow1 to set.\n");
+ goto fail_destroy;
+ }
+
+ if (ssm_flow_set_add(set, idx, flow_id2) < 0) {
+ printf("Failed to add flow2 to set.\n");
+ goto fail_destroy;
+ }
+
+ ssm_flow_set_zero(set, idx);
+
+ if (ssm_flow_set_has(set, idx, flow_id1)) {
+ printf("Flow1 should not be in set after zero.\n");
+ goto fail_destroy;
+ }
+
+ if (ssm_flow_set_has(set, idx, flow_id2)) {
+ printf("Flow2 should not be in set after zero.\n");
+ goto fail_destroy;
+ }
+
+ ssm_flow_set_destroy(set);
+
+ TEST_SUCCESS();
+ return TEST_RC_SUCCESS;
+fail_destroy:
+ ssm_flow_set_destroy(set);
+fail:
+ TEST_FAIL();
+ return TEST_RC_FAIL;
+}
+
+static int test_ssm_flow_set_notify_wait(void)
+{
+ struct ssm_flow_set * set;
+ pid_t pid;
+ size_t idx = 0;
+ int flow_id = 100;
+ struct flowevent events[SSM_RBUFF_SIZE];
+ struct timespec timeout;
+ ssize_t ret;
+
+ TEST_START();
+
+ pid = getpid();
+
+ set = ssm_flow_set_create(pid);
+ if (set == NULL) {
+ printf("Failed to create flow set.\n");
+ goto fail;
+ }
+
+ if (ssm_flow_set_add(set, idx, flow_id) < 0) {
+ printf("Failed to add flow to set.\n");
+ goto fail_destroy;
+ }
+
+ /* Test immediate timeout when no events */
+ clock_gettime(PTHREAD_COND_CLOCK, &timeout);
+ ret = ssm_flow_set_wait(set, idx, events, &timeout);
+ if (ret != -ETIMEDOUT) {
+ printf("Wait should timeout immediately when no events.\n");
+ goto fail_destroy;
+ }
+
+ /* Notify an event */
+ ssm_flow_set_notify(set, flow_id, FLOW_PKT);
+
+ /* Should be able to read the event immediately */
+ clock_gettime(PTHREAD_COND_CLOCK, &timeout);
+ ts_add(&timeout, &timeout, &((struct timespec) {1, 0}));
+
+ ret = ssm_flow_set_wait(set, idx, events, &timeout);
+ if (ret != 1) {
+ printf("Wait should return 1 event, got %zd.\n", ret);
+ goto fail_destroy;
+ }
+
+ if (events[0].flow_id != flow_id) {
+ printf("Event flow_id mismatch: expected %d, got %d.\n",
+ flow_id, events[0].flow_id);
+ goto fail_destroy;
+ }
+
+ if (events[0].event != FLOW_PKT) {
+ printf("Event type mismatch: expected %d, got %d.\n",
+ FLOW_PKT, events[0].event);
+ goto fail_destroy;
+ }
+
+ ssm_flow_set_destroy(set);
+
+ TEST_SUCCESS();
+ return TEST_RC_SUCCESS;
+fail_destroy:
+ ssm_flow_set_destroy(set);
+fail:
+ TEST_FAIL();
+ return TEST_RC_FAIL;
+}
+
+int flow_set_test(int argc,
+ char ** argv)
+{
+ int ret = 0;
+
+ (void) argc;
+ (void) argv;
+
+ ret |= test_ssm_flow_set_create_destroy();
+ ret |= test_ssm_flow_set_add_del_has();
+ ret |= test_ssm_flow_set_zero();
+ ret |= test_ssm_flow_set_notify_wait();
+
+ return ret;
+}
diff --git a/src/lib/ssm/tests/pool_sharding_test.c b/src/lib/ssm/tests/pool_sharding_test.c
new file mode 100644
index 00000000..72ae1cb7
--- /dev/null
+++ b/src/lib/ssm/tests/pool_sharding_test.c
@@ -0,0 +1,505 @@
+/*
+ * Ouroboros - Copyright (C) 2016 - 2026
+ *
+ * Test of the SSM pool sharding with fallback
+ *
+ * Dimitri Staessens <dimitri@ouroboros.rocks>
+ * Sander Vrijders <sander@ouroboros.rocks>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 as
+ * published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., http://www.fsf.org/about/contact/.
+ */
+
+#if defined(__linux__) || defined(__CYGWIN__)
+#define _DEFAULT_SOURCE
+#else
+#define _POSIX_C_SOURCE 200112L
+#endif
+
+#include "config.h"
+#include "ssm.h"
+
+#include <test/test.h>
+#include <ouroboros/ssm_pool.h>
+
+#include <errno.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+#include <stdbool.h>
+#include <sys/wait.h>
+#include <sys/types.h>
+#include <signal.h>
+
+#define TEST_SIZE 256
+
+/* Helper to get pool header for inspection */
+static struct _ssm_pool_hdr * get_pool_hdr(struct ssm_pool * pool)
+{
+ /* ssm_pool is opaque, but we know its layout:
+ * uint8_t * shm_base
+ * struct _ssm_pool_hdr * hdr
+ * void * pool_base
+ */
+ struct _ssm_pool_hdr ** hdr_ptr =
+ (struct _ssm_pool_hdr **)((uint8_t *)pool + sizeof(void *));
+ return *hdr_ptr;
+}
+
+static int test_lazy_distribution(void)
+{
+ struct ssm_pool * pool;
+ struct _ssm_pool_hdr * hdr;
+ struct _ssm_size_class * sc;
+ int i;
+ int sc_idx;
+
+ TEST_START();
+
+ ssm_pool_purge();
+
+ pool = ssm_pool_create();
+ if (pool == NULL) {
+ printf("Failed to create pool.\n");
+ goto fail;
+ }
+
+ hdr = get_pool_hdr(pool);
+ if (hdr == NULL) {
+ printf("Failed to get pool header.\n");
+ goto fail_pool;
+ }
+
+ /* Find the first size class with blocks */
+ sc_idx = -1;
+ for (i = 0; i < SSM_POOL_MAX_CLASSES; i++) {
+ if (hdr->size_classes[i].object_count > 0) {
+ sc_idx = i;
+ break;
+ }
+ }
+
+ if (sc_idx < 0) {
+ printf("No size classes configured.\n");
+ for (i = 0; i < SSM_POOL_MAX_CLASSES; i++) {
+ printf(" Class %d: count=%zu\n", i,
+ hdr->size_classes[i].object_count);
+ }
+ goto fail_pool;
+ }
+
+ sc = &hdr->size_classes[sc_idx];
+
+ /* Verify all blocks start in shard 0 */
+ if (sc->shards[0].free_count == 0) {
+ printf("Shard 0 should have all blocks initially.\n");
+ goto fail_pool;
+ }
+
+ /* Verify other shards are empty */
+ for (i = 1; i < SSM_POOL_SHARDS; i++) {
+ if (sc->shards[i].free_count != 0) {
+ printf("Shard %d should be empty, has %zu.\n",
+ i, sc->shards[i].free_count);
+ goto fail_pool;
+ }
+ }
+
+ ssm_pool_destroy(pool);
+
+ TEST_SUCCESS();
+ return TEST_RC_SUCCESS;
+
+ fail_pool:
+ ssm_pool_destroy(pool);
+ fail:
+ TEST_FAIL();
+ return TEST_RC_FAIL;
+}
+
+static int test_shard_migration(void)
+{
+ struct ssm_pool * pool;
+ struct _ssm_pool_hdr * hdr;
+ struct _ssm_size_class * sc;
+ struct ssm_pk_buff * spb;
+ uint8_t * ptr;
+ ssize_t off;
+ int shard_idx;
+ int sc_idx;
+ int i;
+
+ TEST_START();
+
+ ssm_pool_purge();
+
+ pool = ssm_pool_create();
+ if (pool == NULL) {
+ printf("Failed to create pool.\n");
+ goto fail;
+ }
+
+ hdr = get_pool_hdr(pool);
+
+ /* Find the first size class with blocks */
+ sc_idx = -1;
+ for (i = 0; i < SSM_POOL_MAX_CLASSES; i++) {
+ if (hdr->size_classes[i].object_count > 0) {
+ sc_idx = i;
+ break;
+ }
+ }
+
+ if (sc_idx < 0) {
+ printf("No size classes configured.\n");
+ goto fail_pool;
+ }
+
+ sc = &hdr->size_classes[sc_idx];
+
+ /* Allocate from this process */
+ off = ssm_pool_alloc(pool, TEST_SIZE, &ptr, &spb);
+ if (off < 0) {
+ printf("Allocation failed: %zd.\n", off);
+ goto fail_pool;
+ }
+
+ /* Free it - should go to this process's shard */
+ shard_idx = getpid() % SSM_POOL_SHARDS;
+ if (ssm_pool_remove(pool, off) != 0) {
+ printf("Remove failed.\n");
+ goto fail_pool;
+ }
+
+ /* Verify block migrated away from shard 0 or in allocator's shard */
+ if (sc->shards[shard_idx].free_count == 0 &&
+ sc->shards[0].free_count == 0) {
+ printf("Block should have been freed to a shard.\n");
+ goto fail_pool;
+ }
+
+ ssm_pool_destroy(pool);
+
+ TEST_SUCCESS();
+ return TEST_RC_SUCCESS;
+
+ fail_pool:
+ ssm_pool_destroy(pool);
+ fail:
+ TEST_FAIL();
+ return TEST_RC_FAIL;
+}
+
+static int test_fallback_stealing(void)
+{
+ struct ssm_pool * pool;
+ struct _ssm_pool_hdr * hdr;
+ struct _ssm_size_class * sc;
+ struct ssm_pk_buff ** spbs;
+ uint8_t ** ptrs;
+ size_t total_blocks;
+ size_t total_free;
+ size_t i;
+ int sc_idx;
+ int c;
+
+ TEST_START();
+
+ ssm_pool_purge();
+
+ pool = ssm_pool_create();
+ if (pool == NULL) {
+ printf("Failed to create pool.\n");
+ goto fail;
+ }
+
+ hdr = get_pool_hdr(pool);
+
+ /* Find the first size class with blocks */
+ sc_idx = -1;
+ for (c = 0; c < SSM_POOL_MAX_CLASSES; c++) {
+ if (hdr->size_classes[c].object_count > 0) {
+ sc_idx = c;
+ break;
+ }
+ }
+
+ if (sc_idx < 0) {
+ printf("No size classes configured.\n");
+ goto fail;
+ }
+
+ sc = &hdr->size_classes[sc_idx];
+ total_blocks = sc->object_count;
+
+ spbs = malloc(total_blocks * sizeof(struct ssm_pk_buff *));
+ ptrs = malloc(total_blocks * sizeof(uint8_t *));
+ if (spbs == NULL || ptrs == NULL) {
+ printf("Failed to allocate test arrays.\n");
+ free(spbs);
+ free(ptrs);
+ goto fail_pool;
+ }
+
+ /* Allocate half the blocks from single process */
+ for (i = 0; i < total_blocks / 2; i++) {
+ ssize_t off = ssm_pool_alloc(pool, TEST_SIZE,
+ &ptrs[i], &spbs[i]);
+ if (off < 0) {
+ printf("Allocation %zu failed: %zd.\n", i, off);
+ free(spbs);
+ free(ptrs);
+ goto fail_pool;
+ }
+ }
+
+ /* Free them all - they return to the allocator's local shard */
+ for (i = 0; i < total_blocks / 2; i++) {
+ size_t off = ssm_pk_buff_get_idx(spbs[i]);
+ if (ssm_pool_remove(pool, off) != 0) {
+ printf("Remove %zu failed.\n", i);
+ free(spbs);
+ free(ptrs);
+ goto fail_pool;
+ }
+ }
+
+ /* Freed blocks should be in shards (all blocks free again) */
+ total_free = 0;
+ for (i = 0; i < SSM_POOL_SHARDS; i++) {
+ total_free += sc->shards[i].free_count;
+ }
+
+ if (total_free != total_blocks) {
+ printf("Expected %zu free blocks total, got %zu.\n",
+ total_blocks, total_free);
+ free(spbs);
+ free(ptrs);
+ goto fail_pool;
+ }
+
+ /* Allocate again - should succeed by taking from shards */
+ for (i = 0; i < total_blocks / 2; i++) {
+ ssize_t off = ssm_pool_alloc(pool, TEST_SIZE,
+ &ptrs[i], &spbs[i]);
+ if (off < 0) {
+ printf("Fallback alloc %zu failed: %zd.\n", i, off);
+ free(spbs);
+ free(ptrs);
+ goto fail_pool;
+ }
+ }
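+
+ /* If the local shard held fewer than half the blocks, these
+ * allocations can only have succeeded by stealing free blocks
+ * from the other shards - the behaviour under test. */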
+
+ /* The re-allocated half is in use again; free it to clean up */
+ for (i = 0; i < total_blocks / 2; i++) {
+ size_t off = ssm_pk_buff_get_idx(spbs[i]);
+ ssm_pool_remove(pool, off);
+ }
+
+ free(spbs);
+ free(ptrs);
+ ssm_pool_destroy(pool);
+
+ TEST_SUCCESS();
+ return TEST_RC_SUCCESS;
+
+ fail_pool:
+ ssm_pool_destroy(pool);
+ fail:
+ TEST_FAIL();
+ return TEST_RC_FAIL;
+}
+
+static int test_multiprocess_sharding(void)
+{
+ struct ssm_pool * pool;
+ struct _ssm_pool_hdr * hdr;
+ struct _ssm_size_class * sc;
+ pid_t children[SSM_POOL_SHARDS] = { 0 }; /* 0: not forked */
+ int i;
+ int status;
+
+ TEST_START();
+
+ ssm_pool_purge();
+
+ pool = ssm_pool_create();
+ if (pool == NULL) {
+ printf("Failed to create pool.\n");
+ goto fail;
+ }
+
+ /* Fork processes to test different shards */
+ for (i = 0; i < SSM_POOL_SHARDS; i++) {
+ children[i] = fork();
+ if (children[i] == -1) {
+ printf("Fork %d failed.\n", i);
+ goto fail_children;
+ }
+
+ if (children[i] == 0) {
+ /* Child process */
+ struct ssm_pool * child_pool;
+ struct ssm_pk_buff * spb;
+ uint8_t * ptr;
+ ssize_t off;
+ int my_shard;
+
+ child_pool = ssm_pool_open();
+ if (child_pool == NULL)
+ exit(EXIT_FAILURE);
+
+ my_shard = getpid() % SSM_POOL_SHARDS;
+ (void) my_shard; /* Reserved for future use */
+
+ /* Each child allocates and frees a block */
+ off = ssm_pool_alloc(child_pool, TEST_SIZE,
+ &ptr, &spb);
+ if (off < 0) {
+ ssm_pool_close(child_pool);
+ exit(EXIT_FAILURE);
+ }
+
+ /* Small delay to ensure the allocation is visible to the
+ * other processes */
+ usleep(10000);
+
+ if (ssm_pool_remove(child_pool, off) != 0) {
+ ssm_pool_close(child_pool);
+ exit(EXIT_FAILURE);
+ }
+
+ ssm_pool_close(child_pool);
+ exit(EXIT_SUCCESS);
+ }
+ }
+
+ /* Wait for all children */
+ for (i = 0; i < SSM_POOL_SHARDS; i++) {
+ if (waitpid(children[i], &status, 0) == -1) {
+ printf("Waitpid %d failed.\n", i);
+ goto fail_children;
+ }
+ children[i] = 0; /* Reaped; skip in cleanup */
+ if (!WIFEXITED(status) || WEXITSTATUS(status) != 0) {
+ printf("Child %d failed.\n", i);
+ goto fail_children;
+ }
+ }
+
+ /* Verify blocks distributed across shards */
+ hdr = get_pool_hdr(pool);
+
+ /* Find the first size class with blocks */
+ sc = NULL;
+ for (i = 0; i < SSM_POOL_MAX_CLASSES; i++) {
+ if (hdr->size_classes[i].object_count > 0) {
+ sc = &hdr->size_classes[i];
+ break;
+ }
+ }
+
+ if (sc == NULL) {
+ printf("No size classes configured.\n");
+ goto fail_pool;
+ }
+
+ /* After children allocate and free, blocks should be in shards
+ * (though exact distribution depends on PID values)
+ */
+ for (i = 0; i < SSM_POOL_SHARDS; i++) {
+ if (sc->shards[i].free_count > 0)
+ break;
+ }
+
+ /* At least one shard must hold free blocks */
+ if (i == SSM_POOL_SHARDS) {
+ printf("No shard has any free blocks.\n");
+ goto fail_pool;
+ }
+
+ ssm_pool_destroy(pool);
+
+ TEST_SUCCESS();
+ return TEST_RC_SUCCESS;
+
+ fail_children:
+ /* Kill any remaining children */
+ for (i = 0; i < SSM_POOL_SHARDS; i++) {
+ if (children[i] > 0)
+ kill(children[i], SIGKILL);
+ }
+ fail_pool:
+ ssm_pool_destroy(pool);
+ fail:
+ TEST_FAIL();
+ return TEST_RC_FAIL;
+}
+
+static int test_exhaustion_with_fallback(void)
+{
+ struct ssm_pool * pool;
+ struct ssm_pk_buff * spb;
+ uint8_t * ptr;
+ ssize_t off;
+
+ TEST_START();
+
+ ssm_pool_purge();
+
+ pool = ssm_pool_create();
+ if (pool == NULL) {
+ printf("Failed to create pool.\n");
+ goto fail;
+ }
+
+ /* Allocate until exhausted across all shards */
+ while (true) {
+ off = ssm_pool_alloc(pool, TEST_SIZE, &ptr, &spb);
+ if (off < 0) {
+ if (off == -EAGAIN)
+ break;
+ printf("Unexpected error: %zd.\n", off);
+ goto fail_pool;
+ }
+ }
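+
+ /* Reaching this point means every shard was tried: the
+ * nonblocking path falls back to stealing before it reports
+ * -EAGAIN. */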
+
+ /* Should fail with -EAGAIN when truly exhausted */
+ off = ssm_pool_alloc(pool, TEST_SIZE, &ptr, &spb);
+ if (off != -EAGAIN) {
+ printf("Expected -EAGAIN, got %zd.\n", off);
+ goto fail_pool;
+ }
+
+ ssm_pool_destroy(pool);
+
+ TEST_SUCCESS();
+ return TEST_RC_SUCCESS;
+
+ fail_pool:
+ ssm_pool_destroy(pool);
+ fail:
+ TEST_FAIL();
+ return TEST_RC_FAIL;
+}
+
+int pool_sharding_test(int argc,
+ char ** argv)
+{
+ int ret = 0;
+
+ (void) argc;
+ (void) argv;
+
+ ret |= test_lazy_distribution();
+ ret |= test_shard_migration();
+ ret |= test_fallback_stealing();
+ ret |= test_multiprocess_sharding();
+ ret |= test_exhaustion_with_fallback();
+
+ return ret;
+}
diff --git a/src/lib/ssm/tests/pool_test.c b/src/lib/ssm/tests/pool_test.c
new file mode 100644
index 00000000..e298d9ab
--- /dev/null
+++ b/src/lib/ssm/tests/pool_test.c
@@ -0,0 +1,1038 @@
+/*
+ * Ouroboros - Copyright (C) 2016 - 2024
+ *
+ * Test of the Secure Shared Memory (SSM) system
+ *
+ * Dimitri Staessens <dimitri@ouroboros.rocks>
+ * Sander Vrijders <sander@ouroboros.rocks>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 as
+ * published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., http://www.fsf.org/about/contact/.
+ */
+
+#define _POSIX_C_SOURCE 200809L
+
+#include "config.h"
+#include "ssm.h"
+
+#include <test/test.h>
+#include <ouroboros/ssm_pool.h>
+#include <ouroboros/ssm_rbuff.h>
+
+#include <errno.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+#include <stdbool.h>
+#include <stdatomic.h>
+#include <sys/wait.h>
+#include <sys/types.h>
+#include <signal.h>
+#include <time.h>
+
+#define POOL_256 256
+#define POOL_512 512
+#define POOL_1K 1024
+#define POOL_2K 2048
+#define POOL_4K 4096
+#define POOL_16K 16384
+#define POOL_64K 65536
+#define POOL_256K 262144
+#define POOL_1M 1048576
+#define POOL_2M (2 * 1024 * 1024)
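+
+/* POOL_2M exceeds the largest (1M) size class, so requests of this
+ * size must be rejected with -EMSGSIZE. */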
+
+static int test_ssm_pool_basic_allocation(void)
+{
+ struct ssm_pool * pool;
+ uint8_t * ptr;
+ struct ssm_pk_buff * spb;
+ ssize_t ret;
+
+ TEST_START();
+
+ pool = ssm_pool_create();
+ if (pool == NULL)
+ goto fail_create;
+
+ ret = ssm_pool_alloc(pool, POOL_256, &ptr, &spb);
+ if (ret < 0) {
+ printf("Alloc failed: %zd.\n", ret);
+ goto fail_alloc;
+ }
+
+ if (spb == NULL) {
+ printf("Spb is NULL.\n");
+ goto fail_alloc;
+ }
+
+ if (ptr == NULL) {
+ printf("Ptr is NULL.\n");
+ goto fail_alloc;
+ }
+
+ if (ssm_pk_buff_len(spb) != POOL_256) {
+ printf("Bad length: %zu.\n", ssm_pk_buff_len(spb));
+ goto fail_alloc;
+ }
+
+ ret = ssm_pool_remove(pool, ret);
+ if (ret != 0) {
+ printf("Remove failed: %zd.\n", ret);
+ goto fail_alloc;
+ }
+
+ ssm_pool_destroy(pool);
+
+ TEST_SUCCESS();
+ return TEST_RC_SUCCESS;
+
+ fail_alloc:
+ ssm_pool_destroy(pool);
+ fail_create:
+ TEST_FAIL();
+ return TEST_RC_FAIL;
+}
+
+static int test_ssm_pool_multiple_allocations(void)
+{
+ struct ssm_pool * pool;
+ uint8_t * ptr1;
+ uint8_t * ptr2;
+ uint8_t * ptr3;
+ struct ssm_pk_buff * spb1;
+ struct ssm_pk_buff * spb2;
+ struct ssm_pk_buff * spb3;
+ ssize_t ret1;
+ ssize_t ret2;
+ ssize_t ret3;
+
+ TEST_START();
+
+ pool = ssm_pool_create();
+ if (pool == NULL)
+ goto fail_create;
+
+ ret1 = ssm_pool_alloc(pool, POOL_256, &ptr1, &spb1);
+ ret2 = ssm_pool_alloc(pool, POOL_256, &ptr2, &spb2);
+ ret3 = ssm_pool_alloc(pool, POOL_256, &ptr3, &spb3);
+ if (ret1 < 0 || ret2 < 0 || ret3 < 0) {
+ printf("Allocs failed: %zd, %zd, %zd.\n", ret1, ret2, ret3);
+ goto fail_alloc;
+ }
+
+ if (spb1 == NULL) {
+ printf("Spb1 is NULL.\n");
+ goto fail_alloc;
+ }
+
+ if (ptr1 == NULL) {
+ printf("Ptr1 is NULL.\n");
+ goto fail_alloc;
+ }
+
+ if (spb2 == NULL) {
+ printf("Spb2 is NULL.\n");
+ goto fail_alloc;
+ }
+
+ if (ptr2 == NULL) {
+ printf("Ptr2 is NULL.\n");
+ goto fail_alloc;
+ }
+
+ if (spb3 == NULL) {
+ printf("Spb3 is NULL.\n");
+ goto fail_alloc;
+ }
+
+ if (ptr3 == NULL) {
+ printf("Ptr3 is NULL.\n");
+ goto fail_alloc;
+ }
+
+ if (ssm_pk_buff_len(spb1) != POOL_256) {
+ printf("Bad length spb1: %zu.\n", ssm_pk_buff_len(spb1));
+ goto fail_alloc;
+ }
+
+ if (ssm_pk_buff_len(spb2) != POOL_256) {
+ printf("Bad length spb2: %zu.\n", ssm_pk_buff_len(spb2));
+ goto fail_alloc;
+ }
+
+ if (ssm_pk_buff_len(spb3) != POOL_256) {
+ printf("Bad length spb3: %zu.\n", ssm_pk_buff_len(spb3));
+ goto fail_alloc;
+ }
+
+ if (ssm_pool_remove(pool, ret2) != 0) {
+ printf("Remove ret2 failed.\n");
+ goto fail_alloc;
+ }
+
+ if (ssm_pool_remove(pool, ret1) != 0) {
+ printf("Remove ret1 failed.\n");
+ goto fail_alloc;
+ }
+
+ if (ssm_pool_remove(pool, ret3) != 0) {
+ printf("Remove ret3 failed.\n");
+ goto fail_alloc;
+ }
+
+ ssm_pool_destroy(pool);
+
+ TEST_SUCCESS();
+ return TEST_RC_SUCCESS;
+
+ fail_alloc:
+ ssm_pool_destroy(pool);
+ fail_create:
+ TEST_FAIL();
+ return TEST_RC_FAIL;
+}
+
+static int test_ssm_pool_no_fallback_for_large(void)
+{
+ struct ssm_pool * pool;
+ uint8_t * ptr;
+ struct ssm_pk_buff * spb;
+ ssize_t ret;
+
+ TEST_START();
+
+ pool = ssm_pool_create();
+ if (pool == NULL)
+ goto fail_create;
+
+ ret = ssm_pool_alloc(pool, POOL_2M, &ptr, &spb);
+ if (ret >= 0) {
+ printf("Oversized alloc succeeded: %zd.\n", ret);
+ goto fail_alloc;
+ }
+
+ if (ret != -EMSGSIZE) {
+ printf("Wrong error: %zd.\n", ret);
+ goto fail_alloc;
+ }
+
+ ssm_pool_destroy(pool);
+
+ TEST_SUCCESS();
+ return TEST_RC_SUCCESS;
+
+ fail_alloc:
+ ssm_pool_destroy(pool);
+ fail_create:
+ TEST_FAIL();
+ return TEST_RC_FAIL;
+}
+
+static int test_ssm_pool_blocking_vs_nonblocking(void)
+{
+ struct ssm_pool * pool;
+ uint8_t * ptr;
+ struct ssm_pk_buff * spb;
+ ssize_t ret;
+
+ TEST_START();
+
+ pool = ssm_pool_create();
+ if (pool == NULL)
+ goto fail_create;
+
+ ret = ssm_pool_alloc(pool, POOL_2M, &ptr, &spb);
+ if (ret != -EMSGSIZE) {
+ printf("Nonblocking oversized: %zd.\n", ret);
+ goto fail_alloc;
+ }
+
+ ret = ssm_pool_alloc_b(pool, POOL_2M, &ptr, &spb, NULL);
+ if (ret != -EMSGSIZE) {
+ printf("Blocking oversized: %zd.\n", ret);
+ goto fail_alloc;
+ }
+
+ ret = ssm_pool_alloc(pool, POOL_256, &ptr, &spb);
+ if (ret < 0) {
+ printf("Valid alloc failed: %zd.\n", ret);
+ goto fail_alloc;
+ }
+
+ ssm_pool_remove(pool, ret);
+ ssm_pool_destroy(pool);
+
+ TEST_SUCCESS();
+ return TEST_RC_SUCCESS;
+
+ fail_alloc:
+ ssm_pool_destroy(pool);
+ fail_create:
+ TEST_FAIL();
+ return TEST_RC_FAIL;
+}
+
+static int test_ssm_pool_stress_test(void)
+{
+ struct ssm_pool * pool;
+ uint8_t * ptr;
+ struct ssm_pk_buff * spb;
+ ssize_t * indices = NULL;
+ ssize_t ret;
+ size_t count = 0;
+ size_t i;
+
+ TEST_START();
+
+ pool = ssm_pool_create();
+ if (pool == NULL)
+ goto fail_create;
+
+ indices = malloc(100 * sizeof(*indices));
+ if (indices == NULL) {
+ printf("Malloc failed.\n");
+ goto fail_alloc;
+ }
+
+ for (i = 0; i < 100; i++) {
+ size_t j;
+ size_t num;
+ size_t size;
+
+ num = i + 1;
+
+ for (j = 0; j < num && count < 100; j++) {
+ switch (i % 4) {
+ case 0:
+ /* FALLTHRU */
+ case 1:
+ size = POOL_256;
+ break;
+ case 2:
+ /* FALLTHRU */
+ case 3:
+ size = POOL_1K;
+ break;
+ default:
+ size = POOL_256;
+ break;
+ }
+
+ ret = ssm_pool_alloc(pool, size, &ptr, &spb);
+ if (ret < 0) {
+ printf("Alloc at iter %zu: %zd.\n", i, ret);
+ goto fail_test;
+ }
+ indices[count++] = ret;
+ }
+
+ for (j = 0; j < count / 2; j++) {
+ size_t idx = j * 2;
+ if (idx < count) {
+ ret = ssm_pool_remove(pool, indices[idx]);
+ if (ret != 0) {
+ printf("Remove at iter %zu: %zd.\n",
+ i, ret);
+ goto fail_test;
+ }
+ memmove(&indices[idx], &indices[idx + 1],
+ (count - idx - 1) * sizeof(*indices));
+ count--;
+ }
+ }
+
+ if (i % 10 == 0) {
+ ret = ssm_pool_alloc(pool, POOL_256, &ptr, &spb);
+ if (ret < 0) {
+ printf("Periodic alloc at %zu: %zd.\n", i, ret);
+ goto fail_test;
+ }
+ ssm_pool_remove(pool, ret);
+ }
+ }
+
+ for (i = 0; i < count; i++)
+ ssm_pool_remove(pool, indices[i]);
+
+ free(indices);
+ ssm_pool_destroy(pool);
+
+ TEST_SUCCESS();
+ return TEST_RC_SUCCESS;
+
+ fail_test:
+ for (i = 0; i < count; i++)
+ ssm_pool_remove(pool, indices[i]);
+ free(indices);
+ fail_alloc:
+ ssm_pool_destroy(pool);
+ fail_create:
+ TEST_FAIL();
+ return TEST_RC_FAIL;
+}
+
+static int test_ssm_pool_open_initializes_ssm(void)
+{
+ struct ssm_pool * creator;
+ struct ssm_pool * opener;
+ uint8_t * ptr;
+ struct ssm_pk_buff * spb;
+ ssize_t ret;
+
+ TEST_START();
+
+ creator = ssm_pool_create();
+ if (creator == NULL)
+ goto fail_create;
+
+ ret = ssm_pool_alloc(creator, POOL_256, &ptr, &spb);
+ if (ret < 0) {
+ printf("Creator alloc failed: %zd.\n", ret);
+ goto fail_creator;
+ }
+ ssm_pool_remove(creator, ret);
+
+ opener = ssm_pool_open();
+ if (opener == NULL) {
+ printf("Open failed.\n");
+ goto fail_creator;
+ }
+
+ ret = ssm_pool_alloc(opener, POOL_256, &ptr, &spb);
+ if (ret < 0) {
+ printf("Opener alloc failed: %zd.\n", ret);
+ goto fail_opener;
+ }
+
+ ssm_pool_remove(opener, ret);
+ ssm_pool_close(opener);
+ ssm_pool_destroy(creator);
+
+ TEST_SUCCESS();
+ return TEST_RC_SUCCESS;
+
+ fail_opener:
+ ssm_pool_close(opener);
+ fail_creator:
+ ssm_pool_destroy(creator);
+ fail_create:
+ TEST_FAIL();
+ return TEST_RC_FAIL;
+}
+
+static int test_ssm_pool_bounds_checking(void)
+{
+ struct ssm_pool * pool;
+ struct ssm_pk_buff * spb;
+ ssize_t ret;
+
+ TEST_START();
+
+ pool = ssm_pool_create();
+ if (pool == NULL)
+ goto fail_create;
+
+ ret = ssm_pool_alloc(pool, POOL_256, NULL, &spb);
+ if (ret < 0) {
+ printf("alloc failed: %zd.\n", ret);
+ goto fail_alloc;
+ }
+
+ spb = ssm_pool_get(pool, 0);
+ if (spb != NULL) {
+ printf("Get at offset 0.\n");
+ goto fail_alloc;
+ }
+
+ spb = ssm_pool_get(pool, 100000000UL);
+ if (spb != NULL) {
+ printf("Get beyond pool.\n");
+ goto fail_alloc;
+ }
+
+ ret = ssm_pool_remove(pool, 0);
+ if (ret != -EINVAL) {
+ printf("Remove at offset 0: %zd.\n", ret);
+ goto fail_alloc;
+ }
+
+ ret = ssm_pool_remove(pool, 100000000UL);
+ if (ret != -EINVAL) {
+ printf("Remove beyond pool: %zd.\n", ret);
+ goto fail_alloc;
+ }
+
+ ssm_pool_destroy(pool);
+
+ TEST_SUCCESS();
+ return TEST_RC_SUCCESS;
+
+ fail_alloc:
+ ssm_pool_destroy(pool);
+ fail_create:
+ TEST_FAIL();
+ return TEST_RC_FAIL;
+}
+
+static int test_ssm_pool_inter_process_communication(void)
+{
+ struct ssm_pool * pool;
+ struct ssm_rbuff * rb;
+ struct ssm_pk_buff * spb;
+ uint8_t * ptr;
+ uint8_t * data;
+ const char * msg = "inter-process test";
+ size_t len;
+ ssize_t idx;
+ pid_t pid;
+ int status;
+
+ TEST_START();
+
+ len = strlen(msg) + 1;
+
+ pool = ssm_pool_create();
+ if (pool == NULL)
+ goto fail_create;
+
+ rb = ssm_rbuff_create(getpid(), 1);
+ if (rb == NULL) {
+ printf("Rbuff create failed.\n");
+ goto fail_pool;
+ }
+
+ pid = fork();
+ if (pid < 0) {
+ printf("Fork failed.\n");
+ goto fail_rbuff;
+ }
+
+ if (pid == 0) {
+ idx = ssm_rbuff_read_b(rb, NULL);
+ if (idx < 0) {
+ printf("Child: rbuff read: %zd.\n", idx);
+ exit(1);
+ }
+
+ spb = ssm_pool_get(pool, idx);
+ if (spb == NULL) {
+ printf("Child: pool get failed.\n");
+ exit(1);
+ }
+
+ data = ssm_pk_buff_head(spb);
+ if (data == NULL) {
+ printf("Child: data is NULL.\n");
+ ssm_pool_remove(pool, idx);
+ exit(1);
+ }
+
+ if (strcmp((char *)data, msg) != 0) {
+ printf("Child: data mismatch.\n");
+ ssm_pool_remove(pool, idx);
+ exit(1);
+ }
+
+ ssm_pool_remove(pool, idx);
+ exit(0);
+ }
+
+ idx = ssm_pool_alloc(pool, len, &ptr, &spb);
+ if (idx < 0) {
+ printf("Parent: pool alloc: %zd.\n", idx);
+ goto fail_child;
+ }
+
+ memcpy(ptr, msg, len);
+
+ if (ssm_rbuff_write(rb, idx) < 0) {
+ printf("Parent: rbuff write failed.\n");
+ ssm_pool_remove(pool, idx);
+ goto fail_child;
+ }
+
+ if (waitpid(pid, &status, 0) < 0) {
+ printf("Parent: waitpid failed.\n");
+ ssm_pool_remove(pool, idx);
+ goto fail_rbuff;
+ }
+
+ if (!WIFEXITED(status) || WEXITSTATUS(status) != 0) {
+ printf("Child failed.\n");
+ ssm_pool_remove(pool, idx);
+ goto fail_rbuff;
+ }
+
+ ssm_rbuff_destroy(rb);
+ ssm_pool_destroy(pool);
+
+ TEST_SUCCESS();
+ return TEST_RC_SUCCESS;
+
+ fail_child:
+ waitpid(pid, &status, 0);
+ fail_rbuff:
+ ssm_rbuff_destroy(rb);
+ fail_pool:
+ ssm_pool_destroy(pool);
+ fail_create:
+ TEST_FAIL();
+ return TEST_RC_FAIL;
+}
+
+static int test_ssm_pool_read_operation(void)
+{
+ struct ssm_pool * pool;
+ struct ssm_pk_buff * spb;
+ uint8_t * wptr;
+ uint8_t * rptr;
+ const char * data = "ssm_pool_read test";
+ size_t len;
+ ssize_t idx;
+ ssize_t ret;
+
+ TEST_START();
+
+ len = strlen(data) + 1;
+
+ pool = ssm_pool_create();
+ if (pool == NULL)
+ goto fail_create;
+
+ idx = ssm_pool_alloc(pool, len, &wptr, &spb);
+ if (idx < 0) {
+ printf("alloc failed: %zd.\n", idx);
+ goto fail_alloc;
+ }
+
+ memcpy(wptr, data, len);
+
+ ret = ssm_pool_read(&rptr, pool, idx);
+ if (ret < 0) {
+ printf("Read failed: %zd.\n", ret);
+ goto fail_read;
+ }
+
+ if (rptr == NULL) {
+ printf("NULL pointer.\n");
+ goto fail_read;
+ }
+
+ if (strcmp((char *)rptr, data) != 0) {
+ printf("Data mismatch.\n");
+ goto fail_read;
+ }
+
+ ssm_pool_remove(pool, idx);
+ ssm_pool_destroy(pool);
+
+ TEST_SUCCESS();
+ return TEST_RC_SUCCESS;
+
+ fail_read:
+ ssm_pool_remove(pool, idx);
+ fail_alloc:
+ ssm_pool_destroy(pool);
+ fail_create:
+ TEST_FAIL();
+ return TEST_RC_FAIL;
+}
+
+static int test_ssm_pool_mlock_operation(void)
+{
+ struct ssm_pool * pool;
+ int ret;
+
+ TEST_START();
+
+ pool = ssm_pool_create();
+ if (pool == NULL)
+ goto fail_create;
+
+ ret = ssm_pool_mlock(pool);
+ if (ret < 0)
+ printf("Mlock failed: %d (may need privileges).\n", ret);
+
+ ssm_pool_destroy(pool);
+
+ TEST_SUCCESS();
+ return TEST_RC_SUCCESS;
+
+ fail_create:
+ TEST_FAIL();
+ return TEST_RC_FAIL;
+}
+
+static int test_ssm_pk_buff_operations(void)
+{
+ struct ssm_pool * pool;
+ struct ssm_pk_buff * spb;
+ uint8_t * ptr;
+ uint8_t * head;
+ uint8_t * tail;
+ const char * data = "packet buffer test";
+ size_t dlen;
+ size_t len;
+ ssize_t idx;
+
+ TEST_START();
+
+ dlen = strlen(data);
+
+ pool = ssm_pool_create();
+ if (pool == NULL)
+ goto fail_create;
+
+ idx = ssm_pool_alloc(pool, POOL_256, &ptr, &spb);
+ if (idx < 0) {
+ printf("alloc failed: %zd.\n", idx);
+ goto fail_alloc;
+ }
+
+ head = ssm_pk_buff_head(spb);
+ if (head != ptr) {
+ printf("Head mismatch.\n");
+ goto fail_ops;
+ }
+
+ len = ssm_pk_buff_len(spb);
+ if (len != POOL_256) {
+ printf("Bad length: %zu.\n", len);
+ goto fail_ops;
+ }
+
+ tail = ssm_pk_buff_tail(spb);
+ if (tail != ptr + len) {
+ printf("Tail mismatch.\n");
+ goto fail_ops;
+ }
+
+ memcpy(head, data, dlen);
+
+ tail = ssm_pk_buff_tail_alloc(spb, 32);
+ if (tail == NULL) {
+ printf("Tail_alloc failed.\n");
+ goto fail_ops;
+ }
+
+ if (ssm_pk_buff_len(spb) != POOL_256 + 32) {
+ printf("Length after tail_alloc: %zu.\n",
+ ssm_pk_buff_len(spb));
+ goto fail_ops;
+ }
+
+ if (memcmp(head, data, dlen) != 0) {
+ printf("Data corrupted.\n");
+ goto fail_ops;
+ }
+
+ tail = ssm_pk_buff_tail_release(spb, 32);
+ if (tail == NULL) {
+ printf("Tail_release failed.\n");
+ goto fail_ops;
+ }
+
+ if (ssm_pk_buff_len(spb) != POOL_256) {
+ printf("Length after tail_release: %zu.\n",
+ ssm_pk_buff_len(spb));
+ goto fail_ops;
+ }
+
+ ssm_pool_remove(pool, idx);
+ ssm_pool_destroy(pool);
+
+ TEST_SUCCESS();
+ return TEST_RC_SUCCESS;
+
+ fail_ops:
+ ssm_pool_remove(pool, idx);
+ fail_alloc:
+ ssm_pool_destroy(pool);
+ fail_create:
+ TEST_FAIL();
+ return TEST_RC_FAIL;
+}
+
+#define OVERHEAD (offsetof(struct ssm_pk_buff, data) + \
+ SSM_PK_BUFF_HEADSPACE + SSM_PK_BUFF_TAILSPACE)
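+/* A class of size C holds at most C - OVERHEAD payload bytes: the
+ * ssm_pk_buff header and the reserved head- and tailspace live
+ * inside the block. Each boundary below is probed from both sides. */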
+static int test_ssm_pool_size_class_boundaries(void)
+{
+ struct ssm_pool * pool;
+ struct ssm_pk_buff * spb;
+ uint8_t * ptr;
+ size_t sizes[] = {
+ 1,
+ POOL_512 - OVERHEAD,
+ POOL_512 - OVERHEAD + 1,
+ POOL_1K - OVERHEAD,
+ POOL_1K - OVERHEAD + 1,
+ POOL_2K - OVERHEAD,
+ POOL_2K - OVERHEAD + 1,
+ POOL_4K - OVERHEAD,
+ POOL_4K - OVERHEAD + 1,
+ POOL_16K - OVERHEAD,
+ POOL_16K - OVERHEAD + 1,
+ POOL_64K - OVERHEAD,
+ POOL_64K - OVERHEAD + 1,
+ POOL_256K - OVERHEAD,
+ POOL_256K - OVERHEAD + 1,
+ POOL_1M - OVERHEAD,
+ };
+ size_t expected_classes[] = {
+ 512, 512, 1024, 1024, 2048, 2048, 4096, 4096, 16384,
+ 16384, 65536, 65536, 262144, 262144, 1048576, 1048576
+ };
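+ /* Note the jump from 4K to 16K: there is no 8K class, so a
+ * request just over the 4K boundary lands in the 16K class. */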
+ size_t i;
+ ssize_t idx;
+
+ TEST_START();
+
+ pool = ssm_pool_create();
+ if (pool == NULL)
+ goto fail_create;
+
+ for (i = 0; i < sizeof(sizes) / sizeof(sizes[0]); i++) {
+ struct ssm_pk_buff * hdr;
+ size_t actual_class;
+
+ idx = ssm_pool_alloc(pool, sizes[i], &ptr, &spb);
+ if (idx < 0) {
+ printf("Alloc at %zu failed: %zd.\n", sizes[i], idx);
+ goto fail_alloc;
+ }
+
+ if (ssm_pk_buff_len(spb) != sizes[i]) {
+ printf("Length mismatch at %zu: %zu.\n",
+ sizes[i], ssm_pk_buff_len(spb));
+ ssm_pool_remove(pool, idx);
+ goto fail_alloc;
+ }
+
+ /* Verify correct size class was used
+ * hdr->size is the data array size (object_size - header) */
+ hdr = spb;
+ actual_class = hdr->size + offsetof(struct ssm_pk_buff, data);
+ if (actual_class != expected_classes[i]) {
+ printf("Wrong class for len=%zu: want %zu, got %zu.\n",
+ sizes[i], expected_classes[i], actual_class);
+ ssm_pool_remove(pool, idx);
+ goto fail_alloc;
+ }
+
+ memset(ptr, i & 0xFF, sizes[i]);
+
+ ssm_pool_remove(pool, idx);
+ }
+
+ ssm_pool_destroy(pool);
+
+ TEST_SUCCESS();
+ return TEST_RC_SUCCESS;
+
+ fail_alloc:
+ ssm_pool_destroy(pool);
+ fail_create:
+ TEST_FAIL();
+ return TEST_RC_FAIL;
+}
+
+static int test_ssm_pool_exhaustion(void)
+{
+ struct ssm_pool * pool;
+ struct ssm_pk_buff * spb;
+ uint8_t * ptr;
+ ssize_t * indices;
+ size_t count = 0;
+ size_t i;
+ ssize_t ret;
+
+ TEST_START();
+
+ pool = ssm_pool_create();
+ if (pool == NULL)
+ goto fail_create;
+
+ indices = malloc(2048 * sizeof(*indices));
+ if (indices == NULL) {
+ printf("Malloc failed.\n");
+ goto fail_alloc;
+ }
+
+ for (i = 0; i < 2048; i++) {
+ ret = ssm_pool_alloc(pool, POOL_256, &ptr, &spb);
+ if (ret < 0) {
+ if (ret == -EAGAIN)
+ break;
+ printf("Alloc error: %zd.\n", ret);
+ goto fail_test;
+ }
+ indices[count++] = ret;
+ }
+
+ if (count == 0) {
+ printf("No allocs succeeded.\n");
+ goto fail_test;
+ }
+
+ ret = ssm_pool_alloc(pool, POOL_256, &ptr, &spb);
+ if (ret >= 0) {
+ ssm_pool_remove(pool, ret);
+ } else if (ret != -EAGAIN) {
+ printf("Unexpected error: %zd.\n", ret);
+ goto fail_test;
+ }
+
+ for (i = 0; i < count; i++)
+ ssm_pool_remove(pool, indices[i]);
+
+ ret = ssm_pool_alloc(pool, POOL_256, &ptr, &spb);
+ if (ret < 0) {
+ printf("Alloc after free failed: %zd.\n", ret);
+ goto fail_test;
+ }
+ ssm_pool_remove(pool, ret);
+
+ free(indices);
+ ssm_pool_destroy(pool);
+
+ TEST_SUCCESS();
+ return TEST_RC_SUCCESS;
+
+ fail_test:
+ for (i = 0; i < count; i++)
+ ssm_pool_remove(pool, indices[i]);
+ free(indices);
+ fail_alloc:
+ ssm_pool_destroy(pool);
+ fail_create:
+ TEST_FAIL();
+ return TEST_RC_FAIL;
+}
+
+static int test_ssm_pool_reclaim_orphans(void)
+{
+ struct ssm_pool * pool;
+ uint8_t * ptr1;
+ uint8_t * ptr2;
+ uint8_t * ptr3;
+ struct ssm_pk_buff * spb1;
+ struct ssm_pk_buff * spb2;
+ struct ssm_pk_buff * spb3;
+ ssize_t ret1;
+ ssize_t ret2;
+ ssize_t ret3;
+ pid_t my_pid;
+ pid_t fake_pid = 99999;
+
+ TEST_START();
+
+ pool = ssm_pool_create();
+ if (pool == NULL)
+ goto fail_create;
+
+ my_pid = getpid();
+
+ /* Allocate some blocks */
+ ret1 = ssm_pool_alloc(pool, POOL_256, &ptr1, &spb1);
+ ret2 = ssm_pool_alloc(pool, POOL_512, &ptr2, &spb2);
+ ret3 = ssm_pool_alloc(pool, POOL_1K, &ptr3, &spb3);
+ if (ret1 < 0 || ret2 < 0 || ret3 < 0) {
+ printf("Allocs failed: %zd, %zd, %zd.\n", ret1, ret2, ret3);
+ goto fail_alloc;
+ }
+
+ /* Simulate blocks from another process by changing allocator_pid */
+ spb1->allocator_pid = fake_pid;
+ spb2->allocator_pid = fake_pid;
+ /* Keep spb3 with our pid */
+
+ /* Reclaim orphans from fake_pid */
+ ssm_pool_reclaim_orphans(pool, fake_pid);
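+
+ /* Reclaim targets blocks whose allocator_pid matches the dead
+ * process; the checks below assume it drops their refcount to 0
+ * and leaves other allocations untouched. */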
+
+ /* Verify spb1 and spb2 have refcount 0 (reclaimed) */
+ if (spb1->refcount != 0) {
+ printf("spb1 refcount should be 0, got %u.\n", spb1->refcount);
+ goto fail_test;
+ }
+
+ if (spb2->refcount != 0) {
+ printf("spb2 refcount should be 0, got %u.\n", spb2->refcount);
+ goto fail_test;
+ }
+
+ /* Verify spb3 still has refcount 1 (not reclaimed) */
+ if (spb3->refcount != 1) {
+ printf("spb3 refcount should be 1, got %u.\n", spb3->refcount);
+ goto fail_test;
+ }
+
+ /* Clean up */
+ ssm_pool_remove(pool, ret3);
+
+ /* Try allocating again - should get blocks from reclaimed pool */
+ ret1 = ssm_pool_alloc(pool, POOL_256, &ptr1, &spb1);
+ if (ret1 < 0) {
+ printf("Alloc after reclaim failed: %zd.\n", ret1);
+ goto fail_alloc;
+ }
+
+ /* Verify new allocation has our pid */
+ if (spb1->allocator_pid != my_pid) {
+ printf("New block has wrong pid: %d vs %d.\n",
+ spb1->allocator_pid, my_pid);
+ ssm_pool_remove(pool, ret1);
+ goto fail_alloc;
+ }
+
+ ssm_pool_remove(pool, ret1);
+ ssm_pool_destroy(pool);
+
+ TEST_SUCCESS();
+ return TEST_RC_SUCCESS;
+
+ fail_test:
+ ssm_pool_remove(pool, ret3);
+ fail_alloc:
+ ssm_pool_destroy(pool);
+ fail_create:
+ TEST_FAIL();
+ return TEST_RC_FAIL;
+}
+
+int pool_test(int argc,
+ char ** argv)
+{
+ int ret = 0;
+
+ (void) argc;
+ (void) argv;
+
+ ssm_pool_purge();
+
+ ret |= test_ssm_pool_basic_allocation();
+ ret |= test_ssm_pool_multiple_allocations();
+ ret |= test_ssm_pool_no_fallback_for_large();
+ ret |= test_ssm_pool_blocking_vs_nonblocking();
+ ret |= test_ssm_pool_stress_test();
+ ret |= test_ssm_pool_open_initializes_ssm();
+ ret |= test_ssm_pool_bounds_checking();
+ ret |= test_ssm_pool_inter_process_communication();
+ ret |= test_ssm_pool_read_operation();
+ ret |= test_ssm_pool_mlock_operation();
+ ret |= test_ssm_pk_buff_operations();
+ ret |= test_ssm_pool_size_class_boundaries();
+ ret |= test_ssm_pool_exhaustion();
+ ret |= test_ssm_pool_reclaim_orphans();
+
+ return ret;
+}
diff --git a/src/lib/ssm/tests/rbuff_test.c b/src/lib/ssm/tests/rbuff_test.c
new file mode 100644
index 00000000..6e1cb5ec
--- /dev/null
+++ b/src/lib/ssm/tests/rbuff_test.c
@@ -0,0 +1,675 @@
+/*
+ * Ouroboros - Copyright (C) 2016 - 2024
+ *
+ * Test of the SSM notification ring buffer
+ *
+ * Dimitri Staessens <dimitri@ouroboros.rocks>
+ * Sander Vrijders <sander@ouroboros.rocks>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 as
+ * published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., http://www.fsf.org/about/contact/.
+ */
+
+#if defined(__linux__) || defined(__CYGWIN__)
+#define _DEFAULT_SOURCE
+#else
+#define _POSIX_C_SOURCE 200112L
+#endif
+
+#include "config.h"
+#include "ssm.h"
+
+#include <test/test.h>
+#include <ouroboros/ssm_rbuff.h>
+#include <ouroboros/errno.h>
+#include <ouroboros/time.h>
+
+#include <errno.h>
+#include <stdio.h>
+#include <unistd.h>
+#include <pthread.h>
+
+static int test_ssm_rbuff_create_destroy(void)
+{
+ struct ssm_rbuff * rb;
+
+ TEST_START();
+
+ rb = ssm_rbuff_create(getpid(), 1);
+ if (rb == NULL) {
+ printf("Failed to create rbuff.\n");
+ goto fail;
+ }
+
+ ssm_rbuff_destroy(rb);
+
+ TEST_SUCCESS();
+ return TEST_RC_SUCCESS;
+
+ fail:
+ TEST_FAIL();
+ return TEST_RC_FAIL;
+}
+
+static int test_ssm_rbuff_write_read(void)
+{
+ struct ssm_rbuff * rb;
+ ssize_t idx;
+
+ TEST_START();
+
+ rb = ssm_rbuff_create(getpid(), 2);
+ if (rb == NULL) {
+ printf("Failed to create rbuff.\n");
+ goto fail;
+ }
+
+ if (ssm_rbuff_write(rb, 42) < 0) {
+ printf("Failed to write value.\n");
+ goto fail_rb;
+ }
+
+ if (ssm_rbuff_queued(rb) != 1) {
+ printf("Queue length should be 1, got %zu.\n",
+ ssm_rbuff_queued(rb));
+ goto fail_rb;
+ }
+
+ idx = ssm_rbuff_read(rb);
+ if (idx != 42) {
+ printf("Expected 42, got %zd.\n", idx);
+ goto fail_rb;
+ }
+
+ if (ssm_rbuff_queued(rb) != 0) {
+ printf("Queue should be empty, got %zu.\n",
+ ssm_rbuff_queued(rb));
+ goto fail_rb;
+ }
+
+ ssm_rbuff_destroy(rb);
+
+ TEST_SUCCESS();
+ return TEST_RC_SUCCESS;
+
+ fail_rb:
+ ssm_rbuff_destroy(rb);
+ fail:
+ TEST_FAIL();
+ return TEST_RC_FAIL;
+}
+
+static int test_ssm_rbuff_read_empty(void)
+{
+ struct ssm_rbuff * rb;
+ ssize_t ret;
+
+ TEST_START();
+
+ rb = ssm_rbuff_create(getpid(), 3);
+ if (rb == NULL) {
+ printf("Failed to create rbuff.\n");
+ goto fail;
+ }
+
+ ret = ssm_rbuff_read(rb);
+ if (ret != -EAGAIN) {
+ printf("Expected -EAGAIN, got %zd.\n", ret);
+ goto fail_rb;
+ }
+
+ ssm_rbuff_destroy(rb);
+
+ TEST_SUCCESS();
+ return TEST_RC_SUCCESS;
+
+ fail_rb:
+ ssm_rbuff_destroy(rb);
+ fail:
+ TEST_FAIL();
+ return TEST_RC_FAIL;
+}
+
+static int test_ssm_rbuff_fill_drain(void)
+{
+ struct ssm_rbuff * rb;
+ size_t i;
+ ssize_t ret;
+
+ TEST_START();
+
+ rb = ssm_rbuff_create(getpid(), 4);
+ if (rb == NULL) {
+ printf("Failed to create rbuff.\n");
+ goto fail;
+ }
+
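+ /* The ring holds at most SSM_RBUFF_SIZE - 1 entries; one slot
+ * is kept empty so a full buffer can be told apart from an
+ * empty one. */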
+ for (i = 0; i < SSM_RBUFF_SIZE - 1; ++i) {
+ if (ssm_rbuff_queued(rb) != i) {
+ printf("Expected %zu queued, got %zu.\n",
+ i, ssm_rbuff_queued(rb));
+ goto fail_rb;
+ }
+ if (ssm_rbuff_write(rb, i) < 0) {
+ printf("Failed to write at index %zu.\n", i);
+ goto fail_rb;
+ }
+ }
+
+ if (ssm_rbuff_queued(rb) != SSM_RBUFF_SIZE - 1) {
+ printf("Expected %d queued, got %zu.\n",
+ SSM_RBUFF_SIZE - 1, ssm_rbuff_queued(rb));
+ goto fail_rb;
+ }
+
+ ret = ssm_rbuff_write(rb, 999);
+ if (ret != -EAGAIN) {
+ printf("Expected -EAGAIN on full buffer, got %zd.\n", ret);
+ goto fail_rb;
+ }
+
+ for (i = 0; i < SSM_RBUFF_SIZE - 1; ++i) {
+ ret = ssm_rbuff_read(rb);
+ if (ret != (ssize_t) i) {
+ printf("Expected %zu, got %zd.\n", i, ret);
+ goto fail_rb;
+ }
+ }
+
+ if (ssm_rbuff_queued(rb) != 0) {
+ printf("Expected empty queue, got %zu.\n",
+ ssm_rbuff_queued(rb));
+ goto fail_rb;
+ }
+
+ ssm_rbuff_destroy(rb);
+
+ TEST_SUCCESS();
+ return TEST_RC_SUCCESS;
+
+ fail_rb:
+ while (ssm_rbuff_read(rb) >= 0)
+ ;
+ ssm_rbuff_destroy(rb);
+ fail:
+ TEST_FAIL();
+ return TEST_RC_FAIL;
+}
+
+static int test_ssm_rbuff_acl(void)
+{
+ struct ssm_rbuff * rb;
+ uint32_t acl;
+
+ TEST_START();
+
+ rb = ssm_rbuff_create(getpid(), 5);
+ if (rb == NULL) {
+ printf("Failed to create rbuff.\n");
+ goto fail;
+ }
+
+ acl = ssm_rbuff_get_acl(rb);
+ if (acl != ACL_RDWR) {
+ printf("Expected ACL_RDWR, got %u.\n", acl);
+ goto fail_rb;
+ }
+
+ ssm_rbuff_set_acl(rb, ACL_RDONLY);
+ acl = ssm_rbuff_get_acl(rb);
+ if (acl != ACL_RDONLY) {
+ printf("Expected ACL_RDONLY, got %u.\n", acl);
+ goto fail_rb;
+ }
+
+ if (ssm_rbuff_write(rb, 1) != -ENOTALLOC) {
+ printf("Expected -ENOTALLOC on RDONLY.\n");
+ goto fail_rb;
+ }
+
+ ssm_rbuff_set_acl(rb, ACL_FLOWDOWN);
+ if (ssm_rbuff_write(rb, 1) != -EFLOWDOWN) {
+ printf("Expected -EFLOWDOWN on FLOWDOWN.\n");
+ goto fail_rb;
+ }
+
+ if (ssm_rbuff_read(rb) != -EFLOWDOWN) {
+ printf("Expected -EFLOWDOWN on read with FLOWDOWN.\n");
+ goto fail_rb;
+ }
+
+ ssm_rbuff_destroy(rb);
+
+ TEST_SUCCESS();
+ return TEST_RC_SUCCESS;
+
+ fail_rb:
+ ssm_rbuff_destroy(rb);
+ fail:
+ TEST_FAIL();
+ return TEST_RC_FAIL;
+}
+
+static int test_ssm_rbuff_open_close(void)
+{
+ struct ssm_rbuff * rb1;
+ struct ssm_rbuff * rb2;
+ pid_t pid;
+
+ TEST_START();
+
+ pid = getpid();
+
+ rb1 = ssm_rbuff_create(pid, 6);
+ if (rb1 == NULL) {
+ printf("Failed to create rbuff.\n");
+ goto fail;
+ }
+
+ if (ssm_rbuff_write(rb1, 123) < 0) {
+ printf("Failed to write value.\n");
+ goto fail_rb1;
+ }
+
+ rb2 = ssm_rbuff_open(pid, 6);
+ if (rb2 == NULL) {
+ printf("Failed to open existing rbuff.\n");
+ goto fail_rb1;
+ }
+
+ if (ssm_rbuff_queued(rb2) != 1) {
+ printf("Expected 1 queued in opened rbuff, got %zu.\n",
+ ssm_rbuff_queued(rb2));
+ goto fail_rb2;
+ }
+
+ if (ssm_rbuff_read(rb2) != 123) {
+ printf("Failed to read from opened rbuff.\n");
+ goto fail_rb2;
+ }
+
+ ssm_rbuff_close(rb2);
+ ssm_rbuff_destroy(rb1);
+
+ TEST_SUCCESS();
+ return TEST_RC_SUCCESS;
+
+ fail_rb2:
+ ssm_rbuff_close(rb2);
+ fail_rb1:
+ ssm_rbuff_destroy(rb1);
+ fail:
+ TEST_FAIL();
+ return TEST_RC_FAIL;
+}
+
+struct thread_args {
+ struct ssm_rbuff * rb;
+ int iterations;
+ int delay_us;
+};
+
+static void * writer_thread(void * arg)
+{
+ struct thread_args * args = (struct thread_args *) arg;
+ struct timespec delay = {0, 0};
+ int i;
+
+ delay.tv_nsec = args->delay_us * 1000L;
+
+ for (i = 0; i < args->iterations; ++i) {
+ while (ssm_rbuff_write(args->rb, i) < 0)
+ nanosleep(&delay, NULL);
+ }
+
+ return NULL;
+}
+
+static void * reader_thread(void * arg)
+{
+ struct thread_args * args = (struct thread_args *) arg;
+ struct timespec delay = {0, 0};
+ int i;
+ ssize_t val;
+
+ delay.tv_nsec = args->delay_us * 1000L;
+
+ for (i = 0; i < args->iterations; ++i) {
+ val = ssm_rbuff_read(args->rb);
+ while (val < 0) {
+ nanosleep(&delay, NULL);
+ val = ssm_rbuff_read(args->rb);
+ }
+ if (val != i) {
+ printf("Expected %d, got %zd.\n", i, val);
+ return (void *) -1;
+ }
+ }
+
+ return NULL;
+}
+
+static void * blocking_writer_thread(void * arg)
+{
+ struct thread_args * args = (struct thread_args *) arg;
+ int i;
+
+ for (i = 0; i < args->iterations; ++i) {
+ if (ssm_rbuff_write_b(args->rb, i, NULL) < 0)
+ return (void *) -1;
+ }
+
+ return NULL;
+}
+
+static void * blocking_reader_thread(void * arg)
+{
+ struct thread_args * args = (struct thread_args *) arg;
+ int i;
+ ssize_t val;
+
+ for (i = 0; i < args->iterations; ++i) {
+ val = ssm_rbuff_read_b(args->rb, NULL);
+ if (val < 0 || val != i) {
+ printf("Expected %d, got %zd.\n", i, val);
+ return (void *) -1;
+ }
+ }
+
+ return NULL;
+}
+
+static int test_ssm_rbuff_blocking(void)
+{
+ struct ssm_rbuff * rb;
+ pthread_t wthread;
+ pthread_t rthread;
+ struct thread_args args;
+ struct timespec delay = {0, 10 * MILLION};
+ void * ret_w;
+ void * ret_r;
+
+ TEST_START();
+
+ rb = ssm_rbuff_create(getpid(), 8);
+ if (rb == NULL) {
+ printf("Failed to create rbuff.\n");
+ goto fail;
+ }
+
+ args.rb = rb;
+ args.iterations = 50;
+ args.delay_us = 0;
+
+ if (pthread_create(&rthread, NULL, blocking_reader_thread, &args)) {
+ printf("Failed to create reader thread.\n");
+ goto fail_rthread;
+ }
+
+ nanosleep(&delay, NULL);
+
+ if (pthread_create(&wthread, NULL, blocking_writer_thread, &args)) {
+ printf("Failed to create writer thread.\n");
+ pthread_cancel(rthread);
+ goto fail_wthread;
+ }
+
+ pthread_join(wthread, &ret_w);
+ pthread_join(rthread, &ret_r);
+
+ if (ret_w != NULL || ret_r != NULL) {
+ printf("Thread returned error.\n");
+ goto fail_ret;
+ }
+
+ ssm_rbuff_destroy(rb);
+
+ TEST_SUCCESS();
+ return TEST_RC_SUCCESS;
+
+ fail_ret:
+ ssm_rbuff_destroy(rb);
+ TEST_FAIL();
+ return TEST_RC_FAIL;
+
+ fail_wthread:
+ pthread_join(rthread, NULL);
+ fail_rthread:
+ ssm_rbuff_destroy(rb);
+ fail:
+ TEST_FAIL();
+ return TEST_RC_FAIL;
+}
+
+static int test_ssm_rbuff_blocking_timeout(void)
+{
+ struct ssm_rbuff * rb;
+ struct timespec abs_timeout;
+ struct timespec interval = {0, 100 * MILLION};
+ struct timespec start;
+ struct timespec end;
+ ssize_t ret;
+ long elapsed_ms;
+ size_t i;
+
+ TEST_START();
+
+ rb = ssm_rbuff_create(getpid(), 9);
+ if (rb == NULL) {
+ printf("Failed to create rbuff.\n");
+ goto fail;
+ }
+
+ clock_gettime(PTHREAD_COND_CLOCK, &start);
+ ts_add(&start, &interval, &abs_timeout);
+
+ ret = ssm_rbuff_read_b(rb, &abs_timeout);
+
+ clock_gettime(PTHREAD_COND_CLOCK, &end);
+
+ if (ret != -ETIMEDOUT) {
+ printf("Expected -ETIMEDOUT, got %zd.\n", ret);
+ goto fail_rb;
+ }
+
+ elapsed_ms = (end.tv_sec - start.tv_sec) * 1000L +
+ (end.tv_nsec - start.tv_nsec) / 1000000L;
+
+ if (elapsed_ms < 90 || elapsed_ms > 200) {
+ printf("Timeout took %ld ms, expected ~100 ms.\n",
+ elapsed_ms);
+ goto fail_rb;
+ }
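+
+ /* The 90-200 ms window is deliberately generous: condvar
+ * wakeups can overshoot under load and coarse clocks may
+ * undershoot slightly. */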
+
+ for (i = 0; i < SSM_RBUFF_SIZE - 1; ++i) {
+ if (ssm_rbuff_write(rb, i) < 0) {
+ printf("Failed to fill buffer.\n");
+ goto fail_rb;
+ }
+ }
+
+ clock_gettime(PTHREAD_COND_CLOCK, &start);
+ ts_add(&start, &interval, &abs_timeout);
+
+ ret = ssm_rbuff_write_b(rb, 999, &abs_timeout);
+
+ clock_gettime(PTHREAD_COND_CLOCK, &end);
+
+ if (ret != -ETIMEDOUT) {
+ printf("Expected -ETIMEDOUT on full buffer, got %zd.\n",
+ ret);
+ goto fail_rb;
+ }
+
+ elapsed_ms = (end.tv_sec - start.tv_sec) * 1000L +
+ (end.tv_nsec - start.tv_nsec) / 1000000L;
+
+ if (elapsed_ms < 90 || elapsed_ms > 200) {
+ printf("Write timeout took %ld ms, expected ~100 ms.\n",
+ elapsed_ms);
+ goto fail_rb;
+ }
+
+ while (ssm_rbuff_read(rb) >= 0)
+ ;
+
+ ssm_rbuff_destroy(rb);
+
+ TEST_SUCCESS();
+ return TEST_RC_SUCCESS;
+
+ fail_rb:
+ while (ssm_rbuff_read(rb) >= 0)
+ ;
+ ssm_rbuff_destroy(rb);
+ fail:
+ TEST_FAIL();
+ return TEST_RC_FAIL;
+}
+
+static int test_ssm_rbuff_blocking_flowdown(void)
+{
+ struct ssm_rbuff * rb;
+ struct timespec abs_timeout;
+ struct timespec now;
+ struct timespec interval = {5, 0};
+ ssize_t ret;
+ size_t i;
+
+ TEST_START();
+
+ rb = ssm_rbuff_create(getpid(), 10);
+ if (rb == NULL) {
+ printf("Failed to create rbuff.\n");
+ goto fail;
+ }
+
+ clock_gettime(PTHREAD_COND_CLOCK, &now);
+ ts_add(&now, &interval, &abs_timeout);
+
+ ssm_rbuff_set_acl(rb, ACL_FLOWDOWN);
+
+ ret = ssm_rbuff_read_b(rb, &abs_timeout);
+ if (ret != -EFLOWDOWN) {
+ printf("Expected -EFLOWDOWN, got %zd.\n", ret);
+ goto fail_rb;
+ }
+
+ ssm_rbuff_set_acl(rb, ACL_RDWR);
+
+ for (i = 0; i < SSM_RBUFF_SIZE - 1; ++i) {
+ if (ssm_rbuff_write(rb, i) < 0) {
+ printf("Failed to fill buffer.\n");
+ goto fail_rb;
+ }
+ }
+
+ clock_gettime(PTHREAD_COND_CLOCK, &now);
+ ts_add(&now, &interval, &abs_timeout);
+
+ ssm_rbuff_set_acl(rb, ACL_FLOWDOWN);
+
+ ret = ssm_rbuff_write_b(rb, 999, &abs_timeout);
+ if (ret != -EFLOWDOWN) {
+ printf("Expected -EFLOWDOWN on write, got %zd.\n", ret);
+ goto fail_rb;
+ }
+
+ ssm_rbuff_set_acl(rb, ACL_RDWR);
+ while (ssm_rbuff_read(rb) >= 0)
+ ;
+
+ ssm_rbuff_destroy(rb);
+
+ TEST_SUCCESS();
+ return TEST_RC_SUCCESS;
+
+ fail_rb:
+ while (ssm_rbuff_read(rb) >= 0)
+ ;
+ ssm_rbuff_destroy(rb);
+ fail:
+ TEST_FAIL();
+ return TEST_RC_FAIL;
+}
+
+static int test_ssm_rbuff_threaded(void)
+{
+ struct ssm_rbuff * rb;
+ pthread_t wthread;
+ pthread_t rthread;
+ struct thread_args args;
+ void * ret_w;
+ void * ret_r;
+
+ TEST_START();
+
+ rb = ssm_rbuff_create(getpid(), 7);
+ if (rb == NULL) {
+ printf("Failed to create rbuff.\n");
+ goto fail;
+ }
+
+ args.rb = rb;
+ args.iterations = 100;
+ args.delay_us = 100;
+
+ if (pthread_create(&wthread, NULL, writer_thread, &args)) {
+ printf("Failed to create writer thread.\n");
+ goto fail_rb;
+ }
+
+ if (pthread_create(&rthread, NULL, reader_thread, &args)) {
+ printf("Failed to create reader thread.\n");
+ pthread_cancel(wthread);
+ pthread_join(wthread, NULL);
+ goto fail_rb;
+ }
+
+ pthread_join(wthread, &ret_w);
+ pthread_join(rthread, &ret_r);
+
+ if (ret_w != NULL || ret_r != NULL) {
+ printf("Thread returned error.\n");
+ goto fail_rb;
+ }
+
+ ssm_rbuff_destroy(rb);
+
+ TEST_SUCCESS();
+ return TEST_RC_SUCCESS;
+
+ fail_rb:
+ ssm_rbuff_destroy(rb);
+ fail:
+ TEST_FAIL();
+ return TEST_RC_FAIL;
+}
+
+int rbuff_test(int argc,
+ char ** argv)
+{
+ int ret = 0;
+
+ (void) argc;
+ (void) argv;
+
+ ret |= test_ssm_rbuff_create_destroy();
+ ret |= test_ssm_rbuff_write_read();
+ ret |= test_ssm_rbuff_read_empty();
+ ret |= test_ssm_rbuff_fill_drain();
+ ret |= test_ssm_rbuff_acl();
+ ret |= test_ssm_rbuff_open_close();
+ ret |= test_ssm_rbuff_threaded();
+ ret |= test_ssm_rbuff_blocking();
+ ret |= test_ssm_rbuff_blocking_timeout();
+ ret |= test_ssm_rbuff_blocking_flowdown();
+
+ return ret;
+}