summaryrefslogtreecommitdiff
path: root/src/lib/ssm
diff options
context:
space:
mode:
Diffstat (limited to 'src/lib/ssm')
-rw-r--r--src/lib/ssm/flow_set.c372
-rw-r--r--src/lib/ssm/pool.c882
-rw-r--r--src/lib/ssm/rbuff.c449
-rw-r--r--src/lib/ssm/ssm.h.in146
-rw-r--r--src/lib/ssm/tests/CMakeLists.txt33
-rw-r--r--src/lib/ssm/tests/flow_set_test.c255
-rw-r--r--src/lib/ssm/tests/pool_sharding_test.c505
-rw-r--r--src/lib/ssm/tests/pool_test.c1038
-rw-r--r--src/lib/ssm/tests/rbuff_test.c675
9 files changed, 4355 insertions, 0 deletions
diff --git a/src/lib/ssm/flow_set.c b/src/lib/ssm/flow_set.c
new file mode 100644
index 00000000..ab24d357
--- /dev/null
+++ b/src/lib/ssm/flow_set.c
@@ -0,0 +1,372 @@
+/*
+ * Ouroboros - Copyright (C) 2016 - 2024
+ *
+ * Management of flow_sets for fqueue
+ *
+ * Dimitri Staessens <dimitri@ouroboros.rocks>
+ * Sander Vrijders <sander@ouroboros.rocks>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public License
+ * version 2.1 as published by the Free Software Foundation.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., http://www.fsf.org/about/contact/.
+ */
+
+#define _POSIX_C_SOURCE 200809L
+
+#include "config.h"
+#include "ssm.h"
+
+#include <ouroboros/errno.h>
+#include <ouroboros/lockfile.h>
+#include <ouroboros/pthread.h>
+#include <ouroboros/ssm_flow_set.h>
+#include <ouroboros/time.h>
+
+#include <assert.h>
+#include <fcntl.h>
+#include <signal.h>
+#include <stdlib.h>
+#include <stdio.h>
+#include <string.h>
+#include <unistd.h>
+#include <sys/mman.h>
+#include <sys/stat.h>
+
+/*
+ * pthread_cond_timedwait has a WONTFIX bug as of glibc 2.25 where it
+ * doesn't test pthread cancellation when passed an expired timeout
+ * with the clock set to CLOCK_MONOTONIC.
+ */
+#if ((defined(__linux__) || (defined(__MACH__) && !defined(__APPLE__))) \
+ && (defined(__GLIBC__) && ((__GLIBC__ * 1000 + __GLIBC_MINOR__) >= 2025)) \
+ && (PTHREAD_COND_CLOCK == CLOCK_MONOTONIC))
+#define HAVE_CANCEL_BUG
+#endif
+
+#define FN_MAX_CHARS 255
+#define FS_PROT (PROT_READ | PROT_WRITE)
+
+#define QUEUESIZE ((SSM_RBUFF_SIZE) * sizeof(struct flowevent))
+
+#define SSM_FSET_FILE_SIZE (SYS_MAX_FLOWS * sizeof(ssize_t) \
+ + PROG_MAX_FQUEUES * sizeof(size_t) \
+ + PROG_MAX_FQUEUES * sizeof(pthread_cond_t) \
+ + PROG_MAX_FQUEUES * QUEUESIZE \
+ + sizeof(pthread_mutex_t))
+
+#define fqueue_ptr(fs, idx) (fs->fqueues + (SSM_RBUFF_SIZE) * idx)
+
+struct ssm_flow_set {
+ ssize_t * mtable;
+ size_t * heads;
+ pthread_cond_t * conds;
+ struct flowevent * fqueues;
+ pthread_mutex_t * lock;
+
+ pid_t pid;
+};
+
+static struct ssm_flow_set * flow_set_create(pid_t pid,
+ int oflags)
+{
+ struct ssm_flow_set * set;
+ ssize_t * shm_base;
+ char fn[FN_MAX_CHARS];
+ int fd;
+
+ sprintf(fn, SSM_FLOW_SET_PREFIX "%d", pid);
+
+ set = malloc(sizeof(*set));
+ if (set == NULL)
+ goto fail_malloc;
+
+ fd = shm_open(fn, oflags, 0666);
+ if (fd == -1)
+ goto fail_shm_open;
+
+ if ((oflags & O_CREAT) && ftruncate(fd, SSM_FSET_FILE_SIZE) < 0)
+ goto fail_truncate;
+
+ shm_base = mmap(NULL, SSM_FSET_FILE_SIZE, FS_PROT, MAP_SHARED, fd, 0);
+ if (shm_base == MAP_FAILED)
+ goto fail_mmap;
+
+ close(fd);
+
+ set->mtable = shm_base;
+ set->heads = (size_t *) (set->mtable + SYS_MAX_FLOWS);
+ set->conds = (pthread_cond_t *)(set->heads + PROG_MAX_FQUEUES);
+ set->fqueues = (struct flowevent *) (set->conds + PROG_MAX_FQUEUES);
+ set->lock = (pthread_mutex_t *)
+ (set->fqueues + PROG_MAX_FQUEUES * (SSM_RBUFF_SIZE));
+
+ return set;
+
+ fail_mmap:
+ if (oflags & O_CREAT)
+ shm_unlink(fn);
+ fail_truncate:
+ close(fd);
+ fail_shm_open:
+ free(set);
+ fail_malloc:
+ return NULL;
+}
+
+struct ssm_flow_set * ssm_flow_set_create(pid_t pid)
+{
+ struct ssm_flow_set * set;
+ pthread_mutexattr_t mattr;
+ pthread_condattr_t cattr;
+ mode_t mask;
+ int i;
+
+ mask = umask(0);
+
+ set = flow_set_create(pid, O_CREAT | O_RDWR);
+
+ umask(mask);
+
+ if (set == NULL)
+ goto fail_set;
+
+ set->pid = pid;
+
+ if (pthread_mutexattr_init(&mattr))
+ goto fail_mutexattr_init;
+
+#ifdef HAVE_ROBUST_MUTEX
+ if (pthread_mutexattr_setrobust(&mattr, PTHREAD_MUTEX_ROBUST))
+ goto fail_mattr_set;
+#endif
+ if (pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED))
+ goto fail_mattr_set;
+
+ if (pthread_mutex_init(set->lock, &mattr))
+ goto fail_mattr_set;
+
+ if (pthread_condattr_init(&cattr))
+ goto fail_condattr_init;
+
+ if (pthread_condattr_setpshared(&cattr, PTHREAD_PROCESS_SHARED))
+ goto fail_condattr_set;
+
+#ifndef __APPLE__
+ if (pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK))
+ goto fail_condattr_set;
+#endif
+ for (i = 0; i < PROG_MAX_FQUEUES; ++i) {
+ set->heads[i] = 0;
+ if (pthread_cond_init(&set->conds[i], &cattr))
+ goto fail_init;
+ }
+
+ for (i = 0; i < SYS_MAX_FLOWS; ++i)
+ set->mtable[i] = -1;
+
+ return set;
+
+ fail_init:
+ while (i-- > 0)
+ pthread_cond_destroy(&set->conds[i]);
+ fail_condattr_set:
+ pthread_condattr_destroy(&cattr);
+ fail_condattr_init:
+ pthread_mutex_destroy(set->lock);
+ fail_mattr_set:
+ pthread_mutexattr_destroy(&mattr);
+ fail_mutexattr_init:
+ ssm_flow_set_destroy(set);
+ fail_set:
+ return NULL;
+}
+
+struct ssm_flow_set * ssm_flow_set_open(pid_t pid)
+{
+ return flow_set_create(pid, O_RDWR);
+}
+
+void ssm_flow_set_destroy(struct ssm_flow_set * set)
+{
+ char fn[FN_MAX_CHARS];
+
+ assert(set);
+
+ sprintf(fn, SSM_FLOW_SET_PREFIX "%d", set->pid);
+
+ ssm_flow_set_close(set);
+
+ shm_unlink(fn);
+}
+
+void ssm_flow_set_close(struct ssm_flow_set * set)
+{
+ assert(set);
+
+ munmap(set->mtable, SSM_FSET_FILE_SIZE);
+ free(set);
+}
+
+void ssm_flow_set_zero(struct ssm_flow_set * set,
+ size_t idx)
+{
+ ssize_t i = 0;
+
+ assert(set);
+ assert(idx < PROG_MAX_FQUEUES);
+
+ pthread_mutex_lock(set->lock);
+
+ for (i = 0; i < SYS_MAX_FLOWS; ++i)
+ if (set->mtable[i] == (ssize_t) idx)
+ set->mtable[i] = -1;
+
+ set->heads[idx] = 0;
+
+ pthread_mutex_unlock(set->lock);
+}
+
+
+int ssm_flow_set_add(struct ssm_flow_set * set,
+ size_t idx,
+ int flow_id)
+{
+ assert(set);
+ assert(!(flow_id < 0) && flow_id < SYS_MAX_FLOWS);
+ assert(idx < PROG_MAX_FQUEUES);
+
+ pthread_mutex_lock(set->lock);
+
+ if (set->mtable[flow_id] != -1) {
+ pthread_mutex_unlock(set->lock);
+ return -EPERM;
+ }
+
+ set->mtable[flow_id] = idx;
+
+ pthread_mutex_unlock(set->lock);
+
+ return 0;
+}
+
+void ssm_flow_set_del(struct ssm_flow_set * set,
+ size_t idx,
+ int flow_id)
+{
+ assert(set);
+ assert(!(flow_id < 0) && flow_id < SYS_MAX_FLOWS);
+ assert(idx < PROG_MAX_FQUEUES);
+
+ pthread_mutex_lock(set->lock);
+
+ if (set->mtable[flow_id] == (ssize_t) idx)
+ set->mtable[flow_id] = -1;
+
+ pthread_mutex_unlock(set->lock);
+}
+
+int ssm_flow_set_has(struct ssm_flow_set * set,
+ size_t idx,
+ int flow_id)
+{
+ int ret = 0;
+
+ assert(set);
+ assert(!(flow_id < 0) && flow_id < SYS_MAX_FLOWS);
+ assert(idx < PROG_MAX_FQUEUES);
+
+ pthread_mutex_lock(set->lock);
+
+ if (set->mtable[flow_id] == (ssize_t) idx)
+ ret = 1;
+
+ pthread_mutex_unlock(set->lock);
+
+ return ret;
+}
+
+void ssm_flow_set_notify(struct ssm_flow_set * set,
+ int flow_id,
+ int event)
+{
+ struct flowevent * e;
+
+ assert(set);
+ assert(!(flow_id < 0) && flow_id < SYS_MAX_FLOWS);
+
+ pthread_mutex_lock(set->lock);
+
+ if (set->mtable[flow_id] == -1) {
+ pthread_mutex_unlock(set->lock);
+ return;
+ }
+
+ e = fqueue_ptr(set, set->mtable[flow_id]) +
+ set->heads[set->mtable[flow_id]];
+
+ e->flow_id = flow_id;
+ e->event = event;
+
+ ++set->heads[set->mtable[flow_id]];
+
+ pthread_cond_signal(&set->conds[set->mtable[flow_id]]);
+
+ pthread_mutex_unlock(set->lock);
+}
+
+
+ssize_t ssm_flow_set_wait(const struct ssm_flow_set * set,
+ size_t idx,
+ struct flowevent * fqueue,
+ const struct timespec * abstime)
+{
+ ssize_t ret = 0;
+
+ assert(set);
+ assert(idx < PROG_MAX_FQUEUES);
+ assert(fqueue);
+
+#ifndef HAVE_ROBUST_MUTEX
+ pthread_mutex_lock(set->lock);
+#else
+ if (pthread_mutex_lock(set->lock) == EOWNERDEAD)
+ pthread_mutex_consistent(set->lock);
+#endif
+
+ pthread_cleanup_push(__cleanup_mutex_unlock, set->lock);
+
+ while (set->heads[idx] == 0 && ret != -ETIMEDOUT) {
+ ret = -__timedwait(set->conds + idx, set->lock, abstime);
+#ifdef HAVE_CANCEL_BUG
+ if (ret == -ETIMEDOUT)
+ pthread_testcancel();
+#endif
+#ifdef HAVE_ROBUST_MUTEX
+ if (ret == -EOWNERDEAD)
+ pthread_mutex_consistent(set->lock);
+#endif
+ }
+
+ if (ret != -ETIMEDOUT) {
+ memcpy(fqueue,
+ fqueue_ptr(set, idx),
+ set->heads[idx] * sizeof(*fqueue));
+ ret = set->heads[idx];
+ set->heads[idx] = 0;
+ }
+
+ pthread_cleanup_pop(true);
+
+ assert(ret);
+
+ return ret;
+}
diff --git a/src/lib/ssm/pool.c b/src/lib/ssm/pool.c
new file mode 100644
index 00000000..b8cfe3a1
--- /dev/null
+++ b/src/lib/ssm/pool.c
@@ -0,0 +1,882 @@
+/*
+ * Ouroboros - Copyright (C) 2016 - 2024
+ *
+ * Secure Shared Memory Infrastructure (SSMI) Packet Buffer
+ *
+ * Dimitri Staessens <dimitri@ouroboros.rocks>
+ * Sander Vrijders <sander@ouroboros.rocks>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public License
+ * version 2.1 as published by the Free Software Foundation.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., http://www.fsf.org/about/contact/.
+ */
+
+#define _POSIX_C_SOURCE 200809L
+
+#include "config.h"
+
+#include <ouroboros/errno.h>
+#include <ouroboros/pthread.h>
+#include <ouroboros/ssm_pool.h>
+
+#include "ssm.h"
+
+#include <assert.h>
+#include <fcntl.h>
+#include <signal.h>
+#include <stdbool.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+#include <sys/mman.h>
+#include <sys/stat.h>
+
+/* Size class configuration from CMake */
+static const struct ssm_size_class_cfg ssm_sc_cfg[SSM_POOL_MAX_CLASSES] = {
+ { (1 << 8), SSM_POOL_256_BLOCKS },
+ { (1 << 9), SSM_POOL_512_BLOCKS },
+ { (1 << 10), SSM_POOL_1K_BLOCKS },
+ { (1 << 11), SSM_POOL_2K_BLOCKS },
+ { (1 << 12), SSM_POOL_4K_BLOCKS },
+ { (1 << 14), SSM_POOL_16K_BLOCKS },
+ { (1 << 16), SSM_POOL_64K_BLOCKS },
+ { (1 << 18), SSM_POOL_256K_BLOCKS },
+ { (1 << 20), SSM_POOL_1M_BLOCKS },
+};
+
+#define PTR_TO_OFFSET(pool_base, ptr) \
+ ((uintptr_t)(ptr) - (uintptr_t)(pool_base))
+
+#define OFFSET_TO_PTR(pool_base, offset) \
+ ((offset == 0) ? NULL : (void *)((uintptr_t)(pool_base) + offset))
+
+#define GET_SHARD_FOR_PID(pid) ((int)((pid) % SSM_POOL_SHARDS))
+
+#define LOAD_RELAXED(ptr) \
+ (__atomic_load_n(ptr, __ATOMIC_RELAXED))
+
+#define LOAD_ACQUIRE(ptr) \
+ (__atomic_load_n(ptr, __ATOMIC_ACQUIRE))
+
+#define STORE_RELEASE(ptr, val) \
+ (__atomic_store_n(ptr, val, __ATOMIC_RELEASE))
+
+#define LOAD(ptr) \
+ (__atomic_load_n(ptr, __ATOMIC_SEQ_CST))
+
+#define STORE(ptr, val) \
+ (__atomic_store_n(ptr, val, __ATOMIC_SEQ_CST))
+
+#define FETCH_ADD(ptr, val) \
+ (__atomic_fetch_add(ptr, val, __ATOMIC_SEQ_CST))
+
+#define FETCH_SUB(ptr, val) \
+ (__atomic_fetch_sub(ptr, val, __ATOMIC_SEQ_CST))
+
+#define CAS(ptr, expected, desired) \
+ (__atomic_compare_exchange_n(ptr, expected, desired, false, \
+ __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST))
+
+#define SSM_FILE_SIZE (SSM_POOL_TOTAL_SIZE + sizeof(struct _ssm_pool_hdr))
+
+struct ssm_pool {
+ uint8_t * shm_base; /* start of blocks */
+ struct _ssm_pool_hdr * hdr; /* shared memory header */
+ void * pool_base; /* base of the memory pool */
+};
+
+static __inline__
+struct ssm_pk_buff * list_remove_head(struct _ssm_list_head * head,
+ void * base)
+{
+ uint32_t off;
+ uint32_t next_off;
+ struct ssm_pk_buff * blk;
+
+ assert(head != NULL);
+ assert(base != NULL);
+
+ off = LOAD(&head->head_offset);
+ if (off == 0)
+ return NULL;
+
+ /* Validate offset is within pool bounds */
+ if (off >= SSM_POOL_TOTAL_SIZE)
+ return NULL;
+
+ blk = OFFSET_TO_PTR(base, off);
+ next_off = LOAD(&blk->next_offset);
+
+
+
+ STORE(&head->head_offset, next_off);
+ STORE(&head->count, LOAD(&head->count) - 1);
+
+ return blk;
+}
+static __inline__ void list_add_head(struct _ssm_list_head * head,
+ struct ssm_pk_buff * blk,
+ void * base)
+{
+ uint32_t off;
+ uint32_t old;
+
+ assert(head != NULL);
+ assert(blk != NULL);
+ assert(base != NULL);
+
+ off = (uint32_t) PTR_TO_OFFSET(base, blk);
+ old = LOAD(&head->head_offset);
+
+ STORE(&blk->next_offset, old);
+ STORE(&head->head_offset, off);
+ STORE(&head->count, LOAD(&head->count) + 1);
+}
+
+static __inline__ int select_size_class(size_t len)
+{
+ size_t sz;
+ int i;
+
+ /* Total space needed: header + headspace + data + tailspace */
+ sz = sizeof(struct ssm_pk_buff) + SSM_PK_BUFF_HEADSPACE + len
+ + SSM_PK_BUFF_TAILSPACE;
+
+ for (i = 0; i < SSM_POOL_MAX_CLASSES; i++) {
+ if (ssm_sc_cfg[i].blocks > 0 && sz <= ssm_sc_cfg[i].size)
+ return i;
+ }
+
+ return -1;
+}
+
+static __inline__ int find_size_class_for_offset(struct ssm_pool * pool,
+ size_t offset)
+{
+ int c;
+
+ assert(pool != NULL);
+
+ for (c = 0; c < SSM_POOL_MAX_CLASSES; c++) {
+ struct _ssm_size_class * sc = &pool->hdr->size_classes[c];
+
+ if (sc->object_size == 0)
+ continue;
+
+ if (offset >= sc->pool_start &&
+ offset < sc->pool_start + sc->pool_size)
+ return c;
+ }
+
+ return -1;
+}
+
+static void init_size_classes(struct ssm_pool * pool)
+{
+ struct _ssm_size_class * sc;
+ struct _ssm_shard * shard;
+ pthread_mutexattr_t mattr;
+ pthread_condattr_t cattr;
+ uint8_t * region;
+ size_t offset;
+ int c;
+ int s;
+ size_t i;
+
+ assert(pool != NULL);
+
+ /* Check if already initialized */
+ if (LOAD(&pool->hdr->initialized) != 0)
+ return;
+
+ pthread_mutexattr_init(&mattr);
+ pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED);
+#ifdef HAVE_ROBUST_MUTEX
+ pthread_mutexattr_setrobust(&mattr, PTHREAD_MUTEX_ROBUST);
+#endif
+ pthread_mutexattr_setprotocol(&mattr, PTHREAD_PRIO_INHERIT);
+
+ pthread_condattr_init(&cattr);
+ pthread_condattr_setpshared(&cattr, PTHREAD_PROCESS_SHARED);
+#ifndef __APPLE__
+ pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK);
+#endif
+ offset = 0;
+
+ for (c = 0; c < SSM_POOL_MAX_CLASSES; c++) {
+ if (ssm_sc_cfg[c].blocks == 0)
+ continue;
+
+ sc = &pool->hdr->size_classes[c];
+
+ sc->object_size = ssm_sc_cfg[c].size;
+ sc->pool_start = offset;
+ sc->pool_size = ssm_sc_cfg[c].size * ssm_sc_cfg[c].blocks;
+ sc->object_count = ssm_sc_cfg[c].blocks;
+
+ /* Initialize all shards */
+ for (s = 0; s < SSM_POOL_SHARDS; s++) {
+ shard = &sc->shards[s];
+
+ STORE(&shard->free_list.head_offset, 0);
+ STORE(&shard->free_list.count, 0);
+ STORE(&shard->free_count, 0);
+
+ pthread_mutex_init(&shard->mtx, &mattr);
+ pthread_cond_init(&shard->cond, &cattr);
+ }
+
+ /* Lazy distribution: put all blocks in shard 0 initially */
+ region = pool->shm_base + offset;
+
+ for (i = 0; i < sc->object_count; ++i) {
+ struct ssm_pk_buff * blk;
+
+ blk = (struct ssm_pk_buff *)
+ (region + i * sc->object_size);
+
+ STORE(&blk->refcount, 0);
+ blk->allocator_pid = 0;
+ STORE(&blk->next_offset, 0);
+
+ list_add_head(&sc->shards[0].free_list, blk,
+ pool->pool_base);
+ FETCH_ADD(&sc->shards[0].free_count, 1);
+ }
+
+ offset += sc->pool_size;
+ }
+
+ /* Mark as initialized - acts as memory barrier */
+ STORE(&pool->hdr->initialized, 1);
+
+ pthread_mutexattr_destroy(&mattr);
+ pthread_condattr_destroy(&cattr);
+}
+
+/*
+ * Reclaim all blocks allocated by a specific pid in a size class.
+ * Called with shard mutex held.
+ */
+static size_t reclaim_pid_from_sc(struct _ssm_size_class * sc,
+ struct _ssm_shard * shard,
+ void * pool_base,
+ pid_t pid)
+{
+ uint8_t * region;
+ size_t i;
+ size_t recovered = 0;
+ struct ssm_pk_buff * blk;
+
+ region = (uint8_t *) pool_base + sc->pool_start;
+
+ for (i = 0; i < sc->object_count; ++i) {
+ blk = (struct ssm_pk_buff *)(region + i * sc->object_size);
+
+ if (blk->allocator_pid == pid && LOAD(&blk->refcount) > 0) {
+ STORE(&blk->refcount, 0);
+ blk->allocator_pid = 0;
+ list_add_head(&shard->free_list, blk, pool_base);
+ FETCH_ADD(&shard->free_count, 1);
+ recovered++;
+ }
+ }
+
+ return recovered;
+}
+
+void ssm_pool_reclaim_orphans(struct ssm_pool * pool,
+ pid_t pid)
+{
+ size_t sc_idx;
+
+ if (pool == NULL || pid <= 0)
+ return;
+
+ for (sc_idx = 0; sc_idx < SSM_POOL_MAX_CLASSES; sc_idx++) {
+ struct _ssm_size_class * sc;
+ struct _ssm_shard * shard;
+
+ sc = &pool->hdr->size_classes[sc_idx];
+ if (sc->object_count == 0)
+ continue;
+
+ /* Reclaim to shard 0 for simplicity */
+ shard = &sc->shards[0];
+ robust_mutex_lock(&shard->mtx);
+ reclaim_pid_from_sc(sc, shard, pool->pool_base, pid);
+ pthread_mutex_unlock(&shard->mtx);
+ }
+}
+
+static __inline__
+struct ssm_pk_buff * try_alloc_from_shard(struct _ssm_shard * shard,
+ void * base)
+{
+ struct ssm_pk_buff * blk;
+
+ robust_mutex_lock(&shard->mtx);
+
+ if (LOAD(&shard->free_count) > 0) {
+ blk = list_remove_head(&shard->free_list, base);
+ if (blk != NULL) {
+ FETCH_SUB(&shard->free_count, 1);
+ return blk; /* Caller must unlock */
+ }
+ FETCH_SUB(&shard->free_count, 1);
+ }
+
+ pthread_mutex_unlock(&shard->mtx);
+ return NULL;
+}
+
+static __inline__ ssize_t init_block(struct ssm_pool * pool,
+ struct _ssm_size_class * sc,
+ struct _ssm_shard * shard,
+ struct ssm_pk_buff * blk,
+ size_t len,
+ uint8_t ** ptr,
+ struct ssm_pk_buff ** spb)
+{
+ STORE(&blk->refcount, 1);
+ blk->allocator_pid = getpid();
+ blk->size = (uint32_t) (sc->object_size -
+ sizeof(struct ssm_pk_buff));
+ blk->pk_head = SSM_PK_BUFF_HEADSPACE;
+ blk->pk_tail = blk->pk_head + (uint32_t) len;
+ blk->off = (uint32_t) PTR_TO_OFFSET(pool->pool_base, blk);
+
+ pthread_mutex_unlock(&shard->mtx);
+
+ *spb = blk;
+ if (ptr != NULL)
+ *ptr = blk->data + blk->pk_head;
+
+ return blk->off;
+}
+
+/* Non-blocking allocation from size class */
+static ssize_t alloc_from_sc(struct ssm_pool * pool,
+ int idx,
+ size_t len,
+ uint8_t ** ptr,
+ struct ssm_pk_buff ** spb)
+{
+ struct _ssm_size_class * sc;
+ struct ssm_pk_buff * blk;
+ int local;
+ int s;
+
+ assert(pool != NULL);
+ assert(idx >= 0 && idx < SSM_POOL_MAX_CLASSES);
+ assert(spb != NULL);
+
+ sc = &pool->hdr->size_classes[idx];
+ local = GET_SHARD_FOR_PID(getpid());
+
+ for (s = 0; s < SSM_POOL_SHARDS; s++) {
+ struct _ssm_shard * shard;
+ int idx;
+
+ idx = (local + s) % SSM_POOL_SHARDS;
+ shard = &sc->shards[idx];
+
+ blk = try_alloc_from_shard(shard, pool->pool_base);
+ if (blk != NULL)
+ return init_block(pool, sc, shard, blk, len, ptr, spb);
+ }
+
+ return -EAGAIN;
+}
+
+/* Blocking allocation from size class */
+static ssize_t alloc_from_sc_b(struct ssm_pool * pool,
+ int idx,
+ size_t len,
+ uint8_t ** ptr,
+ struct ssm_pk_buff ** spb,
+ const struct timespec * abstime)
+{
+ struct _ssm_size_class * sc;
+ struct _ssm_shard * shard;
+ struct ssm_pk_buff * blk = NULL;
+ int local;
+ int s;
+ int ret = 0;
+
+ assert(pool != NULL);
+ assert(idx >= 0 && idx < SSM_POOL_MAX_CLASSES);
+ assert(spb != NULL);
+
+ sc = &pool->hdr->size_classes[idx];
+ local = GET_SHARD_FOR_PID(getpid());
+
+ while (blk == NULL && ret != ETIMEDOUT) {
+ /* Try non-blocking allocation from any shard */
+ for (s = 0; s < SSM_POOL_SHARDS && blk == NULL; s++) {
+ shard = &sc->shards[(local + s) % SSM_POOL_SHARDS];
+ blk = try_alloc_from_shard(shard, pool->pool_base);
+ }
+
+ if (blk != NULL)
+ break;
+
+ /* Nothing available, wait for signal */
+ shard = &sc->shards[local];
+ robust_mutex_lock(&shard->mtx);
+ ret = robust_wait(&shard->cond, &shard->mtx, abstime);
+ pthread_mutex_unlock(&shard->mtx);
+ }
+
+ if (ret == ETIMEDOUT)
+ return -ETIMEDOUT;
+
+ return init_block(pool, sc, shard, blk, len, ptr, spb);
+}
+
+/* Global Shared Packet Pool */
+static char * gspp_filename(void)
+{
+ char * str;
+ char * test_suffix;
+
+ test_suffix = getenv("OUROBOROS_TEST_POOL_SUFFIX");
+ if (test_suffix != NULL) {
+ str = malloc(strlen(SSM_POOL_NAME) + strlen(test_suffix) + 1);
+ if (str == NULL)
+ return NULL;
+ sprintf(str, "%s%s", SSM_POOL_NAME, test_suffix);
+ } else {
+ str = malloc(strlen(SSM_POOL_NAME) + 1);
+ if (str == NULL)
+ return NULL;
+ sprintf(str, "%s", SSM_POOL_NAME);
+ }
+
+ return str;
+}
+
+void ssm_pool_close(struct ssm_pool * pool)
+{
+ assert(pool != NULL);
+
+ munmap(pool->shm_base, SSM_FILE_SIZE);
+ free(pool);
+}
+
+void ssm_pool_destroy(struct ssm_pool * pool)
+{
+ char * fn;
+
+ assert(pool != NULL);
+
+ if (getpid() != pool->hdr->pid && kill(pool->hdr->pid, 0) == 0) {
+ free(pool);
+ return;
+ }
+
+ ssm_pool_close(pool);
+
+ fn = gspp_filename();
+ if (fn == NULL)
+ return;
+
+ shm_unlink(fn);
+ free(fn);
+}
+
+#define MM_FLAGS (PROT_READ | PROT_WRITE)
+
+static struct ssm_pool * pool_create(int flags)
+{
+ struct ssm_pool * pool;
+ int fd;
+ uint8_t * shm_base;
+ char * fn;
+
+ fn = gspp_filename();
+ if (fn == NULL)
+ goto fail_fn;
+
+ pool = malloc(sizeof *pool);
+ if (pool == NULL)
+ goto fail_rdrb;
+
+ fd = shm_open(fn, flags, 0666);
+ if (fd == -1)
+ goto fail_open;
+
+ if ((flags & O_CREAT) && ftruncate(fd, SSM_FILE_SIZE) < 0)
+ goto fail_truncate;
+
+ shm_base = mmap(NULL, SSM_FILE_SIZE, MM_FLAGS, MAP_SHARED, fd, 0);
+ if (shm_base == MAP_FAILED)
+ goto fail_truncate;
+
+ pool->shm_base = shm_base;
+ pool->pool_base = shm_base;
+ pool->hdr = (struct _ssm_pool_hdr *) (shm_base + SSM_POOL_TOTAL_SIZE);
+
+ if (flags & O_CREAT)
+ pool->hdr->mapped_addr = shm_base;
+
+ close(fd);
+
+ free(fn);
+
+ return pool;
+
+ fail_truncate:
+ close(fd);
+ if (flags & O_CREAT)
+ shm_unlink(fn);
+ fail_open:
+ free(pool);
+ fail_rdrb:
+ free(fn);
+ fail_fn:
+ return NULL;
+}
+
+struct ssm_pool * ssm_pool_create(void)
+{
+ struct ssm_pool * pool;
+ mode_t mask;
+ pthread_mutexattr_t mattr;
+ pthread_condattr_t cattr;
+
+ mask = umask(0);
+
+ pool = pool_create(O_CREAT | O_EXCL | O_RDWR);
+
+ umask(mask);
+
+ if (pool == NULL)
+ goto fail_rdrb;
+
+ if (pthread_mutexattr_init(&mattr))
+ goto fail_mattr;
+
+ pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED);
+#ifdef HAVE_ROBUST_MUTEX
+ pthread_mutexattr_setrobust(&mattr, PTHREAD_MUTEX_ROBUST);
+#endif
+ if (pthread_mutex_init(&pool->hdr->mtx, &mattr))
+ goto fail_mutex;
+
+ if (pthread_condattr_init(&cattr))
+ goto fail_cattr;
+
+ pthread_condattr_setpshared(&cattr, PTHREAD_PROCESS_SHARED);
+#ifndef __APPLE__
+ pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK);
+#endif
+ if (pthread_cond_init(&pool->hdr->healthy, &cattr))
+ goto fail_healthy;
+
+ pool->hdr->pid = getpid();
+ /* Will be set by init_size_classes */
+ STORE(&pool->hdr->initialized, 0);
+
+ init_size_classes(pool);
+
+ pthread_mutexattr_destroy(&mattr);
+ pthread_condattr_destroy(&cattr);
+
+ return pool;
+
+ fail_healthy:
+ pthread_condattr_destroy(&cattr);
+ fail_cattr:
+ pthread_mutex_destroy(&pool->hdr->mtx);
+ fail_mutex:
+ pthread_mutexattr_destroy(&mattr);
+ fail_mattr:
+ ssm_pool_destroy(pool);
+ fail_rdrb:
+ return NULL;
+}
+
+struct ssm_pool * ssm_pool_open(void)
+{
+ struct ssm_pool * pool;
+
+ pool = pool_create(O_RDWR);
+ if (pool != NULL)
+ init_size_classes(pool);
+
+ return pool;
+}
+
+void ssm_pool_purge(void)
+{
+ char * fn;
+
+ fn = gspp_filename();
+ if (fn == NULL)
+ return;
+
+ shm_unlink(fn);
+ free(fn);
+}
+
+int ssm_pool_mlock(struct ssm_pool * pool)
+{
+ assert(pool != NULL);
+
+ return mlock(pool->shm_base, SSM_FILE_SIZE);
+}
+
+ssize_t ssm_pool_alloc(struct ssm_pool * pool,
+ size_t count,
+ uint8_t ** ptr,
+ struct ssm_pk_buff ** spb)
+{
+ int idx;
+
+ assert(pool != NULL);
+ assert(spb != NULL);
+
+ idx = select_size_class(count);
+ if (idx >= 0)
+ return alloc_from_sc(pool, idx, count, ptr, spb);
+
+ return -EMSGSIZE;
+}
+
+ssize_t ssm_pool_alloc_b(struct ssm_pool * pool,
+ size_t count,
+ uint8_t ** ptr,
+ struct ssm_pk_buff ** spb,
+ const struct timespec * abstime)
+{
+ int idx;
+
+ assert(pool != NULL);
+ assert(spb != NULL);
+
+ idx = select_size_class(count);
+ if (idx >= 0)
+ return alloc_from_sc_b(pool, idx, count, ptr, spb, abstime);
+
+ return -EMSGSIZE;
+}
+
+ssize_t ssm_pool_read(uint8_t ** dst,
+ struct ssm_pool * pool,
+ size_t off)
+{
+ struct ssm_pk_buff * blk;
+
+ assert(dst != NULL);
+ assert(pool != NULL);
+
+ blk = OFFSET_TO_PTR(pool->pool_base, off);
+ if (blk == NULL)
+ return -EINVAL;
+
+ *dst = blk->data + blk->pk_head;
+
+ return (ssize_t) (blk->pk_tail - blk->pk_head);
+}
+
+struct ssm_pk_buff * ssm_pool_get(struct ssm_pool * pool,
+ size_t off)
+{
+ struct ssm_pk_buff * blk;
+
+ assert(pool != NULL);
+
+ if (off == 0 || off >= (size_t) SSM_POOL_TOTAL_SIZE)
+ return NULL;
+
+ blk = OFFSET_TO_PTR(pool->pool_base, off);
+ if (blk == NULL)
+ return NULL;
+
+ if (LOAD(&blk->refcount) == 0)
+ return NULL;
+
+ return blk;
+}
+
+int ssm_pool_remove(struct ssm_pool * pool,
+ size_t off)
+{
+ struct ssm_pk_buff * blk;
+ struct _ssm_size_class * sc;
+ struct _ssm_shard * shard;
+ int sc_idx;
+ int shard_idx;
+ uint16_t old_ref;
+
+ assert(pool != NULL);
+
+ if (off == 0 || off >= SSM_POOL_TOTAL_SIZE)
+ return -EINVAL;
+
+ blk = OFFSET_TO_PTR(pool->pool_base, off);
+ if (blk == NULL)
+ return -EINVAL;
+
+ sc_idx = find_size_class_for_offset(pool, off);
+ if (sc_idx < 0)
+ return -EINVAL;
+
+ sc = &pool->hdr->size_classes[sc_idx];
+
+ /* Free to allocator's shard (lazy distribution in action) */
+ shard_idx = GET_SHARD_FOR_PID(blk->allocator_pid);
+ shard = &sc->shards[shard_idx];
+
+ robust_mutex_lock(&shard->mtx);
+
+ old_ref = FETCH_SUB(&blk->refcount, 1);
+ if (old_ref > 1) {
+ /* Still referenced */
+ pthread_mutex_unlock(&shard->mtx);
+ return 0;
+ }
+
+ blk->allocator_pid = 0;
+#ifdef CONFIG_OUROBOROS_DEBUG
+ if (old_ref == 0) {
+ /* Underflow - double free attempt */
+ pthread_mutex_unlock(&shard->mtx);
+ abort();
+ }
+
+ /* Poison fields to detect use-after-free */
+ blk->pk_head = 0xDEAD;
+ blk->pk_tail = 0xBEEF;
+#endif
+ list_add_head(&shard->free_list, blk, pool->pool_base);
+ FETCH_ADD(&shard->free_count, 1);
+
+ pthread_cond_signal(&shard->cond);
+
+ pthread_mutex_unlock(&shard->mtx);
+
+ return 0;
+}
+
+size_t ssm_pk_buff_get_idx(struct ssm_pk_buff * spb)
+{
+ assert(spb != NULL);
+
+ return spb->off;
+}
+
+uint8_t * ssm_pk_buff_head(struct ssm_pk_buff * spb)
+{
+ assert(spb != NULL);
+
+ return spb->data + spb->pk_head;
+}
+
+uint8_t * ssm_pk_buff_tail(struct ssm_pk_buff * spb)
+{
+ assert(spb != NULL);
+
+ return spb->data + spb->pk_tail;
+}
+
+size_t ssm_pk_buff_len(struct ssm_pk_buff * spb)
+{
+ assert(spb != NULL);
+
+ return spb->pk_tail - spb->pk_head;
+}
+
+uint8_t * ssm_pk_buff_head_alloc(struct ssm_pk_buff * spb,
+ size_t size)
+{
+ assert(spb != NULL);
+
+ if (spb->pk_head < size)
+ return NULL;
+
+ spb->pk_head -= size;
+
+ return spb->data + spb->pk_head;
+}
+
+uint8_t * ssm_pk_buff_tail_alloc(struct ssm_pk_buff * spb,
+ size_t size)
+{
+ uint8_t * buf;
+
+ assert(spb != NULL);
+
+ if (spb->pk_tail + size >= spb->size)
+ return NULL;
+
+ buf = spb->data + spb->pk_tail;
+
+ spb->pk_tail += size;
+
+ return buf;
+}
+
+uint8_t * ssm_pk_buff_head_release(struct ssm_pk_buff * spb,
+ size_t size)
+{
+ uint8_t * buf;
+
+ assert(spb != NULL);
+ assert(!(size > spb->pk_tail - spb->pk_head));
+
+ buf = spb->data + spb->pk_head;
+
+ spb->pk_head += size;
+
+ return buf;
+}
+
+uint8_t * ssm_pk_buff_tail_release(struct ssm_pk_buff * spb,
+ size_t size)
+{
+ assert(spb != NULL);
+ assert(!(size > spb->pk_tail - spb->pk_head));
+
+ spb->pk_tail -= size;
+
+ return spb->data + spb->pk_tail;
+}
+
+void ssm_pk_buff_truncate(struct ssm_pk_buff * spb,
+ size_t len)
+{
+ assert(spb != NULL);
+ assert(len <= spb->size);
+
+ spb->pk_tail = spb->pk_head + len;
+}
+
+int ssm_pk_buff_wait_ack(struct ssm_pk_buff * spb)
+{
+ assert(spb != NULL);
+
+ FETCH_ADD(&spb->refcount, 1);
+
+ return 0;
+}
+
+int ssm_pk_buff_ack(struct ssm_pk_buff * spb)
+{
+ assert(spb != NULL);
+
+ FETCH_SUB(&spb->refcount, 1);
+
+ return 0;
+}
diff --git a/src/lib/ssm/rbuff.c b/src/lib/ssm/rbuff.c
new file mode 100644
index 00000000..e18f8ba7
--- /dev/null
+++ b/src/lib/ssm/rbuff.c
@@ -0,0 +1,449 @@
+/*
+ * Ouroboros - Copyright (C) 2016 - 2024
+ *
+ * Ring buffer implementations for incoming packets
+ *
+ * Dimitri Staessens <dimitri@ouroboros.rocks>
+ * Sander Vrijders <sander@ouroboros.rocks>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public License
+ * version 2.1 as published by the Free Software Foundation.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., http://www.fsf.org/about/contact/.
+ */
+
+#define _POSIX_C_SOURCE 200809L
+
+#include "config.h"
+#include "ssm.h"
+
+#include <ouroboros/ssm_rbuff.h>
+#include <ouroboros/lockfile.h>
+#include <ouroboros/errno.h>
+#include <ouroboros/fccntl.h>
+#include <ouroboros/pthread.h>
+#include <ouroboros/time.h>
+
+#include <assert.h>
+#include <fcntl.h>
+#include <signal.h>
+#include <stdbool.h>
+#include <stdint.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+#include <sys/mman.h>
+#include <sys/stat.h>
+
+#define FN_MAX_CHARS 255
+
+#define SSM_RBUFF_FILESIZE ((SSM_RBUFF_SIZE) * sizeof(ssize_t) \
+ + 3 * sizeof(size_t) \
+ + sizeof(pthread_mutex_t) \
+ + 2 * sizeof(pthread_cond_t))
+
+#define MODB(x) ((x) & (SSM_RBUFF_SIZE - 1))
+
+#define LOAD_RELAXED(ptr) (__atomic_load_n(ptr, __ATOMIC_RELAXED))
+#define LOAD_ACQUIRE(ptr) (__atomic_load_n(ptr, __ATOMIC_ACQUIRE))
+#define STORE_RELEASE(ptr, val) \
+ (__atomic_store_n(ptr, val, __ATOMIC_RELEASE))
+
+#define HEAD(rb) (rb->shm_base[LOAD_RELAXED(rb->head)])
+#define TAIL(rb) (rb->shm_base[LOAD_RELAXED(rb->tail)])
+#define HEAD_IDX(rb) (LOAD_ACQUIRE(rb->head))
+#define TAIL_IDX(rb) (LOAD_ACQUIRE(rb->tail))
+#define ADVANCE_HEAD(rb) \
+ (STORE_RELEASE(rb->head, MODB(LOAD_RELAXED(rb->head) + 1)))
+#define ADVANCE_TAIL(rb) \
+ (STORE_RELEASE(rb->tail, MODB(LOAD_RELAXED(rb->tail) + 1)))
+#define QUEUED(rb) (MODB(HEAD_IDX(rb) - TAIL_IDX(rb)))
+#define IS_FULL(rb) (QUEUED(rb) == (SSM_RBUFF_SIZE - 1))
+#define IS_EMPTY(rb) (HEAD_IDX(rb) == TAIL_IDX(rb))
+
+struct ssm_rbuff {
+ ssize_t * shm_base; /* start of shared memory */
+ size_t * head; /* start of ringbuffer */
+ size_t * tail;
+ size_t * acl; /* access control */
+ pthread_mutex_t * mtx; /* lock for cond vars only */
+ pthread_cond_t * add; /* signal when new data */
+ pthread_cond_t * del; /* signal when data removed */
+ pid_t pid; /* pid of the owner */
+ int flow_id; /* flow_id of the flow */
+};
+
+#define MM_FLAGS (PROT_READ | PROT_WRITE)
+
+static struct ssm_rbuff * rbuff_create(pid_t pid,
+ int flow_id,
+ int flags)
+{
+ struct ssm_rbuff * rb;
+ int fd;
+ ssize_t * shm_base;
+ char fn[FN_MAX_CHARS];
+
+ sprintf(fn, SSM_RBUFF_PREFIX "%d.%d", pid, flow_id);
+
+ rb = malloc(sizeof(*rb));
+ if (rb == NULL)
+ goto fail_malloc;
+
+ fd = shm_open(fn, flags, 0666);
+ if (fd == -1)
+ goto fail_open;
+
+ if ((flags & O_CREAT) && ftruncate(fd, SSM_RBUFF_FILESIZE) < 0)
+ goto fail_truncate;
+
+ shm_base = mmap(NULL, SSM_RBUFF_FILESIZE, MM_FLAGS, MAP_SHARED, fd, 0);
+
+ close(fd);
+
+ rb->shm_base = shm_base;
+ rb->head = (size_t *) (rb->shm_base + (SSM_RBUFF_SIZE));
+ rb->tail = (size_t *) (rb->head + 1);
+ rb->acl = (size_t *) (rb->tail + 1);
+ rb->mtx = (pthread_mutex_t *) (rb->acl + 1);
+ rb->add = (pthread_cond_t *) (rb->mtx + 1);
+ rb->del = rb->add + 1;
+ rb->pid = pid;
+ rb->flow_id = flow_id;
+
+ return rb;
+
+ fail_truncate:
+ close(fd);
+ if (flags & O_CREAT)
+ shm_unlink(fn);
+ fail_open:
+ free(rb);
+ fail_malloc:
+ return NULL;
+}
+
+static void rbuff_destroy(struct ssm_rbuff * rb)
+{
+ munmap(rb->shm_base, SSM_RBUFF_FILESIZE);
+
+ free(rb);
+}
+
+struct ssm_rbuff * ssm_rbuff_create(pid_t pid,
+ int flow_id)
+{
+ struct ssm_rbuff * rb;
+ pthread_mutexattr_t mattr;
+ pthread_condattr_t cattr;
+ mode_t mask;
+
+ mask = umask(0);
+
+ rb = rbuff_create(pid, flow_id, O_CREAT | O_EXCL | O_RDWR);
+
+ umask(mask);
+
+ if (rb == NULL)
+ goto fail_rb;
+
+ if (pthread_mutexattr_init(&mattr))
+ goto fail_mattr;
+
+ pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED);
+#ifdef HAVE_ROBUST_MUTEX
+ pthread_mutexattr_setrobust(&mattr, PTHREAD_MUTEX_ROBUST);
+#endif
+ if (pthread_mutex_init(rb->mtx, &mattr))
+ goto fail_mutex;
+
+ if (pthread_condattr_init(&cattr))
+ goto fail_cattr;
+
+ pthread_condattr_setpshared(&cattr, PTHREAD_PROCESS_SHARED);
+#ifndef __APPLE__
+ pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK);
+#endif
+ if (pthread_cond_init(rb->add, &cattr))
+ goto fail_add;
+
+ if (pthread_cond_init(rb->del, &cattr))
+ goto fail_del;
+
+ *rb->acl = ACL_RDWR;
+ *rb->head = 0;
+ *rb->tail = 0;
+
+ rb->pid = pid;
+ rb->flow_id = flow_id;
+
+ pthread_mutexattr_destroy(&mattr);
+ pthread_condattr_destroy(&cattr);
+
+ return rb;
+
+ fail_del:
+ pthread_cond_destroy(rb->add);
+ fail_add:
+ pthread_condattr_destroy(&cattr);
+ fail_cattr:
+ pthread_mutex_destroy(rb->mtx);
+ fail_mutex:
+ pthread_mutexattr_destroy(&mattr);
+ fail_mattr:
+ ssm_rbuff_destroy(rb);
+ fail_rb:
+ return NULL;
+}
+
+void ssm_rbuff_destroy(struct ssm_rbuff * rb)
+{
+ char fn[FN_MAX_CHARS];
+
+ assert(rb != NULL);
+
+ sprintf(fn, SSM_RBUFF_PREFIX "%d.%d", rb->pid, rb->flow_id);
+
+ ssm_rbuff_close(rb);
+
+ shm_unlink(fn);
+}
+
+struct ssm_rbuff * ssm_rbuff_open(pid_t pid,
+ int flow_id)
+{
+ return rbuff_create(pid, flow_id, O_RDWR);
+}
+
+void ssm_rbuff_close(struct ssm_rbuff * rb)
+{
+ assert(rb);
+
+ rbuff_destroy(rb);
+}
+
+int ssm_rbuff_write(struct ssm_rbuff * rb,
+ size_t idx)
+{
+ size_t acl;
+ bool was_empty;
+ int ret = 0;
+
+ assert(rb != NULL);
+
+ acl = __atomic_load_n(rb->acl, __ATOMIC_SEQ_CST);
+ if (acl != ACL_RDWR) {
+ if (acl & ACL_FLOWDOWN) {
+ ret = -EFLOWDOWN;
+ goto fail_acl;
+ }
+ if (acl & ACL_RDONLY) {
+ ret = -ENOTALLOC;
+ goto fail_acl;
+ }
+ }
+
+ robust_mutex_lock(rb->mtx);
+
+ if (IS_FULL(rb)) {
+ ret = -EAGAIN;
+ goto fail_mutex;
+ }
+
+ was_empty = IS_EMPTY(rb);
+
+ HEAD(rb) = (ssize_t) idx;
+ ADVANCE_HEAD(rb);
+
+ if (was_empty)
+ pthread_cond_broadcast(rb->add);
+
+ pthread_mutex_unlock(rb->mtx);
+
+ return 0;
+
+ fail_mutex:
+ pthread_mutex_unlock(rb->mtx);
+ fail_acl:
+ return ret;
+}
+
+int ssm_rbuff_write_b(struct ssm_rbuff * rb,
+ size_t idx,
+ const struct timespec * abstime)
+{
+ size_t acl;
+ int ret = 0;
+ bool was_empty;
+
+ assert(rb != NULL);
+
+ acl = __atomic_load_n(rb->acl, __ATOMIC_SEQ_CST);
+ if (acl != ACL_RDWR) {
+ if (acl & ACL_FLOWDOWN) {
+ ret = -EFLOWDOWN;
+ goto fail_acl;
+ }
+ if (acl & ACL_RDONLY) {
+ ret = -ENOTALLOC;
+ goto fail_acl;
+ }
+ }
+
+ robust_mutex_lock(rb->mtx);
+
+ pthread_cleanup_push(__cleanup_mutex_unlock, rb->mtx);
+
+ while (IS_FULL(rb) && ret != -ETIMEDOUT) {
+ acl = __atomic_load_n(rb->acl, __ATOMIC_SEQ_CST);
+ if (acl & ACL_FLOWDOWN) {
+ ret = -EFLOWDOWN;
+ break;
+ }
+ ret = -robust_wait(rb->del, rb->mtx, abstime);
+ }
+
+ pthread_cleanup_pop(false);
+
+ if (ret != -ETIMEDOUT && ret != -EFLOWDOWN) {
+ was_empty = IS_EMPTY(rb);
+ HEAD(rb) = (ssize_t) idx;
+ ADVANCE_HEAD(rb);
+ if (was_empty)
+ pthread_cond_broadcast(rb->add);
+ }
+
+ pthread_mutex_unlock(rb->mtx);
+
+ fail_acl:
+ return ret;
+}
+
+static int check_rb_acl(struct ssm_rbuff * rb)
+{
+ size_t acl;
+
+ assert(rb != NULL);
+
+ acl = __atomic_load_n(rb->acl, __ATOMIC_SEQ_CST);
+
+ if (acl & ACL_FLOWDOWN)
+ return -EFLOWDOWN;
+
+ if (acl & ACL_FLOWPEER)
+ return -EFLOWPEER;
+
+ return -EAGAIN;
+}
+
+ssize_t ssm_rbuff_read(struct ssm_rbuff * rb)
+{
+ ssize_t ret;
+
+ assert(rb != NULL);
+
+ if (IS_EMPTY(rb))
+ return check_rb_acl(rb);
+
+ robust_mutex_lock(rb->mtx);
+
+ ret = TAIL(rb);
+ ADVANCE_TAIL(rb);
+
+ pthread_cond_broadcast(rb->del);
+
+ pthread_mutex_unlock(rb->mtx);
+
+ return ret;
+}
+
+ssize_t ssm_rbuff_read_b(struct ssm_rbuff * rb,
+ const struct timespec * abstime)
+{
+ ssize_t idx = -1;
+ size_t acl;
+
+ assert(rb != NULL);
+
+ acl = __atomic_load_n(rb->acl, __ATOMIC_SEQ_CST);
+ if (IS_EMPTY(rb) && (acl & ACL_FLOWDOWN))
+ return -EFLOWDOWN;
+
+ robust_mutex_lock(rb->mtx);
+
+ pthread_cleanup_push(__cleanup_mutex_unlock, rb->mtx);
+
+ while (IS_EMPTY(rb) &&
+ idx != -ETIMEDOUT &&
+ check_rb_acl(rb) == -EAGAIN) {
+ idx = -robust_wait(rb->add, rb->mtx, abstime);
+ }
+
+ pthread_cleanup_pop(false);
+
+ if (!IS_EMPTY(rb)) {
+ idx = TAIL(rb);
+ ADVANCE_TAIL(rb);
+ pthread_cond_broadcast(rb->del);
+ } else if (idx != -ETIMEDOUT) {
+ idx = check_rb_acl(rb);
+ }
+
+ pthread_mutex_unlock(rb->mtx);
+
+ assert(idx != -EAGAIN);
+
+ return idx;
+}
+
+void ssm_rbuff_set_acl(struct ssm_rbuff * rb,
+ uint32_t flags)
+{
+ assert(rb != NULL);
+
+ __atomic_store_n(rb->acl, (size_t) flags, __ATOMIC_SEQ_CST);
+}
+
+uint32_t ssm_rbuff_get_acl(struct ssm_rbuff * rb)
+{
+ assert(rb != NULL);
+
+ return (uint32_t) __atomic_load_n(rb->acl, __ATOMIC_SEQ_CST);
+}
+
+void ssm_rbuff_fini(struct ssm_rbuff * rb)
+{
+ assert(rb != NULL);
+
+ robust_mutex_lock(rb->mtx);
+
+ pthread_cleanup_push(__cleanup_mutex_unlock, rb->mtx);
+
+ while (!IS_EMPTY(rb))
+ robust_wait(rb->del, rb->mtx, NULL);
+
+ pthread_cleanup_pop(true);
+}
+
+size_t ssm_rbuff_queued(struct ssm_rbuff * rb)
+{
+ assert(rb != NULL);
+
+ return QUEUED(rb);
+}
+
+int ssm_rbuff_mlock(struct ssm_rbuff * rb)
+{
+ assert(rb != NULL);
+
+ return mlock(rb->shm_base, SSM_RBUFF_FILESIZE);
+}
diff --git a/src/lib/ssm/ssm.h.in b/src/lib/ssm/ssm.h.in
new file mode 100644
index 00000000..d14cd49c
--- /dev/null
+++ b/src/lib/ssm/ssm.h.in
@@ -0,0 +1,146 @@
+/*
+ * Ouroboros - Copyright (C) 2016 - 2026
+ *
+ * Secure Shared Memory configuration
+ *
+ * Dimitri Staessens <dimitri@ouroboros.rocks>
+ * Sander Vrijders <sander@ouroboros.rocks>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public License
+ * version 2.1 as published by the Free Software Foundation.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., http://www.fsf.org/about/contact/.
+ */
+
+#ifndef OUROBOROS_LIB_SSM_H
+#define OUROBOROS_LIB_SSM_H
+
+#include <stddef.h>
+#include <stdint.h>
+#include <stdatomic.h>
+#include <sys/types.h>
+
+/* Pool naming configuration */
+#define SSM_PREFIX "@SSM_PREFIX@"
+#define SSM_GSMP_SUFFIX "@SSM_GSMP_SUFFIX@"
+#define SSM_PPP_SUFFIX "@SSM_PPP_SUFFIX@"
+
+/* Legacy SSM constants */
+#define SSM_RBUFF_PREFIX "@SSM_RBUFF_PREFIX@"
+#define SSM_FLOW_SET_PREFIX "@SSM_FLOW_SET_PREFIX@"
+#define SSM_POOL_NAME "@SSM_POOL_NAME@"
+#define SSM_POOL_BLOCKS @SSM_POOL_BLOCKS@
+#define SSM_RBUFF_SIZE @SSM_RBUFF_SIZE@
+
+/* Packet buffer space reservation */
+#define SSM_PK_BUFF_HEADSPACE @SSM_PK_BUFF_HEADSPACE@
+#define SSM_PK_BUFF_TAILSPACE @SSM_PK_BUFF_TAILSPACE@
+
+/* Pool blocks per size class */
+#define SSM_POOL_256_BLOCKS @SSM_POOL_256_BLOCKS@
+#define SSM_POOL_512_BLOCKS @SSM_POOL_512_BLOCKS@
+#define SSM_POOL_1K_BLOCKS @SSM_POOL_1K_BLOCKS@
+#define SSM_POOL_2K_BLOCKS @SSM_POOL_2K_BLOCKS@
+#define SSM_POOL_4K_BLOCKS @SSM_POOL_4K_BLOCKS@
+#define SSM_POOL_16K_BLOCKS @SSM_POOL_16K_BLOCKS@
+#define SSM_POOL_64K_BLOCKS @SSM_POOL_64K_BLOCKS@
+#define SSM_POOL_256K_BLOCKS @SSM_POOL_256K_BLOCKS@
+#define SSM_POOL_1M_BLOCKS @SSM_POOL_1M_BLOCKS@
+#define SSM_POOL_TOTAL_SIZE @SSM_POOL_TOTAL_SIZE@
+
+/* Size class configuration */
+#define SSM_POOL_MAX_CLASSES 9
+#define SSM_POOL_SHARDS @SSM_POOL_SHARDS@
+
+/* Internal structures - exposed for testing */
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include <errno.h>
+#include <pthread.h>
+
+#include <ouroboros/pthread.h>
+
+static __inline__ void robust_mutex_lock(pthread_mutex_t * mtx)
+{
+#ifndef HAVE_ROBUST_MUTEX
+ pthread_mutex_lock(mtx);
+#else
+ if (pthread_mutex_lock(mtx) == EOWNERDEAD)
+ pthread_mutex_consistent(mtx);
+#endif
+}
+
+static __inline__ int robust_wait(pthread_cond_t * cond,
+ pthread_mutex_t * mtx,
+ const struct timespec * abstime)
+{
+ int ret = __timedwait(cond, mtx, abstime);
+#ifdef HAVE_ROBUST_MUTEX
+ if (ret == EOWNERDEAD)
+ pthread_mutex_consistent(mtx);
+#endif
+ return ret;
+}
+
+/* Packet buffer structure used by pool, rbuff, and tests */
+struct ssm_pk_buff {
+ uint32_t next_offset; /* List linkage (pool < 4GB) */
+ uint16_t refcount; /* Reference count (app + rtx) */
+ pid_t allocator_pid; /* For orphan detection */
+ uint32_t size; /* Block size (max 1MB) */
+ uint32_t pk_head; /* Head offset into data */
+ uint32_t pk_tail; /* Tail offset into data */
+ uint32_t off; /* Block offset in pool */
+ uint8_t data[]; /* Packet data */
+};
+
+/* Size class configuration table */
+struct ssm_size_class_cfg {
+ size_t size;
+ size_t blocks;
+};
+
+struct _ssm_list_head {
+ uint32_t head_offset;
+ uint32_t count;
+};
+
+struct _ssm_shard {
+ pthread_mutex_t mtx;
+ pthread_cond_t cond;
+ struct _ssm_list_head free_list;
+ size_t free_count;
+};
+
+struct _ssm_size_class {
+ struct _ssm_shard shards[SSM_POOL_SHARDS];
+ size_t object_size;
+ size_t pool_start;
+ size_t pool_size;
+ size_t object_count;
+};
+
+struct _ssm_pool_hdr {
+ pthread_mutex_t mtx;
+ pthread_cond_t healthy;
+ pid_t pid;
+ uint32_t initialized;
+ void * mapped_addr;
+ struct _ssm_size_class size_classes[SSM_POOL_MAX_CLASSES];
+};
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* OUROBOROS_LIB_SSM_H */
diff --git a/src/lib/ssm/tests/CMakeLists.txt b/src/lib/ssm/tests/CMakeLists.txt
new file mode 100644
index 00000000..827f8bf8
--- /dev/null
+++ b/src/lib/ssm/tests/CMakeLists.txt
@@ -0,0 +1,33 @@
+get_filename_component(PARENT_PATH ${CMAKE_CURRENT_SOURCE_DIR} DIRECTORY)
+get_filename_component(PARENT_DIR ${PARENT_PATH} NAME)
+
+compute_test_prefix()
+
+create_test_sourcelist(${PARENT_DIR}_tests test_suite.c
+ # Add new tests here
+ pool_test.c
+ pool_sharding_test.c
+ rbuff_test.c
+ flow_set_test.c
+ )
+
+add_executable(${PARENT_DIR}_test ${${PARENT_DIR}_tests})
+
+disable_test_logging_for_target(${PARENT_DIR}_test)
+target_link_libraries(${PARENT_DIR}_test ouroboros-common)
+
+add_dependencies(build_tests ${PARENT_DIR}_test)
+
+set(tests_to_run ${${PARENT_DIR}_tests})
+if(CMAKE_VERSION VERSION_LESS "3.29.0")
+ remove(tests_to_run test_suite.c)
+else ()
+ list(POP_FRONT tests_to_run)
+endif()
+
+foreach (test ${tests_to_run})
+ get_filename_component(test_name ${test} NAME_WE)
+ add_test(${TEST_PREFIX}/${test_name} ${C_TEST_PATH}/${PARENT_DIR}_test ${test_name})
+ set_property(TEST ${TEST_PREFIX}/${test_name}
+ PROPERTY ENVIRONMENT "OUROBOROS_TEST_POOL_SUFFIX=.test")
+endforeach (test)
diff --git a/src/lib/ssm/tests/flow_set_test.c b/src/lib/ssm/tests/flow_set_test.c
new file mode 100644
index 00000000..f9084d3c
--- /dev/null
+++ b/src/lib/ssm/tests/flow_set_test.c
@@ -0,0 +1,255 @@
+/*
+ * Ouroboros - Copyright (C) 2016 - 2024
+ *
+ * Test of the SSM flow set
+ *
+ * Dimitri Staessens <dimitri@ouroboros.rocks>
+ * Sander Vrijders <sander@ouroboros.rocks>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 as
+ * published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., http://www.fsf.org/about/contact/.
+ */
+
+#if defined(__linux__) || defined(__CYGWIN__)
+#define _DEFAULT_SOURCE
+#else
+#define _POSIX_C_SOURCE 200112L
+#endif
+
+#include "config.h"
+#include "ssm.h"
+
+#include <test/test.h>
+#include <ouroboros/ssm_flow_set.h>
+#include <ouroboros/errno.h>
+#include <ouroboros/time.h>
+
+#include <stdio.h>
+#include <unistd.h>
+#include <pthread.h>
+
+static int test_ssm_flow_set_create_destroy(void)
+{
+ struct ssm_flow_set * set;
+ pid_t pid;
+
+ TEST_START();
+
+ pid = getpid();
+
+ set = ssm_flow_set_create(pid);
+ if (set == NULL) {
+ printf("Failed to create flow set.\n");
+ goto fail;
+ }
+
+ ssm_flow_set_destroy(set);
+
+ TEST_SUCCESS();
+ return TEST_RC_SUCCESS;
+fail:
+ TEST_FAIL();
+ return TEST_RC_FAIL;
+}
+
+static int test_ssm_flow_set_add_del_has(void)
+{
+ struct ssm_flow_set * set;
+ pid_t pid;
+ size_t idx = 0;
+ int flow_id = 42;
+
+ TEST_START();
+
+ pid = getpid();
+
+ set = ssm_flow_set_create(pid);
+ if (set == NULL) {
+ printf("Failed to create flow set.\n");
+ goto fail;
+ }
+
+ if (ssm_flow_set_has(set, idx, flow_id)) {
+ printf("Flow should not be in set initially.\n");
+ goto fail_destroy;
+ }
+
+ if (ssm_flow_set_add(set, idx, flow_id) < 0) {
+ printf("Failed to add flow to set.\n");
+ goto fail_destroy;
+ }
+
+ if (!ssm_flow_set_has(set, idx, flow_id)) {
+ printf("Flow should be in set after add.\n");
+ goto fail_destroy;
+ }
+
+ /* Adding same flow again should fail */
+ if (ssm_flow_set_add(set, idx, flow_id) != -EPERM) {
+ printf("Should not be able to add flow twice.\n");
+ goto fail_destroy;
+ }
+
+ ssm_flow_set_del(set, idx, flow_id);
+
+ if (ssm_flow_set_has(set, idx, flow_id)) {
+ printf("Flow should not be in set after delete.\n");
+ goto fail_destroy;
+ }
+
+ ssm_flow_set_destroy(set);
+
+ TEST_SUCCESS();
+ return TEST_RC_SUCCESS;
+fail_destroy:
+ ssm_flow_set_destroy(set);
+fail:
+ TEST_FAIL();
+ return TEST_RC_FAIL;
+}
+
+static int test_ssm_flow_set_zero(void)
+{
+ struct ssm_flow_set * set;
+ pid_t pid;
+ size_t idx = 0;
+ int flow_id1 = 10;
+ int flow_id2 = 20;
+
+ TEST_START();
+
+ pid = getpid();
+
+ set = ssm_flow_set_create(pid);
+ if (set == NULL) {
+ printf("Failed to create flow set.\n");
+ goto fail;
+ }
+
+ if (ssm_flow_set_add(set, idx, flow_id1) < 0) {
+ printf("Failed to add flow1 to set.\n");
+ goto fail_destroy;
+ }
+
+ if (ssm_flow_set_add(set, idx, flow_id2) < 0) {
+ printf("Failed to add flow2 to set.\n");
+ goto fail_destroy;
+ }
+
+ ssm_flow_set_zero(set, idx);
+
+ if (ssm_flow_set_has(set, idx, flow_id1)) {
+ printf("Flow1 should not be in set after zero.\n");
+ goto fail_destroy;
+ }
+
+ if (ssm_flow_set_has(set, idx, flow_id2)) {
+ printf("Flow2 should not be in set after zero.\n");
+ goto fail_destroy;
+ }
+
+ ssm_flow_set_destroy(set);
+
+ TEST_SUCCESS();
+ return TEST_RC_SUCCESS;
+fail_destroy:
+ ssm_flow_set_destroy(set);
+fail:
+ TEST_FAIL();
+ return TEST_RC_FAIL;
+}
+
+static int test_ssm_flow_set_notify_wait(void)
+{
+ struct ssm_flow_set * set;
+ pid_t pid;
+ size_t idx = 0;
+ int flow_id = 100;
+ struct flowevent events[SSM_RBUFF_SIZE];
+ struct timespec timeout;
+ ssize_t ret;
+
+ TEST_START();
+
+ pid = getpid();
+
+ set = ssm_flow_set_create(pid);
+ if (set == NULL) {
+ printf("Failed to create flow set.\n");
+ goto fail;
+ }
+
+ if (ssm_flow_set_add(set, idx, flow_id) < 0) {
+ printf("Failed to add flow to set.\n");
+ goto fail_destroy;
+ }
+
+ /* Test immediate timeout when no events */
+ clock_gettime(PTHREAD_COND_CLOCK, &timeout);
+ ret = ssm_flow_set_wait(set, idx, events, &timeout);
+ if (ret != -ETIMEDOUT) {
+ printf("Wait should timeout immediately when no events.\n");
+ goto fail_destroy;
+ }
+
+ /* Notify an event */
+ ssm_flow_set_notify(set, flow_id, FLOW_PKT);
+
+ /* Should be able to read the event immediately */
+ clock_gettime(PTHREAD_COND_CLOCK, &timeout);
+ ts_add(&timeout, &timeout, &((struct timespec) {1, 0}));
+
+ ret = ssm_flow_set_wait(set, idx, events, &timeout);
+ if (ret != 1) {
+ printf("Wait should return 1 event, got %zd.\n", ret);
+ goto fail_destroy;
+ }
+
+ if (events[0].flow_id != flow_id) {
+ printf("Event flow_id mismatch: expected %d, got %d.\n",
+ flow_id, events[0].flow_id);
+ goto fail_destroy;
+ }
+
+ if (events[0].event != FLOW_PKT) {
+ printf("Event type mismatch: expected %d, got %d.\n",
+ FLOW_PKT, events[0].event);
+ goto fail_destroy;
+ }
+
+ ssm_flow_set_destroy(set);
+
+ TEST_SUCCESS();
+ return TEST_RC_SUCCESS;
+fail_destroy:
+ ssm_flow_set_destroy(set);
+fail:
+ TEST_FAIL();
+ return TEST_RC_FAIL;
+}
+
+int flow_set_test(int argc,
+ char ** argv)
+{
+ int ret = 0;
+
+ (void) argc;
+ (void) argv;
+
+ ret |= test_ssm_flow_set_create_destroy();
+ ret |= test_ssm_flow_set_add_del_has();
+ ret |= test_ssm_flow_set_zero();
+ ret |= test_ssm_flow_set_notify_wait();
+
+ return ret;
+}
diff --git a/src/lib/ssm/tests/pool_sharding_test.c b/src/lib/ssm/tests/pool_sharding_test.c
new file mode 100644
index 00000000..72ae1cb7
--- /dev/null
+++ b/src/lib/ssm/tests/pool_sharding_test.c
@@ -0,0 +1,505 @@
+/*
+ * Ouroboros - Copyright (C) 2016 - 2026
+ *
+ * Test of the SSM pool sharding with fallback
+ *
+ * Dimitri Staessens <dimitri@ouroboros.rocks>
+ * Sander Vrijders <sander@ouroboros.rocks>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 as
+ * published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., http://www.fsf.org/about/contact/.
+ */
+
+#if defined(__linux__) || defined(__CYGWIN__)
+#define _DEFAULT_SOURCE
+#else
+#define _POSIX_C_SOURCE 200112L
+#endif
+
+#include "config.h"
+#include "ssm.h"
+
+#include <test/test.h>
+#include <ouroboros/ssm_pool.h>
+
+#include <errno.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+#include <stdbool.h>
+#include <sys/wait.h>
+#include <sys/types.h>
+#include <signal.h>
+
+#define TEST_SIZE 256
+
+/* Helper to get pool header for inspection */
+static struct _ssm_pool_hdr * get_pool_hdr(struct ssm_pool * pool)
+{
+ /* ssm_pool is opaque, but we know its layout:
+ * uint8_t * shm_base
+ * struct _ssm_pool_hdr * hdr
+ * void * pool_base
+ */
+ struct _ssm_pool_hdr ** hdr_ptr =
+ (struct _ssm_pool_hdr **)((uint8_t *)pool + sizeof(void *));
+ return *hdr_ptr;
+}
+
+static int test_lazy_distribution(void)
+{
+ struct ssm_pool * pool;
+ struct _ssm_pool_hdr * hdr;
+ struct _ssm_size_class * sc;
+ int i;
+ int sc_idx;
+
+ TEST_START();
+
+ ssm_pool_purge();
+
+ pool = ssm_pool_create();
+ if (pool == NULL) {
+ printf("Failed to create pool.\n");
+ goto fail;
+ }
+
+ hdr = get_pool_hdr(pool);
+ if (hdr == NULL) {
+ printf("Failed to get pool header.\n");
+ goto fail_pool;
+ }
+
+ /* Find the first size class with blocks */
+ sc_idx = -1;
+ for (i = 0; i < SSM_POOL_MAX_CLASSES; i++) {
+ if (hdr->size_classes[i].object_count > 0) {
+ sc_idx = i;
+ break;
+ }
+ }
+
+ if (sc_idx < 0) {
+ printf("No size classes configured.\n");
+ for (i = 0; i < SSM_POOL_MAX_CLASSES; i++) {
+ printf(" Class %d: count=%zu\n", i,
+ hdr->size_classes[i].object_count);
+ }
+ goto fail_pool;
+ }
+
+ sc = &hdr->size_classes[sc_idx];
+
+ /* Verify all blocks start in shard 0 */
+ if (sc->shards[0].free_count == 0) {
+ printf("Shard 0 should have all blocks initially.\n");
+ goto fail_pool;
+ }
+
+ /* Verify other shards are empty */
+ for (i = 1; i < SSM_POOL_SHARDS; i++) {
+ if (sc->shards[i].free_count != 0) {
+ printf("Shard %d should be empty, has %zu.\n",
+ i, sc->shards[i].free_count);
+ goto fail_pool;
+ }
+ }
+
+ ssm_pool_destroy(pool);
+
+ TEST_SUCCESS();
+ return TEST_RC_SUCCESS;
+
+ fail_pool:
+ ssm_pool_destroy(pool);
+ fail:
+ TEST_FAIL();
+ return TEST_RC_FAIL;
+}
+
+static int test_shard_migration(void)
+{
+ struct ssm_pool * pool;
+ struct _ssm_pool_hdr * hdr;
+ struct _ssm_size_class * sc;
+ struct ssm_pk_buff * spb;
+ uint8_t * ptr;
+ ssize_t off;
+ int shard_idx;
+ int sc_idx;
+ int i;
+
+ TEST_START();
+
+ ssm_pool_purge();
+
+ pool = ssm_pool_create();
+ if (pool == NULL) {
+ printf("Failed to create pool.\n");
+ goto fail;
+ }
+
+ hdr = get_pool_hdr(pool);
+
+ /* Find the first size class with blocks */
+ sc_idx = -1;
+ for (i = 0; i < SSM_POOL_MAX_CLASSES; i++) {
+ if (hdr->size_classes[i].object_count > 0) {
+ sc_idx = i;
+ break;
+ }
+ }
+
+ if (sc_idx < 0) {
+ printf("No size classes configured.\n");
+ goto fail;
+ }
+
+ sc = &hdr->size_classes[sc_idx];
+
+ /* Allocate from this process */
+ off = ssm_pool_alloc(pool, TEST_SIZE, &ptr, &spb);
+ if (off < 0) {
+ printf("Allocation failed: %zd.\n", off);
+ goto fail_pool;
+ }
+
+ /* Free it - should go to this process's shard */
+ shard_idx = getpid() % SSM_POOL_SHARDS;
+ if (ssm_pool_remove(pool, off) != 0) {
+ printf("Remove failed.\n");
+ goto fail_pool;
+ }
+
+ /* Verify block migrated away from shard 0 or in allocator's shard */
+ if (sc->shards[shard_idx].free_count == 0 &&
+ sc->shards[0].free_count == 0) {
+ printf("Block should have been freed to a shard.\n");
+ goto fail_pool;
+ }
+
+ ssm_pool_destroy(pool);
+
+ TEST_SUCCESS();
+ return TEST_RC_SUCCESS;
+
+ fail_pool:
+ ssm_pool_destroy(pool);
+ fail:
+ TEST_FAIL();
+ return TEST_RC_FAIL;
+}
+
+static int test_fallback_stealing(void)
+{
+ struct ssm_pool * pool;
+ struct _ssm_pool_hdr * hdr;
+ struct _ssm_size_class * sc;
+ struct ssm_pk_buff ** spbs;
+ uint8_t ** ptrs;
+ size_t total_blocks;
+ size_t total_free;
+ size_t i;
+ int sc_idx;
+ int c;
+
+ TEST_START();
+
+ ssm_pool_purge();
+
+ pool = ssm_pool_create();
+ if (pool == NULL) {
+ printf("Failed to create pool.\n");
+ goto fail;
+ }
+
+ hdr = get_pool_hdr(pool);
+
+ /* Find the first size class with blocks */
+ sc_idx = -1;
+ for (c = 0; c < SSM_POOL_MAX_CLASSES; c++) {
+ if (hdr->size_classes[c].object_count > 0) {
+ sc_idx = c;
+ break;
+ }
+ }
+
+ if (sc_idx < 0) {
+ printf("No size classes configured.\n");
+ goto fail;
+ }
+
+ sc = &hdr->size_classes[sc_idx];
+ total_blocks = sc->object_count;
+
+ spbs = malloc(total_blocks * sizeof(struct ssm_pk_buff *));
+ ptrs = malloc(total_blocks * sizeof(uint8_t *));
+ if (spbs == NULL || ptrs == NULL) {
+ printf("Failed to allocate test arrays.\n");
+ goto fail_pool;
+ }
+
+ /* Allocate half the blocks from single process */
+ for (i = 0; i < total_blocks / 2; i++) {
+ ssize_t off = ssm_pool_alloc(pool, TEST_SIZE,
+ &ptrs[i], &spbs[i]);
+ if (off < 0) {
+ printf("Allocation %zu failed: %zd.\n", i, off);
+ free(spbs);
+ free(ptrs);
+ goto fail_pool;
+ }
+ }
+
+ /* Free them all - they go to local_shard */
+ for (i = 0; i < total_blocks / 2; i++) {
+ size_t off = ssm_pk_buff_get_idx(spbs[i]);
+ if (ssm_pool_remove(pool, off) != 0) {
+ printf("Remove %zu failed.\n", i);
+ free(spbs);
+ free(ptrs);
+ goto fail_pool;
+ }
+ }
+
+ /* Freed blocks should be in shards (all blocks free again) */
+ total_free = 0;
+ for (i = 0; i < SSM_POOL_SHARDS; i++) {
+ total_free += sc->shards[i].free_count;
+ }
+
+ if (total_free != total_blocks) {
+ printf("Expected %zu free blocks total, got %zu.\n",
+ total_blocks, total_free);
+ free(spbs);
+ free(ptrs);
+ goto fail_pool;
+ }
+
+ /* Allocate again - should succeed by taking from shards */
+ for (i = 0; i < total_blocks / 2; i++) {
+ ssize_t off = ssm_pool_alloc(pool, TEST_SIZE,
+ &ptrs[i], &spbs[i]);
+ if (off < 0) {
+ printf("Fallback alloc %zu failed: %zd.\n", i, off);
+ free(spbs);
+ free(ptrs);
+ goto fail_pool;
+ }
+ }
+
+ /* Now all allocated blocks are in use again */
+ /* Cleanup - free all allocated blocks */
+ for (i = 0; i < total_blocks / 2; i++) {
+ size_t off = ssm_pk_buff_get_idx(spbs[i]);
+ ssm_pool_remove(pool, off);
+ }
+
+ free(spbs);
+ free(ptrs);
+ ssm_pool_destroy(pool);
+
+ TEST_SUCCESS();
+ return TEST_RC_SUCCESS;
+
+ fail_pool:
+ ssm_pool_destroy(pool);
+ fail:
+ TEST_FAIL();
+ return TEST_RC_FAIL;
+}
+
+static int test_multiprocess_sharding(void)
+{
+ struct ssm_pool * pool;
+ struct _ssm_pool_hdr * hdr;
+ struct _ssm_size_class * sc;
+ pid_t children[SSM_POOL_SHARDS];
+ int i;
+ int status;
+
+ TEST_START();
+
+ ssm_pool_purge();
+
+ pool = ssm_pool_create();
+ if (pool == NULL) {
+ printf("Failed to create pool.\n");
+ goto fail;
+ }
+
+ /* Fork processes to test different shards */
+ for (i = 0; i < SSM_POOL_SHARDS; i++) {
+ children[i] = fork();
+ if (children[i] == -1) {
+ printf("Fork %d failed.\n", i);
+ goto fail_children;
+ }
+
+ if (children[i] == 0) {
+ /* Child process */
+ struct ssm_pool * child_pool;
+ struct ssm_pk_buff * spb;
+ uint8_t * ptr;
+ ssize_t off;
+ int my_shard;
+
+ child_pool = ssm_pool_open();
+ if (child_pool == NULL)
+ exit(EXIT_FAILURE);
+
+ my_shard = getpid() % SSM_POOL_SHARDS;
+ (void) my_shard; /* Reserved for future use */
+
+ /* Each child allocates and frees a block */
+ off = ssm_pool_alloc(child_pool, TEST_SIZE,
+ &ptr, &spb);
+ if (off < 0) {
+ ssm_pool_close(child_pool);
+ exit(EXIT_FAILURE);
+ }
+
+ /* Small delay to ensure allocation visible */
+ usleep(10000);
+
+ if (ssm_pool_remove(child_pool, off) != 0) {
+ ssm_pool_close(child_pool);
+ exit(EXIT_FAILURE);
+ }
+
+ ssm_pool_close(child_pool);
+ exit(EXIT_SUCCESS);
+ }
+ }
+
+ /* Wait for all children */
+ for (i = 0; i < SSM_POOL_SHARDS; i++) {
+ if (waitpid(children[i], &status, 0) == -1) {
+ printf("Waitpid %d failed.\n", i);
+ goto fail_children;
+ }
+ if (!WIFEXITED(status) || WEXITSTATUS(status) != 0) {
+ printf("Child %d failed.\n", i);
+ goto fail_pool;
+ }
+ }
+
+ /* Verify blocks distributed across shards */
+ hdr = get_pool_hdr(pool);
+
+ /* Find the first size class with blocks */
+ sc = NULL;
+ for (i = 0; i < SSM_POOL_MAX_CLASSES; i++) {
+ if (hdr->size_classes[i].object_count > 0) {
+ sc = &hdr->size_classes[i];
+ break;
+ }
+ }
+
+ if (sc == NULL) {
+ printf("No size classes configured.\n");
+ goto fail_pool;
+ }
+
+ /* After children allocate and free, blocks should be in shards
+ * (though exact distribution depends on PID values)
+ */
+ for (i = 0; i < SSM_POOL_SHARDS; i++) {
+ /* At least some shards should have blocks */
+ if (sc->shards[i].free_count > 0) {
+ break;
+ }
+ }
+
+ ssm_pool_destroy(pool);
+
+ TEST_SUCCESS();
+ return TEST_RC_SUCCESS;
+
+ fail_children:
+ /* Kill any remaining children */
+ for (i = 0; i < SSM_POOL_SHARDS; i++) {
+ if (children[i] > 0)
+ kill(children[i], SIGKILL);
+ }
+ fail_pool:
+ ssm_pool_destroy(pool);
+ fail:
+ TEST_FAIL();
+ return TEST_RC_FAIL;
+}
+
+static int test_exhaustion_with_fallback(void)
+{
+ struct ssm_pool * pool;
+ struct ssm_pk_buff * spb;
+ uint8_t * ptr;
+ ssize_t off;
+
+ TEST_START();
+
+ ssm_pool_purge();
+
+ pool = ssm_pool_create();
+ if (pool == NULL) {
+ printf("Failed to create pool.\n");
+ goto fail;
+ }
+
+ /* Allocate until exhausted across all shards */
+ while (true) {
+ off = ssm_pool_alloc(pool, TEST_SIZE, &ptr, &spb);
+ if (off < 0) {
+ if (off == -EAGAIN)
+ break;
+ printf("Unexpected error: %zd.\n", off);
+ goto fail_pool;
+ }
+ }
+
+ /* Should fail with -EAGAIN when truly exhausted */
+ off = ssm_pool_alloc(pool, TEST_SIZE, &ptr, &spb);
+ if (off != -EAGAIN) {
+ printf("Expected -EAGAIN, got %zd.\n", off);
+ goto fail_pool;
+ }
+
+ ssm_pool_destroy(pool);
+
+ TEST_SUCCESS();
+ return TEST_RC_SUCCESS;
+
+ fail_pool:
+ ssm_pool_destroy(pool);
+ fail:
+ TEST_FAIL();
+ return TEST_RC_FAIL;
+}
+
+int pool_sharding_test(int argc,
+ char ** argv)
+{
+ int ret = 0;
+
+ (void) argc;
+ (void) argv;
+
+ ret |= test_lazy_distribution();
+ ret |= test_shard_migration();
+ ret |= test_fallback_stealing();
+ ret |= test_multiprocess_sharding();
+ ret |= test_exhaustion_with_fallback();
+
+ return ret;
+}
diff --git a/src/lib/ssm/tests/pool_test.c b/src/lib/ssm/tests/pool_test.c
new file mode 100644
index 00000000..e298d9ab
--- /dev/null
+++ b/src/lib/ssm/tests/pool_test.c
@@ -0,0 +1,1038 @@
+/*
+ * Ouroboros - Copyright (C) 2016 - 2024
+ *
+ * Test of the Secure Shared Memory (SSM) system
+ *
+ * Dimitri Staessens <dimitri@ouroboros.rocks>
+ * Sander Vrijders <sander@ouroboros.rocks>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 as
+ * published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., http://www.fsf.org/about/contact/.
+ */
+
+#define _POSIX_C_SOURCE 200809L
+
+#include "config.h"
+#include "ssm.h"
+
+#include <test/test.h>
+#include <ouroboros/ssm_pool.h>
+#include <ouroboros/ssm_rbuff.h>
+
+#include <errno.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+#include <stdbool.h>
+#include <stdatomic.h>
+#include <sys/wait.h>
+#include <sys/types.h>
+#include <signal.h>
+#include <time.h>
+
+#define POOL_256 256
+#define POOL_512 512
+#define POOL_1K 1024
+#define POOL_2K 2048
+#define POOL_4K 4096
+#define POOL_16K 16384
+#define POOL_64K 65536
+#define POOL_256K 262144
+#define POOL_1M 1048576
+#define POOL_2M (2 * 1024 * 1024)
+
+static int test_ssm_pool_basic_allocation(void)
+{
+ struct ssm_pool * pool;
+ uint8_t * ptr;
+ struct ssm_pk_buff * spb;
+ ssize_t ret;
+
+ TEST_START();
+
+ pool = ssm_pool_create();
+ if (pool == NULL)
+ goto fail_create;
+
+ ret = ssm_pool_alloc(pool, POOL_256, &ptr, &spb);
+ if (ret < 0) {
+ printf("Alloc failed: %zd.\n", ret);
+ goto fail_alloc;
+ }
+
+ if (spb == NULL) {
+ printf("Spb is NULL.\n");
+ goto fail_alloc;
+ }
+
+ if (ptr == NULL) {
+ printf("Ptr is NULL.\n");
+ goto fail_alloc;
+ }
+
+ if (ssm_pk_buff_len(spb) != POOL_256) {
+ printf("Bad length: %zu.\n", ssm_pk_buff_len(spb));
+ goto fail_alloc;
+ }
+
+ ret = ssm_pool_remove(pool, ret);
+ if (ret != 0) {
+ printf("Remove failed: %zd.\n", ret);
+ goto fail_alloc;
+ }
+
+ ssm_pool_destroy(pool);
+
+ TEST_SUCCESS();
+ return TEST_RC_SUCCESS;
+
+ fail_alloc:
+ ssm_pool_destroy(pool);
+ fail_create:
+ TEST_FAIL();
+ return TEST_RC_FAIL;
+}
+
+static int test_ssm_pool_multiple_allocations(void)
+{
+ struct ssm_pool * pool;
+ uint8_t * ptr1;
+ uint8_t * ptr2;
+ uint8_t * ptr3;
+ struct ssm_pk_buff * spb1;
+ struct ssm_pk_buff * spb2;
+ struct ssm_pk_buff * spb3;
+ ssize_t ret1;
+ ssize_t ret2;
+ ssize_t ret3;
+
+ TEST_START();
+
+ pool = ssm_pool_create();
+ if (pool == NULL)
+ goto fail_create;
+
+ ret1 = ssm_pool_alloc(pool, POOL_256, &ptr1, &spb1);
+ ret2 = ssm_pool_alloc(pool, POOL_256, &ptr2, &spb2);
+ ret3 = ssm_pool_alloc(pool, POOL_256, &ptr3, &spb3);
+ if (ret1 < 0 || ret2 < 0 || ret3 < 0) {
+ printf("Allocs failed: %zd, %zd, %zd.\n", ret1, ret2, ret3);
+ goto fail_alloc;
+ }
+
+ if (spb1 == NULL) {
+ printf("Spb1 is NULL.\n");
+ goto fail_alloc;
+ }
+
+ if (ptr1 == NULL) {
+ printf("Ptr1 is NULL.\n");
+ goto fail_alloc;
+ }
+
+ if (spb2 == NULL) {
+ printf("Spb2 is NULL.\n");
+ goto fail_alloc;
+ }
+
+ if (ptr2 == NULL) {
+ printf("Ptr2 is NULL.\n");
+ goto fail_alloc;
+ }
+
+ if (spb3 == NULL) {
+ printf("Spb3 is NULL.\n");
+ goto fail_alloc;
+ }
+
+ if (ptr3 == NULL) {
+ printf("Ptr3 is NULL.\n");
+ goto fail_alloc;
+ }
+
+ if (ssm_pk_buff_len(spb1) != POOL_256) {
+ printf("Bad length spb1: %zu.\n", ssm_pk_buff_len(spb1));
+ goto fail_alloc;
+ }
+
+ if (ssm_pk_buff_len(spb2) != POOL_256) {
+ printf("Bad length spb2: %zu.\n", ssm_pk_buff_len(spb2));
+ goto fail_alloc;
+ }
+
+ if (ssm_pk_buff_len(spb3) != POOL_256) {
+ printf("Bad length spb3: %zu.\n", ssm_pk_buff_len(spb3));
+ goto fail_alloc;
+ }
+
+ if (ssm_pool_remove(pool, ret2) != 0) {
+ printf("Remove ret2 failed.\n");
+ goto fail_alloc;
+ }
+
+ if (ssm_pool_remove(pool, ret1) != 0) {
+ printf("Remove ret1 failed.\n");
+ goto fail_alloc;
+ }
+
+ if (ssm_pool_remove(pool, ret3) != 0) {
+ printf("Remove ret3 failed.\n");
+ goto fail_alloc;
+ }
+
+ ssm_pool_destroy(pool);
+
+ TEST_SUCCESS();
+ return TEST_RC_SUCCESS;
+
+ fail_alloc:
+ ssm_pool_destroy(pool);
+ fail_create:
+ TEST_FAIL();
+ return TEST_RC_FAIL;
+}
+
+static int test_ssm_pool_no_fallback_for_large(void)
+{
+ struct ssm_pool * pool;
+ uint8_t * ptr;
+ struct ssm_pk_buff * spb;
+ ssize_t ret;
+
+ TEST_START();
+
+ pool = ssm_pool_create();
+ if (pool == NULL)
+ goto fail_create;
+
+ ret = ssm_pool_alloc(pool, POOL_2M, &ptr, &spb);
+ if (ret >= 0) {
+ printf("Oversized alloc succeeded: %zd.\n", ret);
+ goto fail_alloc;
+ }
+
+ if (ret != -EMSGSIZE) {
+ printf("Wrong error: %zd.\n", ret);
+ goto fail_alloc;
+ }
+
+ ssm_pool_destroy(pool);
+
+ TEST_SUCCESS();
+ return TEST_RC_SUCCESS;
+
+ fail_alloc:
+ ssm_pool_destroy(pool);
+ fail_create:
+ TEST_FAIL();
+ return TEST_RC_FAIL;
+}
+
+static int test_ssm_pool_blocking_vs_nonblocking(void)
+{
+ struct ssm_pool * pool;
+ uint8_t * ptr;
+ struct ssm_pk_buff * spb;
+ ssize_t ret;
+
+ TEST_START();
+
+ pool = ssm_pool_create();
+ if (pool == NULL)
+ goto fail_create;
+
+ ret = ssm_pool_alloc(pool, POOL_2M, &ptr, &spb);
+ if (ret != -EMSGSIZE) {
+ printf("Nonblocking oversized: %zd.\n", ret);
+ goto fail_alloc;
+ }
+
+ ret = ssm_pool_alloc_b(pool, POOL_2M, &ptr, &spb, NULL);
+ if (ret != -EMSGSIZE) {
+ printf("Blocking oversized: %zd.\n", ret);
+ goto fail_alloc;
+ }
+
+ ret = ssm_pool_alloc(pool, POOL_256, &ptr, &spb);
+ if (ret < 0) {
+ printf("Valid alloc failed: %zd.\n", ret);
+ goto fail_alloc;
+ }
+
+ ssm_pool_remove(pool, ret);
+ ssm_pool_destroy(pool);
+
+ TEST_SUCCESS();
+ return TEST_RC_SUCCESS;
+
+ fail_alloc:
+ ssm_pool_destroy(pool);
+ fail_create:
+ TEST_FAIL();
+ return TEST_RC_FAIL;
+}
+
+static int test_ssm_pool_stress_test(void)
+{
+ struct ssm_pool * pool;
+ uint8_t * ptr;
+ struct ssm_pk_buff * spb;
+ ssize_t * indices = NULL;
+ ssize_t ret;
+ size_t count = 0;
+ size_t i;
+
+ TEST_START();
+
+ pool = ssm_pool_create();
+ if (pool == NULL)
+ goto fail_create;
+
+ indices = malloc(100 * sizeof(*indices));
+ if (indices == NULL) {
+ printf("Malloc failed.\n");
+ goto fail_alloc;
+ }
+
+ for (i = 0; i < 100; i++) {
+ size_t j;
+ size_t num;
+ size_t size;
+
+ num = (i % 100) + 1;
+
+ for (j = 0; j < num && count < 100; j++) {
+ switch (i % 4) {
+ case 0:
+ /* FALLTHRU */
+ case 1:
+ size = POOL_256;
+ break;
+ case 2:
+ /* FALLTHRU */
+ case 3:
+ size = POOL_1K;
+ break;
+ default:
+ size = POOL_256;
+ break;
+ }
+
+ ret = ssm_pool_alloc(pool, size, &ptr, &spb);
+ if (ret < 0) {
+ printf("Alloc at iter %zu: %zd.\n", i, ret);
+ goto fail_test;
+ }
+ indices[count++] = ret;
+ }
+
+ for (j = 0; j < count / 2; j++) {
+ size_t idx = j * 2;
+ if (idx < count) {
+ ret = ssm_pool_remove(pool, indices[idx]);
+ if (ret != 0) {
+ printf("Remove at iter %zu: %zd.\n",
+ i, ret);
+ goto fail_test;
+ }
+ memmove(&indices[idx], &indices[idx + 1],
+ (count - idx - 1) * sizeof(*indices));
+ count--;
+ }
+ }
+
+ if (i % 10 == 0) {
+ ret = ssm_pool_alloc(pool, POOL_256, &ptr, &spb);
+ if (ret < 0) {
+ printf("Periodic alloc at %zu: %zd.\n", i, ret);
+ goto fail_test;
+ }
+ ssm_pool_remove(pool, ret);
+ }
+ }
+
+ for (i = 0; i < count; i++)
+ ssm_pool_remove(pool, indices[i]);
+
+ free(indices);
+ ssm_pool_destroy(pool);
+
+ TEST_SUCCESS();
+ return TEST_RC_SUCCESS;
+
+ fail_test:
+ for (i = 0; i < count; i++)
+ ssm_pool_remove(pool, indices[i]);
+ free(indices);
+ fail_alloc:
+ ssm_pool_destroy(pool);
+ fail_create:
+ TEST_FAIL();
+ return TEST_RC_FAIL;
+}
+
+static int test_ssm_pool_open_initializes_ssm(void)
+{
+ struct ssm_pool * creator;
+ struct ssm_pool * opener;
+ uint8_t * ptr;
+ struct ssm_pk_buff * spb;
+ ssize_t ret;
+
+ TEST_START();
+
+ creator = ssm_pool_create();
+ if (creator == NULL)
+ goto fail_create;
+
+ ret = ssm_pool_alloc(creator, POOL_256, &ptr, &spb);
+ if (ret < 0) {
+ printf("Creator alloc failed: %zd.\n", ret);
+ goto fail_creator;
+ }
+ ssm_pool_remove(creator, ret);
+
+ opener = ssm_pool_open();
+ if (opener == NULL) {
+ printf("Open failed.\n");
+ goto fail_creator;
+ }
+
+ ret = ssm_pool_alloc(opener, POOL_256, &ptr, &spb);
+ if (ret < 0) {
+ printf("Opener alloc failed: %zd.\n", ret);
+ goto fail_opener;
+ }
+
+ ssm_pool_remove(opener, ret);
+ ssm_pool_close(opener);
+ ssm_pool_destroy(creator);
+
+ TEST_SUCCESS();
+ return TEST_RC_SUCCESS;
+
+ fail_opener:
+ ssm_pool_close(opener);
+ fail_creator:
+ ssm_pool_destroy(creator);
+ fail_create:
+ TEST_FAIL();
+ return TEST_RC_FAIL;
+}
+
+static int test_ssm_pool_bounds_checking(void)
+{
+ struct ssm_pool * pool;
+ struct ssm_pk_buff * spb;
+ ssize_t ret;
+
+ TEST_START();
+
+ pool = ssm_pool_create();
+ if (pool == NULL)
+ goto fail_create;
+
+ ret = ssm_pool_alloc(pool, POOL_256, NULL, &spb);
+ if (ret < 0) {
+ printf("alloc failed: %zd.\n", ret);
+ goto fail_alloc;
+ }
+
+ spb = ssm_pool_get(pool, 0);
+ if (spb != NULL) {
+ printf("Get at offset 0.\n");
+ goto fail_alloc;
+ }
+
+ spb = ssm_pool_get(pool, 100000000UL);
+ if (spb != NULL) {
+ printf("Get beyond pool.\n");
+ goto fail_alloc;
+ }
+
+ ret = ssm_pool_remove(pool, 0);
+ if (ret != -EINVAL) {
+ printf("Remove at offset 0: %zd.\n", ret);
+ goto fail_alloc;
+ }
+
+ ret = ssm_pool_remove(pool, 100000000UL);
+ if (ret != -EINVAL) {
+ printf("Remove beyond pool: %zd.\n", ret);
+ goto fail_alloc;
+ }
+
+ ssm_pool_destroy(pool);
+
+ TEST_SUCCESS();
+ return TEST_RC_SUCCESS;
+
+ fail_alloc:
+ ssm_pool_destroy(pool);
+ fail_create:
+ TEST_FAIL();
+ return TEST_RC_FAIL;
+}
+
+static int test_ssm_pool_inter_process_communication(void)
+{
+ struct ssm_pool * pool;
+ struct ssm_rbuff * rb;
+ struct ssm_pk_buff * spb;
+ uint8_t * ptr;
+ uint8_t * data;
+ const char * msg = "inter-process test";
+ size_t len;
+ ssize_t idx;
+ pid_t pid;
+ int status;
+
+ TEST_START();
+
+ len = strlen(msg) + 1;
+
+ pool = ssm_pool_create();
+ if (pool == NULL)
+ goto fail_create;
+
+ rb = ssm_rbuff_create(getpid(), 1);
+ if (rb == NULL) {
+ printf("Rbuff create failed.\n");
+ goto fail_pool;
+ }
+
+ pid = fork();
+ if (pid < 0) {
+ printf("Fork failed.\n");
+ goto fail_rbuff;
+ }
+
+ if (pid == 0) {
+ idx = ssm_rbuff_read_b(rb, NULL);
+ if (idx < 0) {
+ printf("Child: rbuff read: %zd.\n", idx);
+ exit(1);
+ }
+
+ spb = ssm_pool_get(pool, idx);
+ if (spb == NULL) {
+ printf("Child: pool get failed.\n");
+ exit(1);
+ }
+
+ data = ssm_pk_buff_head(spb);
+ if (data == NULL) {
+ printf("Child: data is NULL.\n");
+ ssm_pool_remove(pool, idx);
+ exit(1);
+ }
+
+ if (strcmp((char *)data, msg) != 0) {
+ printf("Child: data mismatch.\n");
+ ssm_pool_remove(pool, idx);
+ exit(1);
+ }
+
+ ssm_pool_remove(pool, idx);
+ exit(0);
+ }
+
+ idx = ssm_pool_alloc(pool, len, &ptr, &spb);
+ if (idx < 0) {
+ printf("Parent: pool alloc: %zd.\n", idx);
+ goto fail_child;
+ }
+
+ memcpy(ptr, msg, len);
+
+ if (ssm_rbuff_write(rb, idx) < 0) {
+ printf("Parent: rbuff write failed.\n");
+ ssm_pool_remove(pool, idx);
+ goto fail_child;
+ }
+
+ if (waitpid(pid, &status, 0) < 0) {
+ printf("Parent: waitpid failed.\n");
+ ssm_pool_remove(pool, idx);
+ goto fail_rbuff;
+ }
+
+ if (!WIFEXITED(status) || WEXITSTATUS(status) != 0) {
+ printf("Child failed.\n");
+ ssm_pool_remove(pool, idx);
+ goto fail_rbuff;
+ }
+
+ ssm_rbuff_destroy(rb);
+ ssm_pool_destroy(pool);
+
+ TEST_SUCCESS();
+ return TEST_RC_SUCCESS;
+
+ fail_child:
+ waitpid(pid, &status, 0);
+ fail_rbuff:
+ ssm_rbuff_destroy(rb);
+ fail_pool:
+ ssm_pool_destroy(pool);
+ fail_create:
+ TEST_FAIL();
+ return TEST_RC_FAIL;
+}
+
+static int test_ssm_pool_read_operation(void)
+{
+ struct ssm_pool * pool;
+ struct ssm_pk_buff * spb;
+ uint8_t * wptr;
+ uint8_t * rptr;
+ const char * data = "ssm_pool_read test";
+ size_t len;
+ ssize_t idx;
+ ssize_t ret;
+
+ TEST_START();
+
+ len = strlen(data) + 1;
+
+ pool = ssm_pool_create();
+ if (pool == NULL)
+ goto fail_create;
+
+ idx = ssm_pool_alloc(pool, len, &wptr, &spb);
+ if (idx < 0) {
+ printf("alloc failed: %zd.\n", idx);
+ goto fail_alloc;
+ }
+
+ memcpy(wptr, data, len);
+
+ ret = ssm_pool_read(&rptr, pool, idx);
+ if (ret < 0) {
+ printf("Read failed: %zd.\n", ret);
+ goto fail_read;
+ }
+
+ if (rptr == NULL) {
+ printf("NULL pointer.\n");
+ goto fail_read;
+ }
+
+ if (strcmp((char *)rptr, data) != 0) {
+ printf("Data mismatch.\n");
+ goto fail_read;
+ }
+
+ ssm_pool_remove(pool, idx);
+ ssm_pool_destroy(pool);
+
+ TEST_SUCCESS();
+ return TEST_RC_SUCCESS;
+
+ fail_read:
+ ssm_pool_remove(pool, idx);
+ fail_alloc:
+ ssm_pool_destroy(pool);
+ fail_create:
+ TEST_FAIL();
+ return TEST_RC_FAIL;
+}
+
+static int test_ssm_pool_mlock_operation(void)
+{
+ struct ssm_pool * pool;
+ int ret;
+
+ TEST_START();
+
+ pool = ssm_pool_create();
+ if (pool == NULL)
+ goto fail_create;
+
+ ret = ssm_pool_mlock(pool);
+ if (ret < 0)
+ printf("Mlock failed: %d (may need privileges).\n", ret);
+
+ ssm_pool_destroy(pool);
+
+ TEST_SUCCESS();
+ return TEST_RC_SUCCESS;
+
+ fail_create:
+ TEST_FAIL();
+ return TEST_RC_FAIL;
+}
+
+static int test_ssm_pk_buff_operations(void)
+{
+ struct ssm_pool * pool;
+ struct ssm_pk_buff * spb;
+ uint8_t * ptr;
+ uint8_t * head;
+ uint8_t * tail;
+ const char * data = "packet buffer test";
+ size_t dlen;
+ size_t len;
+ ssize_t idx;
+
+ TEST_START();
+
+ dlen = strlen(data);
+
+ pool = ssm_pool_create();
+ if (pool == NULL)
+ goto fail_create;
+
+ idx = ssm_pool_alloc(pool, POOL_256, &ptr, &spb);
+ if (idx < 0) {
+ printf("alloc failed: %zd.\n", idx);
+ goto fail_alloc;
+ }
+
+ head = ssm_pk_buff_head(spb);
+ if (head != ptr) {
+ printf("Head mismatch.\n");
+ goto fail_ops;
+ }
+
+ len = ssm_pk_buff_len(spb);
+ if (len != POOL_256) {
+ printf("Bad length: %zu.\n", len);
+ goto fail_ops;
+ }
+
+ tail = ssm_pk_buff_tail(spb);
+ if (tail != ptr + len) {
+ printf("Tail mismatch.\n");
+ goto fail_ops;
+ }
+
+ memcpy(head, data, dlen);
+
+ tail = ssm_pk_buff_tail_alloc(spb, 32);
+ if (tail == NULL) {
+ printf("Tail_alloc failed.\n");
+ goto fail_ops;
+ }
+
+ if (ssm_pk_buff_len(spb) != POOL_256 + 32) {
+ printf("Length after tail_alloc: %zu.\n",
+ ssm_pk_buff_len(spb));
+ goto fail_ops;
+ }
+
+ if (memcmp(head, data, dlen) != 0) {
+ printf("Data corrupted.\n");
+ goto fail_ops;
+ }
+
+ tail = ssm_pk_buff_tail_release(spb, 32);
+ if (tail == NULL) {
+ printf("Tail_release failed.\n");
+ goto fail_ops;
+ }
+
+ if (ssm_pk_buff_len(spb) != POOL_256) {
+ printf("Length after tail_release: %zu.\n",
+ ssm_pk_buff_len(spb));
+ goto fail_ops;
+ }
+
+ ssm_pool_remove(pool, idx);
+ ssm_pool_destroy(pool);
+
+ TEST_SUCCESS();
+ return TEST_RC_SUCCESS;
+
+ fail_ops:
+ ssm_pool_remove(pool, idx);
+ fail_alloc:
+ ssm_pool_destroy(pool);
+ fail_create:
+ TEST_FAIL();
+ return TEST_RC_FAIL;
+}
+
+#define OVERHEAD (offsetof(struct ssm_pk_buff, data) + \
+ SSM_PK_BUFF_HEADSPACE + SSM_PK_BUFF_TAILSPACE)
+static int test_ssm_pool_size_class_boundaries(void)
+{
+ struct ssm_pool * pool;
+ struct ssm_pk_buff * spb;
+ uint8_t * ptr;
+ size_t sizes[] = {
+ 1,
+ POOL_512 - OVERHEAD,
+ POOL_512 - OVERHEAD + 1,
+ POOL_1K - OVERHEAD,
+ POOL_1K - OVERHEAD + 1,
+ POOL_2K - OVERHEAD,
+ POOL_2K - OVERHEAD + 1,
+ POOL_4K - OVERHEAD,
+ POOL_4K - OVERHEAD + 1,
+ POOL_16K - OVERHEAD,
+ POOL_16K - OVERHEAD + 1,
+ POOL_64K - OVERHEAD,
+ POOL_64K - OVERHEAD + 1,
+ POOL_256K - OVERHEAD,
+ POOL_256K - OVERHEAD + 1,
+ POOL_1M - OVERHEAD,
+ };
+ size_t expected_classes[] = {
+ 512, 512, 1024, 1024, 2048, 2048, 4096, 4096, 16384,
+ 16384, 65536, 65536, 262144, 262144, 1048576, 1048576
+ };
+ size_t i;
+ ssize_t idx;
+
+ TEST_START();
+
+ pool = ssm_pool_create();
+ if (pool == NULL)
+ goto fail_create;
+
+ for (i = 0; i < sizeof(sizes) / sizeof(sizes[0]); i++) {
+ struct ssm_pk_buff * hdr;
+ size_t actual_class;
+
+ idx = ssm_pool_alloc(pool, sizes[i], &ptr, &spb);
+ if (idx < 0) {
+ printf("Alloc at %zu failed: %zd.\n", sizes[i], idx);
+ goto fail_alloc;
+ }
+
+ if (ssm_pk_buff_len(spb) != sizes[i]) {
+ printf("Length mismatch at %zu: %zu.\n",
+ sizes[i], ssm_pk_buff_len(spb));
+ ssm_pool_remove(pool, idx);
+ goto fail_alloc;
+ }
+
+ /* Verify correct size class was used
+ * hdr->size is the data array size (object_size - header) */
+ hdr = spb;
+ actual_class = hdr->size + offsetof(struct ssm_pk_buff, data);
+ if (actual_class != expected_classes[i]) {
+ printf("Wrong class for len=%zu: want %zu, got %zu.\n",
+ sizes[i], expected_classes[i], actual_class);
+ ssm_pool_remove(pool, idx);
+ goto fail_alloc;
+ }
+
+ memset(ptr, i & 0xFF, sizes[i]);
+
+ ssm_pool_remove(pool, idx);
+ }
+
+ ssm_pool_destroy(pool);
+
+ TEST_SUCCESS();
+ return TEST_RC_SUCCESS;
+
+ fail_alloc:
+ ssm_pool_destroy(pool);
+ fail_create:
+ TEST_FAIL();
+ return TEST_RC_FAIL;
+}
+
+static int test_ssm_pool_exhaustion(void)
+{
+ struct ssm_pool * pool;
+ struct ssm_pk_buff * spb;
+ uint8_t * ptr;
+ ssize_t * indices;
+ size_t count = 0;
+ size_t i;
+ ssize_t ret;
+
+ TEST_START();
+
+ pool = ssm_pool_create();
+ if (pool == NULL)
+ goto fail_create;
+
+ indices = malloc(2048 * sizeof(*indices));
+ if (indices == NULL) {
+ printf("Malloc failed.\n");
+ goto fail_alloc;
+ }
+
+ for (i = 0; i < 2048; i++) {
+ ret = ssm_pool_alloc(pool, POOL_256, &ptr, &spb);
+ if (ret < 0) {
+ if (ret == -EAGAIN)
+ break;
+ printf("Alloc error: %zd.\n", ret);
+ goto fail_test;
+ }
+ indices[count++] = ret;
+ }
+
+ if (count == 0) {
+ printf("No allocs succeeded.\n");
+ goto fail_test;
+ }
+
+ ret = ssm_pool_alloc(pool, POOL_256, &ptr, &spb);
+ if (ret >= 0) {
+ ssm_pool_remove(pool, ret);
+ } else if (ret != -EAGAIN) {
+ printf("Unexpected error: %zd.\n", ret);
+ goto fail_test;
+ }
+
+ for (i = 0; i < count; i++)
+ ssm_pool_remove(pool, indices[i]);
+
+ ret = ssm_pool_alloc(pool, POOL_256, &ptr, &spb);
+ if (ret < 0) {
+ printf("Alloc after free failed: %zd.\n", ret);
+ goto fail_test;
+ }
+ ssm_pool_remove(pool, ret);
+
+ free(indices);
+ ssm_pool_destroy(pool);
+
+ TEST_SUCCESS();
+ return TEST_RC_SUCCESS;
+
+ fail_test:
+ for (i = 0; i < count; i++)
+ ssm_pool_remove(pool, indices[i]);
+ free(indices);
+ fail_alloc:
+ ssm_pool_destroy(pool);
+ fail_create:
+ TEST_FAIL();
+ return TEST_RC_FAIL;
+}
+
+static int test_ssm_pool_reclaim_orphans(void)
+{
+ struct ssm_pool * pool;
+ uint8_t * ptr1;
+ uint8_t * ptr2;
+ uint8_t * ptr3;
+ struct ssm_pk_buff * spb1;
+ struct ssm_pk_buff * spb2;
+ struct ssm_pk_buff * spb3;
+ ssize_t ret1;
+ ssize_t ret2;
+ ssize_t ret3;
+ pid_t my_pid;
+ pid_t fake_pid = 99999;
+
+ TEST_START();
+
+ pool = ssm_pool_create();
+ if (pool == NULL)
+ goto fail_create;
+
+ my_pid = getpid();
+
+ /* Allocate some blocks */
+ ret1 = ssm_pool_alloc(pool, POOL_256, &ptr1, &spb1);
+ ret2 = ssm_pool_alloc(pool, POOL_512, &ptr2, &spb2);
+ ret3 = ssm_pool_alloc(pool, POOL_1K, &ptr3, &spb3);
+ if (ret1 < 0 || ret2 < 0 || ret3 < 0) {
+ printf("Allocs failed: %zd, %zd, %zd.\n", ret1, ret2, ret3);
+ goto fail_alloc;
+ }
+
+ /* Simulate blocks from another process by changing allocator_pid */
+ spb1->allocator_pid = fake_pid;
+ spb2->allocator_pid = fake_pid;
+ /* Keep spb3 with our pid */
+
+ /* Reclaim orphans from fake_pid */
+ ssm_pool_reclaim_orphans(pool, fake_pid);
+
+ /* Verify spb1 and spb2 have refcount 0 (reclaimed) */
+ if (spb1->refcount != 0) {
+ printf("spb1 refcount should be 0, got %u.\n", spb1->refcount);
+ goto fail_test;
+ }
+
+ if (spb2->refcount != 0) {
+ printf("spb2 refcount should be 0, got %u.\n", spb2->refcount);
+ goto fail_test;
+ }
+
+ /* Verify spb3 still has refcount 1 (not reclaimed) */
+ if (spb3->refcount != 1) {
+ printf("spb3 refcount should be 1, got %u.\n", spb3->refcount);
+ goto fail_test;
+ }
+
+ /* Clean up */
+ ssm_pool_remove(pool, ret3);
+
+ /* Try allocating again - should get blocks from reclaimed pool */
+ ret1 = ssm_pool_alloc(pool, POOL_256, &ptr1, &spb1);
+ if (ret1 < 0) {
+ printf("Alloc after reclaim failed: %zd.\n", ret1);
+ goto fail_test;
+ }
+
+ /* Verify new allocation has our pid */
+ if (spb1->allocator_pid != my_pid) {
+ printf("New block has wrong pid: %d vs %d.\n",
+ spb1->allocator_pid, my_pid);
+ goto fail_test;
+ }
+
+ ssm_pool_remove(pool, ret1);
+ ssm_pool_destroy(pool);
+
+ TEST_SUCCESS();
+ return TEST_RC_SUCCESS;
+
+ fail_test:
+ ssm_pool_remove(pool, ret3);
+ fail_alloc:
+ ssm_pool_destroy(pool);
+ fail_create:
+ TEST_FAIL();
+ return TEST_RC_FAIL;
+}
+
+int pool_test(int argc,
+ char ** argv)
+{
+ int ret = 0;
+
+ (void) argc;
+ (void) argv;
+
+ ssm_pool_purge();
+
+ ret |= test_ssm_pool_basic_allocation();
+ ret |= test_ssm_pool_multiple_allocations();
+ ret |= test_ssm_pool_no_fallback_for_large();
+ ret |= test_ssm_pool_blocking_vs_nonblocking();
+ ret |= test_ssm_pool_stress_test();
+ ret |= test_ssm_pool_open_initializes_ssm();
+ ret |= test_ssm_pool_bounds_checking();
+ ret |= test_ssm_pool_inter_process_communication();
+ ret |= test_ssm_pool_read_operation();
+ ret |= test_ssm_pool_mlock_operation();
+ ret |= test_ssm_pk_buff_operations();
+ ret |= test_ssm_pool_size_class_boundaries();
+ ret |= test_ssm_pool_exhaustion();
+ ret |= test_ssm_pool_reclaim_orphans();
+
+ return ret;
+}
diff --git a/src/lib/ssm/tests/rbuff_test.c b/src/lib/ssm/tests/rbuff_test.c
new file mode 100644
index 00000000..6e1cb5ec
--- /dev/null
+++ b/src/lib/ssm/tests/rbuff_test.c
@@ -0,0 +1,675 @@
+/*
+ * Ouroboros - Copyright (C) 2016 - 2024
+ *
+ * Test of the SSM notification ring buffer
+ *
+ * Dimitri Staessens <dimitri@ouroboros.rocks>
+ * Sander Vrijders <sander@ouroboros.rocks>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 as
+ * published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., http://www.fsf.org/about/contact/.
+ */
+
+#if defined(__linux__) || defined(__CYGWIN__)
+#define _DEFAULT_SOURCE
+#else
+#define _POSIX_C_SOURCE 200112L
+#endif
+
+#include "config.h"
+#include "ssm.h"
+
+#include <test/test.h>
+#include <ouroboros/ssm_rbuff.h>
+#include <ouroboros/errno.h>
+#include <ouroboros/time.h>
+
+#include <errno.h>
+#include <stdio.h>
+#include <unistd.h>
+#include <pthread.h>
+
+static int test_ssm_rbuff_create_destroy(void)
+{
+ struct ssm_rbuff * rb;
+
+ TEST_START();
+
+ rb = ssm_rbuff_create(getpid(), 1);
+ if (rb == NULL) {
+ printf("Failed to create rbuff.\n");
+ goto fail;
+ }
+
+ ssm_rbuff_destroy(rb);
+
+ TEST_SUCCESS();
+ return TEST_RC_SUCCESS;
+
+ fail:
+ TEST_FAIL();
+ return TEST_RC_FAIL;
+}
+
+static int test_ssm_rbuff_write_read(void)
+{
+ struct ssm_rbuff * rb;
+ ssize_t idx;
+
+ TEST_START();
+
+ rb = ssm_rbuff_create(getpid(), 2);
+ if (rb == NULL) {
+ printf("Failed to create rbuff.\n");
+ goto fail;
+ }
+
+ if (ssm_rbuff_write(rb, 42) < 0) {
+ printf("Failed to write value.\n");
+ goto fail_rb;
+ }
+
+ if (ssm_rbuff_queued(rb) != 1) {
+ printf("Queue length should be 1, got %zu.\n",
+ ssm_rbuff_queued(rb));
+ goto fail_rb;
+ }
+
+ idx = ssm_rbuff_read(rb);
+ if (idx != 42) {
+ printf("Expected 42, got %zd.\n", idx);
+ goto fail_rb;
+ }
+
+ if (ssm_rbuff_queued(rb) != 0) {
+ printf("Queue should be empty, got %zu.\n",
+ ssm_rbuff_queued(rb));
+ goto fail_rb;
+ }
+
+ ssm_rbuff_destroy(rb);
+
+ TEST_SUCCESS();
+ return TEST_RC_SUCCESS;
+
+ fail_rb:
+ ssm_rbuff_destroy(rb);
+ fail:
+ TEST_FAIL();
+ return TEST_RC_FAIL;
+}
+
+static int test_ssm_rbuff_read_empty(void)
+{
+ struct ssm_rbuff * rb;
+ ssize_t ret;
+
+ TEST_START();
+
+ rb = ssm_rbuff_create(getpid(), 3);
+ if (rb == NULL) {
+ printf("Failed to create rbuff.\n");
+ goto fail;
+ }
+
+ ret = ssm_rbuff_read(rb);
+ if (ret != -EAGAIN) {
+ printf("Expected -EAGAIN, got %zd.\n", ret);
+ goto fail_rb;
+ }
+
+ ssm_rbuff_destroy(rb);
+
+ TEST_SUCCESS();
+ return TEST_RC_SUCCESS;
+
+ fail_rb:
+ ssm_rbuff_destroy(rb);
+ fail:
+ TEST_FAIL();
+ return TEST_RC_FAIL;
+}
+
+static int test_ssm_rbuff_fill_drain(void)
+{
+ struct ssm_rbuff * rb;
+ size_t i;
+ ssize_t ret;
+
+ TEST_START();
+
+ rb = ssm_rbuff_create(getpid(), 4);
+ if (rb == NULL) {
+ printf("Failed to create rbuff.\n");
+ goto fail;
+ }
+
+ for (i = 0; i < SSM_RBUFF_SIZE - 1; ++i) {
+ if (ssm_rbuff_queued(rb) != i) {
+ printf("Expected %zu queued, got %zu.\n",
+ i, ssm_rbuff_queued(rb));
+ goto fail_rb;
+ }
+ if (ssm_rbuff_write(rb, i) < 0) {
+ printf("Failed to write at index %zu.\n", i);
+ goto fail_rb;
+ }
+ }
+
+ if (ssm_rbuff_queued(rb) != SSM_RBUFF_SIZE - 1) {
+ printf("Expected %d queued, got %zu.\n",
+ SSM_RBUFF_SIZE - 1, ssm_rbuff_queued(rb));
+ goto fail_rb;
+ }
+
+ ret = ssm_rbuff_write(rb, 999);
+ if (ret != -EAGAIN) {
+ printf("Expected -EAGAIN on full buffer, got %zd.\n", ret);
+ goto fail_rb;
+ }
+
+ for (i = 0; i < SSM_RBUFF_SIZE - 1; ++i) {
+ ret = ssm_rbuff_read(rb);
+ if (ret != (ssize_t) i) {
+ printf("Expected %zu, got %zd.\n", i, ret);
+ goto fail_rb;
+ }
+ }
+
+ if (ssm_rbuff_queued(rb) != 0) {
+ printf("Expected empty queue, got %zu.\n",
+ ssm_rbuff_queued(rb));
+ goto fail_rb;
+ }
+
+ ssm_rbuff_destroy(rb);
+
+ TEST_SUCCESS();
+ return TEST_RC_SUCCESS;
+
+ fail_rb:
+ while (ssm_rbuff_read(rb) >= 0)
+ ;
+ ssm_rbuff_destroy(rb);
+ fail:
+ TEST_FAIL();
+ return TEST_RC_FAIL;
+}
+
+static int test_ssm_rbuff_acl(void)
+{
+ struct ssm_rbuff * rb;
+ uint32_t acl;
+
+ TEST_START();
+
+ rb = ssm_rbuff_create(getpid(), 5);
+ if (rb == NULL) {
+ printf("Failed to create rbuff.\n");
+ goto fail;
+ }
+
+ acl = ssm_rbuff_get_acl(rb);
+ if (acl != ACL_RDWR) {
+ printf("Expected ACL_RDWR, got %u.\n", acl);
+ goto fail_rb;
+ }
+
+ ssm_rbuff_set_acl(rb, ACL_RDONLY);
+ acl = ssm_rbuff_get_acl(rb);
+ if (acl != ACL_RDONLY) {
+ printf("Expected ACL_RDONLY, got %u.\n", acl);
+ goto fail_rb;
+ }
+
+ if (ssm_rbuff_write(rb, 1) != -ENOTALLOC) {
+ printf("Expected -ENOTALLOC on RDONLY.\n");
+ goto fail_rb;
+ }
+
+ ssm_rbuff_set_acl(rb, ACL_FLOWDOWN);
+ if (ssm_rbuff_write(rb, 1) != -EFLOWDOWN) {
+ printf("Expected -EFLOWDOWN on FLOWDOWN.\n");
+ goto fail_rb;
+ }
+
+ if (ssm_rbuff_read(rb) != -EFLOWDOWN) {
+ printf("Expected -EFLOWDOWN on read with FLOWDOWN.\n");
+ goto fail_rb;
+ }
+
+ ssm_rbuff_destroy(rb);
+
+ TEST_SUCCESS();
+ return TEST_RC_SUCCESS;
+
+ fail_rb:
+ ssm_rbuff_destroy(rb);
+ fail:
+ TEST_FAIL();
+ return TEST_RC_FAIL;
+}
+
+static int test_ssm_rbuff_open_close(void)
+{
+ struct ssm_rbuff * rb1;
+ struct ssm_rbuff * rb2;
+ pid_t pid;
+
+ TEST_START();
+
+ pid = getpid();
+
+ rb1 = ssm_rbuff_create(pid, 6);
+ if (rb1 == NULL) {
+ printf("Failed to create rbuff.\n");
+ goto fail;
+ }
+
+ if (ssm_rbuff_write(rb1, 123) < 0) {
+ printf("Failed to write value.\n");
+ goto fail_rb1;
+ }
+
+ rb2 = ssm_rbuff_open(pid, 6);
+ if (rb2 == NULL) {
+ printf("Failed to open existing rbuff.\n");
+ goto fail_rb1;
+ }
+
+ if (ssm_rbuff_queued(rb2) != 1) {
+ printf("Expected 1 queued in opened rbuff, got %zu.\n",
+ ssm_rbuff_queued(rb2));
+ goto fail_rb2;
+ }
+
+ if (ssm_rbuff_read(rb2) != 123) {
+ printf("Failed to read from opened rbuff.\n");
+ goto fail_rb2;
+ }
+
+ ssm_rbuff_close(rb2);
+ ssm_rbuff_destroy(rb1);
+
+ TEST_SUCCESS();
+ return TEST_RC_SUCCESS;
+
+ fail_rb2:
+ ssm_rbuff_close(rb2);
+ fail_rb1:
+ ssm_rbuff_destroy(rb1);
+ fail:
+ TEST_FAIL();
+ return TEST_RC_FAIL;
+}
+
+struct thread_args {
+ struct ssm_rbuff * rb;
+ int iterations;
+ int delay_us;
+};
+
+static void * writer_thread(void * arg)
+{
+ struct thread_args * args = (struct thread_args *) arg;
+ struct timespec delay = {0, 0};
+ int i;
+
+ delay.tv_nsec = args->delay_us * 1000L;
+
+ for (i = 0; i < args->iterations; ++i) {
+ while (ssm_rbuff_write(args->rb, i) < 0)
+ nanosleep(&delay, NULL);
+ }
+
+ return NULL;
+}
+
+static void * reader_thread(void * arg)
+{
+ struct thread_args * args = (struct thread_args *) arg;
+ struct timespec delay = {0, 0};
+ int i;
+ ssize_t val;
+
+ delay.tv_nsec = args->delay_us * 1000L;
+
+ for (i = 0; i < args->iterations; ++i) {
+ val = ssm_rbuff_read(args->rb);
+ while (val < 0) {
+ nanosleep(&delay, NULL);
+ val = ssm_rbuff_read(args->rb);
+ }
+ if (val != i) {
+ printf("Expected %d, got %zd.\n", i, val);
+ return (void *) -1;
+ }
+ }
+
+ return NULL;
+}
+
+static void * blocking_writer_thread(void * arg)
+{
+ struct thread_args * args = (struct thread_args *) arg;
+ int i;
+
+ for (i = 0; i < args->iterations; ++i) {
+ if (ssm_rbuff_write_b(args->rb, i, NULL) < 0)
+ return (void *) -1;
+ }
+
+ return NULL;
+}
+
+static void * blocking_reader_thread(void * arg)
+{
+ struct thread_args * args = (struct thread_args *) arg;
+ int i;
+ ssize_t val;
+
+ for (i = 0; i < args->iterations; ++i) {
+ val = ssm_rbuff_read_b(args->rb, NULL);
+ if (val < 0 || val != i) {
+ printf("Expected %d, got %zd.\n", i, val);
+ return (void *) -1;
+ }
+ }
+
+ return NULL;
+}
+
+static int test_ssm_rbuff_blocking(void)
+{
+ struct ssm_rbuff * rb;
+ pthread_t wthread;
+ pthread_t rthread;
+ struct thread_args args;
+ struct timespec delay = {0, 10 * MILLION};
+ void * ret_w;
+ void * ret_r;
+
+ TEST_START();
+
+ rb = ssm_rbuff_create(getpid(), 8);
+ if (rb == NULL) {
+ printf("Failed to create rbuff.\n");
+ goto fail;
+ }
+
+ args.rb = rb;
+ args.iterations = 50;
+ args.delay_us = 0;
+
+ if (pthread_create(&rthread, NULL, blocking_reader_thread, &args)) {
+ printf("Failed to create reader thread.\n");
+ goto fail_rthread;
+ }
+
+ nanosleep(&delay, NULL);
+
+ if (pthread_create(&wthread, NULL, blocking_writer_thread, &args)) {
+ printf("Failed to create writer thread.\n");
+ pthread_cancel(rthread);
+ goto fail_wthread;
+ }
+
+ pthread_join(wthread, &ret_w);
+ pthread_join(rthread, &ret_r);
+
+ if (ret_w != NULL || ret_r != NULL) {
+ printf("Thread returned error.\n");
+ goto fail_ret;
+ }
+
+ ssm_rbuff_destroy(rb);
+
+ TEST_SUCCESS();
+ return TEST_RC_SUCCESS;
+
+ fail_ret:
+ fail_wthread:
+ pthread_join(rthread, NULL);
+ fail_rthread:
+ ssm_rbuff_destroy(rb);
+ fail:
+ TEST_FAIL();
+ return TEST_RC_FAIL;
+}
+
+static int test_ssm_rbuff_blocking_timeout(void)
+{
+ struct ssm_rbuff * rb;
+ struct timespec abs_timeout;
+ struct timespec interval = {0, 100 * MILLION};
+ struct timespec start;
+ struct timespec end;
+ ssize_t ret;
+ long elapsed_ms;
+ size_t i;
+
+ TEST_START();
+
+ rb = ssm_rbuff_create(getpid(), 9);
+ if (rb == NULL) {
+ printf("Failed to create rbuff.\n");
+ goto fail;
+ }
+
+ clock_gettime(PTHREAD_COND_CLOCK, &start);
+ ts_add(&start, &interval, &abs_timeout);
+
+ ret = ssm_rbuff_read_b(rb, &abs_timeout);
+
+ clock_gettime(PTHREAD_COND_CLOCK, &end);
+
+ if (ret != -ETIMEDOUT) {
+ printf("Expected -ETIMEDOUT, got %zd.\n", ret);
+ goto fail_rb;
+ }
+
+ elapsed_ms = (end.tv_sec - start.tv_sec) * 1000L +
+ (end.tv_nsec - start.tv_nsec) / 1000000L;
+
+ if (elapsed_ms < 90 || elapsed_ms > 200) {
+ printf("Timeout took %ld ms, expected ~100 ms.\n",
+ elapsed_ms);
+ goto fail_rb;
+ }
+
+ for (i = 0; i < SSM_RBUFF_SIZE - 1; ++i) {
+ if (ssm_rbuff_write(rb, i) < 0) {
+ printf("Failed to fill buffer.\n");
+ goto fail_rb;
+ }
+ }
+
+ clock_gettime(PTHREAD_COND_CLOCK, &start);
+ ts_add(&start, &interval, &abs_timeout);
+
+ ret = ssm_rbuff_write_b(rb, 999, &abs_timeout);
+
+ clock_gettime(PTHREAD_COND_CLOCK, &end);
+
+ if (ret != -ETIMEDOUT) {
+ printf("Expected -ETIMEDOUT on full buffer, got %zd.\n",
+ ret);
+ goto fail_rb;
+ }
+
+ elapsed_ms = (end.tv_sec - start.tv_sec) * 1000L +
+ (end.tv_nsec - start.tv_nsec) / 1000000L;
+
+ if (elapsed_ms < 90 || elapsed_ms > 200) {
+ printf("Write timeout took %ld ms, expected ~100 ms.\n",
+ elapsed_ms);
+ goto fail_rb;
+ }
+
+ while (ssm_rbuff_read(rb) >= 0)
+ ;
+
+ ssm_rbuff_destroy(rb);
+
+ TEST_SUCCESS();
+ return TEST_RC_SUCCESS;
+
+ fail_rb:
+ while (ssm_rbuff_read(rb) >= 0)
+ ;
+ ssm_rbuff_destroy(rb);
+ fail:
+ TEST_FAIL();
+ return TEST_RC_FAIL;
+}
+
+static int test_ssm_rbuff_blocking_flowdown(void)
+{
+ struct ssm_rbuff * rb;
+ struct timespec abs_timeout;
+ struct timespec now;
+ struct timespec interval = {5, 0};
+ ssize_t ret;
+ size_t i;
+
+ TEST_START();
+
+ rb = ssm_rbuff_create(getpid(), 10);
+ if (rb == NULL) {
+ printf("Failed to create rbuff.\n");
+ goto fail;
+ }
+
+ clock_gettime(PTHREAD_COND_CLOCK, &now);
+ ts_add(&now, &interval, &abs_timeout);
+
+ ssm_rbuff_set_acl(rb, ACL_FLOWDOWN);
+
+ ret = ssm_rbuff_read_b(rb, &abs_timeout);
+ if (ret != -EFLOWDOWN) {
+ printf("Expected -EFLOWDOWN, got %zd.\n", ret);
+ goto fail_rb;
+ }
+
+ ssm_rbuff_set_acl(rb, ACL_RDWR);
+
+ for (i = 0; i < SSM_RBUFF_SIZE - 1; ++i) {
+ if (ssm_rbuff_write(rb, i) < 0) {
+ printf("Failed to fill buffer.\n");
+ goto fail_rb;
+ }
+ }
+
+ clock_gettime(PTHREAD_COND_CLOCK, &now);
+ ts_add(&now, &interval, &abs_timeout);
+
+ ssm_rbuff_set_acl(rb, ACL_FLOWDOWN);
+
+ ret = ssm_rbuff_write_b(rb, 999, &abs_timeout);
+ if (ret != -EFLOWDOWN) {
+ printf("Expected -EFLOWDOWN on write, got %zd.\n", ret);
+ goto fail_rb;
+ }
+
+ ssm_rbuff_set_acl(rb, ACL_RDWR);
+ while (ssm_rbuff_read(rb) >= 0)
+ ;
+
+ ssm_rbuff_destroy(rb);
+
+ TEST_SUCCESS();
+ return TEST_RC_SUCCESS;
+
+ fail_rb:
+ while (ssm_rbuff_read(rb) >= 0)
+ ;
+ ssm_rbuff_destroy(rb);
+ fail:
+ TEST_FAIL();
+ return TEST_RC_FAIL;
+}
+
+static int test_ssm_rbuff_threaded(void)
+{
+ struct ssm_rbuff * rb;
+ pthread_t wthread;
+ pthread_t rthread;
+ struct thread_args args;
+ void * ret_w;
+ void * ret_r;
+
+ TEST_START();
+
+ rb = ssm_rbuff_create(getpid(), 7);
+ if (rb == NULL) {
+ printf("Failed to create rbuff.\n");
+ goto fail;
+ }
+
+ args.rb = rb;
+ args.iterations = 100;
+ args.delay_us = 100;
+
+ if (pthread_create(&wthread, NULL, writer_thread, &args)) {
+ printf("Failed to create writer thread.\n");
+ goto fail_rb;
+ }
+
+ if (pthread_create(&rthread, NULL, reader_thread, &args)) {
+ printf("Failed to create reader thread.\n");
+ pthread_cancel(wthread);
+ pthread_join(wthread, NULL);
+ goto fail_rb;
+ }
+
+ pthread_join(wthread, &ret_w);
+ pthread_join(rthread, &ret_r);
+
+ if (ret_w != NULL || ret_r != NULL) {
+ printf("Thread returned error.\n");
+ goto fail_rb;
+ }
+
+ ssm_rbuff_destroy(rb);
+
+ TEST_SUCCESS();
+ return TEST_RC_SUCCESS;
+
+ fail_rb:
+ ssm_rbuff_destroy(rb);
+ fail:
+ TEST_FAIL();
+ return TEST_RC_FAIL;
+}
+
+int rbuff_test(int argc,
+ char ** argv)
+{
+ int ret = 0;
+
+ (void) argc;
+ (void) argv;
+
+ ret |= test_ssm_rbuff_create_destroy();
+ ret |= test_ssm_rbuff_write_read();
+ ret |= test_ssm_rbuff_read_empty();
+ ret |= test_ssm_rbuff_fill_drain();
+ ret |= test_ssm_rbuff_acl();
+ ret |= test_ssm_rbuff_open_close();
+ ret |= test_ssm_rbuff_threaded();
+ ret |= test_ssm_rbuff_blocking();
+ ret |= test_ssm_rbuff_blocking_timeout();
+ ret |= test_ssm_rbuff_blocking_flowdown();
+
+ return ret;
+}