diff options
Diffstat (limited to 'src/lib/ssm')
| -rw-r--r-- | src/lib/ssm/flow_set.c | 372 | ||||
| -rw-r--r-- | src/lib/ssm/pool.c | 882 | ||||
| -rw-r--r-- | src/lib/ssm/rbuff.c | 449 | ||||
| -rw-r--r-- | src/lib/ssm/ssm.h.in | 146 | ||||
| -rw-r--r-- | src/lib/ssm/tests/CMakeLists.txt | 33 | ||||
| -rw-r--r-- | src/lib/ssm/tests/flow_set_test.c | 255 | ||||
| -rw-r--r-- | src/lib/ssm/tests/pool_sharding_test.c | 505 | ||||
| -rw-r--r-- | src/lib/ssm/tests/pool_test.c | 1038 | ||||
| -rw-r--r-- | src/lib/ssm/tests/rbuff_test.c | 675 |
9 files changed, 4355 insertions, 0 deletions
diff --git a/src/lib/ssm/flow_set.c b/src/lib/ssm/flow_set.c new file mode 100644 index 00000000..ab24d357 --- /dev/null +++ b/src/lib/ssm/flow_set.c @@ -0,0 +1,372 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2024 + * + * Management of flow_sets for fqueue + * + * Dimitri Staessens <dimitri@ouroboros.rocks> + * Sander Vrijders <sander@ouroboros.rocks> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public License + * version 2.1 as published by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#define _POSIX_C_SOURCE 200809L + +#include "config.h" +#include "ssm.h" + +#include <ouroboros/errno.h> +#include <ouroboros/lockfile.h> +#include <ouroboros/pthread.h> +#include <ouroboros/ssm_flow_set.h> +#include <ouroboros/time.h> + +#include <assert.h> +#include <fcntl.h> +#include <signal.h> +#include <stdlib.h> +#include <stdio.h> +#include <string.h> +#include <unistd.h> +#include <sys/mman.h> +#include <sys/stat.h> + +/* + * pthread_cond_timedwait has a WONTFIX bug as of glibc 2.25 where it + * doesn't test pthread cancellation when passed an expired timeout + * with the clock set to CLOCK_MONOTONIC. + */ +#if ((defined(__linux__) || (defined(__MACH__) && !defined(__APPLE__))) \ + && (defined(__GLIBC__) && ((__GLIBC__ * 1000 + __GLIBC_MINOR__) >= 2025)) \ + && (PTHREAD_COND_CLOCK == CLOCK_MONOTONIC)) +#define HAVE_CANCEL_BUG +#endif + +#define FN_MAX_CHARS 255 +#define FS_PROT (PROT_READ | PROT_WRITE) + +#define QUEUESIZE ((SSM_RBUFF_SIZE) * sizeof(struct flowevent)) + +#define SSM_FSET_FILE_SIZE (SYS_MAX_FLOWS * sizeof(ssize_t) \ + + PROG_MAX_FQUEUES * sizeof(size_t) \ + + PROG_MAX_FQUEUES * sizeof(pthread_cond_t) \ + + PROG_MAX_FQUEUES * QUEUESIZE \ + + sizeof(pthread_mutex_t)) + +#define fqueue_ptr(fs, idx) (fs->fqueues + (SSM_RBUFF_SIZE) * idx) + +struct ssm_flow_set { + ssize_t * mtable; + size_t * heads; + pthread_cond_t * conds; + struct flowevent * fqueues; + pthread_mutex_t * lock; + + pid_t pid; +}; + +static struct ssm_flow_set * flow_set_create(pid_t pid, + int oflags) +{ + struct ssm_flow_set * set; + ssize_t * shm_base; + char fn[FN_MAX_CHARS]; + int fd; + + sprintf(fn, SSM_FLOW_SET_PREFIX "%d", pid); + + set = malloc(sizeof(*set)); + if (set == NULL) + goto fail_malloc; + + fd = shm_open(fn, oflags, 0666); + if (fd == -1) + goto fail_shm_open; + + if ((oflags & O_CREAT) && ftruncate(fd, SSM_FSET_FILE_SIZE) < 0) + goto fail_truncate; + + shm_base = mmap(NULL, SSM_FSET_FILE_SIZE, FS_PROT, MAP_SHARED, fd, 0); + if (shm_base == MAP_FAILED) + goto fail_mmap; + + close(fd); + + set->mtable = shm_base; + set->heads = (size_t *) (set->mtable + SYS_MAX_FLOWS); + set->conds = (pthread_cond_t *)(set->heads + PROG_MAX_FQUEUES); + set->fqueues = (struct flowevent *) (set->conds + PROG_MAX_FQUEUES); + set->lock = (pthread_mutex_t *) + (set->fqueues + PROG_MAX_FQUEUES * (SSM_RBUFF_SIZE)); + + return set; + + fail_mmap: + if (oflags & O_CREAT) + shm_unlink(fn); + fail_truncate: + close(fd); + fail_shm_open: + free(set); + fail_malloc: + return NULL; +} + +struct ssm_flow_set * ssm_flow_set_create(pid_t pid) +{ + struct ssm_flow_set * set; + pthread_mutexattr_t mattr; + pthread_condattr_t cattr; + mode_t mask; + int i; + + mask = umask(0); + + set = flow_set_create(pid, O_CREAT | O_RDWR); + + umask(mask); + + if (set == NULL) + goto fail_set; + + set->pid = pid; + + if (pthread_mutexattr_init(&mattr)) + goto fail_mutexattr_init; + +#ifdef HAVE_ROBUST_MUTEX + if (pthread_mutexattr_setrobust(&mattr, PTHREAD_MUTEX_ROBUST)) + goto fail_mattr_set; +#endif + if (pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED)) + goto fail_mattr_set; + + if (pthread_mutex_init(set->lock, &mattr)) + goto fail_mattr_set; + + if (pthread_condattr_init(&cattr)) + goto fail_condattr_init; + + if (pthread_condattr_setpshared(&cattr, PTHREAD_PROCESS_SHARED)) + goto fail_condattr_set; + +#ifndef __APPLE__ + if (pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK)) + goto fail_condattr_set; +#endif + for (i = 0; i < PROG_MAX_FQUEUES; ++i) { + set->heads[i] = 0; + if (pthread_cond_init(&set->conds[i], &cattr)) + goto fail_init; + } + + for (i = 0; i < SYS_MAX_FLOWS; ++i) + set->mtable[i] = -1; + + return set; + + fail_init: + while (i-- > 0) + pthread_cond_destroy(&set->conds[i]); + fail_condattr_set: + pthread_condattr_destroy(&cattr); + fail_condattr_init: + pthread_mutex_destroy(set->lock); + fail_mattr_set: + pthread_mutexattr_destroy(&mattr); + fail_mutexattr_init: + ssm_flow_set_destroy(set); + fail_set: + return NULL; +} + +struct ssm_flow_set * ssm_flow_set_open(pid_t pid) +{ + return flow_set_create(pid, O_RDWR); +} + +void ssm_flow_set_destroy(struct ssm_flow_set * set) +{ + char fn[FN_MAX_CHARS]; + + assert(set); + + sprintf(fn, SSM_FLOW_SET_PREFIX "%d", set->pid); + + ssm_flow_set_close(set); + + shm_unlink(fn); +} + +void ssm_flow_set_close(struct ssm_flow_set * set) +{ + assert(set); + + munmap(set->mtable, SSM_FSET_FILE_SIZE); + free(set); +} + +void ssm_flow_set_zero(struct ssm_flow_set * set, + size_t idx) +{ + ssize_t i = 0; + + assert(set); + assert(idx < PROG_MAX_FQUEUES); + + pthread_mutex_lock(set->lock); + + for (i = 0; i < SYS_MAX_FLOWS; ++i) + if (set->mtable[i] == (ssize_t) idx) + set->mtable[i] = -1; + + set->heads[idx] = 0; + + pthread_mutex_unlock(set->lock); +} + + +int ssm_flow_set_add(struct ssm_flow_set * set, + size_t idx, + int flow_id) +{ + assert(set); + assert(!(flow_id < 0) && flow_id < SYS_MAX_FLOWS); + assert(idx < PROG_MAX_FQUEUES); + + pthread_mutex_lock(set->lock); + + if (set->mtable[flow_id] != -1) { + pthread_mutex_unlock(set->lock); + return -EPERM; + } + + set->mtable[flow_id] = idx; + + pthread_mutex_unlock(set->lock); + + return 0; +} + +void ssm_flow_set_del(struct ssm_flow_set * set, + size_t idx, + int flow_id) +{ + assert(set); + assert(!(flow_id < 0) && flow_id < SYS_MAX_FLOWS); + assert(idx < PROG_MAX_FQUEUES); + + pthread_mutex_lock(set->lock); + + if (set->mtable[flow_id] == (ssize_t) idx) + set->mtable[flow_id] = -1; + + pthread_mutex_unlock(set->lock); +} + +int ssm_flow_set_has(struct ssm_flow_set * set, + size_t idx, + int flow_id) +{ + int ret = 0; + + assert(set); + assert(!(flow_id < 0) && flow_id < SYS_MAX_FLOWS); + assert(idx < PROG_MAX_FQUEUES); + + pthread_mutex_lock(set->lock); + + if (set->mtable[flow_id] == (ssize_t) idx) + ret = 1; + + pthread_mutex_unlock(set->lock); + + return ret; +} + +void ssm_flow_set_notify(struct ssm_flow_set * set, + int flow_id, + int event) +{ + struct flowevent * e; + + assert(set); + assert(!(flow_id < 0) && flow_id < SYS_MAX_FLOWS); + + pthread_mutex_lock(set->lock); + + if (set->mtable[flow_id] == -1) { + pthread_mutex_unlock(set->lock); + return; + } + + e = fqueue_ptr(set, set->mtable[flow_id]) + + set->heads[set->mtable[flow_id]]; + + e->flow_id = flow_id; + e->event = event; + + ++set->heads[set->mtable[flow_id]]; + + pthread_cond_signal(&set->conds[set->mtable[flow_id]]); + + pthread_mutex_unlock(set->lock); +} + + +ssize_t ssm_flow_set_wait(const struct ssm_flow_set * set, + size_t idx, + struct flowevent * fqueue, + const struct timespec * abstime) +{ + ssize_t ret = 0; + + assert(set); + assert(idx < PROG_MAX_FQUEUES); + assert(fqueue); + +#ifndef HAVE_ROBUST_MUTEX + pthread_mutex_lock(set->lock); +#else + if (pthread_mutex_lock(set->lock) == EOWNERDEAD) + pthread_mutex_consistent(set->lock); +#endif + + pthread_cleanup_push(__cleanup_mutex_unlock, set->lock); + + while (set->heads[idx] == 0 && ret != -ETIMEDOUT) { + ret = -__timedwait(set->conds + idx, set->lock, abstime); +#ifdef HAVE_CANCEL_BUG + if (ret == -ETIMEDOUT) + pthread_testcancel(); +#endif +#ifdef HAVE_ROBUST_MUTEX + if (ret == -EOWNERDEAD) + pthread_mutex_consistent(set->lock); +#endif + } + + if (ret != -ETIMEDOUT) { + memcpy(fqueue, + fqueue_ptr(set, idx), + set->heads[idx] * sizeof(*fqueue)); + ret = set->heads[idx]; + set->heads[idx] = 0; + } + + pthread_cleanup_pop(true); + + assert(ret); + + return ret; +} diff --git a/src/lib/ssm/pool.c b/src/lib/ssm/pool.c new file mode 100644 index 00000000..b8cfe3a1 --- /dev/null +++ b/src/lib/ssm/pool.c @@ -0,0 +1,882 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2024 + * + * Secure Shared Memory Infrastructure (SSMI) Packet Buffer + * + * Dimitri Staessens <dimitri@ouroboros.rocks> + * Sander Vrijders <sander@ouroboros.rocks> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public License + * version 2.1 as published by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#define _POSIX_C_SOURCE 200809L + +#include "config.h" + +#include <ouroboros/errno.h> +#include <ouroboros/pthread.h> +#include <ouroboros/ssm_pool.h> + +#include "ssm.h" + +#include <assert.h> +#include <fcntl.h> +#include <signal.h> +#include <stdbool.h> +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <unistd.h> +#include <sys/mman.h> +#include <sys/stat.h> + +/* Size class configuration from CMake */ +static const struct ssm_size_class_cfg ssm_sc_cfg[SSM_POOL_MAX_CLASSES] = { + { (1 << 8), SSM_POOL_256_BLOCKS }, + { (1 << 9), SSM_POOL_512_BLOCKS }, + { (1 << 10), SSM_POOL_1K_BLOCKS }, + { (1 << 11), SSM_POOL_2K_BLOCKS }, + { (1 << 12), SSM_POOL_4K_BLOCKS }, + { (1 << 14), SSM_POOL_16K_BLOCKS }, + { (1 << 16), SSM_POOL_64K_BLOCKS }, + { (1 << 18), SSM_POOL_256K_BLOCKS }, + { (1 << 20), SSM_POOL_1M_BLOCKS }, +}; + +#define PTR_TO_OFFSET(pool_base, ptr) \ + ((uintptr_t)(ptr) - (uintptr_t)(pool_base)) + +#define OFFSET_TO_PTR(pool_base, offset) \ + ((offset == 0) ? NULL : (void *)((uintptr_t)(pool_base) + offset)) + +#define GET_SHARD_FOR_PID(pid) ((int)((pid) % SSM_POOL_SHARDS)) + +#define LOAD_RELAXED(ptr) \ + (__atomic_load_n(ptr, __ATOMIC_RELAXED)) + +#define LOAD_ACQUIRE(ptr) \ + (__atomic_load_n(ptr, __ATOMIC_ACQUIRE)) + +#define STORE_RELEASE(ptr, val) \ + (__atomic_store_n(ptr, val, __ATOMIC_RELEASE)) + +#define LOAD(ptr) \ + (__atomic_load_n(ptr, __ATOMIC_SEQ_CST)) + +#define STORE(ptr, val) \ + (__atomic_store_n(ptr, val, __ATOMIC_SEQ_CST)) + +#define FETCH_ADD(ptr, val) \ + (__atomic_fetch_add(ptr, val, __ATOMIC_SEQ_CST)) + +#define FETCH_SUB(ptr, val) \ + (__atomic_fetch_sub(ptr, val, __ATOMIC_SEQ_CST)) + +#define CAS(ptr, expected, desired) \ + (__atomic_compare_exchange_n(ptr, expected, desired, false, \ + __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) + +#define SSM_FILE_SIZE (SSM_POOL_TOTAL_SIZE + sizeof(struct _ssm_pool_hdr)) + +struct ssm_pool { + uint8_t * shm_base; /* start of blocks */ + struct _ssm_pool_hdr * hdr; /* shared memory header */ + void * pool_base; /* base of the memory pool */ +}; + +static __inline__ +struct ssm_pk_buff * list_remove_head(struct _ssm_list_head * head, + void * base) +{ + uint32_t off; + uint32_t next_off; + struct ssm_pk_buff * blk; + + assert(head != NULL); + assert(base != NULL); + + off = LOAD(&head->head_offset); + if (off == 0) + return NULL; + + /* Validate offset is within pool bounds */ + if (off >= SSM_POOL_TOTAL_SIZE) + return NULL; + + blk = OFFSET_TO_PTR(base, off); + next_off = LOAD(&blk->next_offset); + + + + STORE(&head->head_offset, next_off); + STORE(&head->count, LOAD(&head->count) - 1); + + return blk; +} +static __inline__ void list_add_head(struct _ssm_list_head * head, + struct ssm_pk_buff * blk, + void * base) +{ + uint32_t off; + uint32_t old; + + assert(head != NULL); + assert(blk != NULL); + assert(base != NULL); + + off = (uint32_t) PTR_TO_OFFSET(base, blk); + old = LOAD(&head->head_offset); + + STORE(&blk->next_offset, old); + STORE(&head->head_offset, off); + STORE(&head->count, LOAD(&head->count) + 1); +} + +static __inline__ int select_size_class(size_t len) +{ + size_t sz; + int i; + + /* Total space needed: header + headspace + data + tailspace */ + sz = sizeof(struct ssm_pk_buff) + SSM_PK_BUFF_HEADSPACE + len + + SSM_PK_BUFF_TAILSPACE; + + for (i = 0; i < SSM_POOL_MAX_CLASSES; i++) { + if (ssm_sc_cfg[i].blocks > 0 && sz <= ssm_sc_cfg[i].size) + return i; + } + + return -1; +} + +static __inline__ int find_size_class_for_offset(struct ssm_pool * pool, + size_t offset) +{ + int c; + + assert(pool != NULL); + + for (c = 0; c < SSM_POOL_MAX_CLASSES; c++) { + struct _ssm_size_class * sc = &pool->hdr->size_classes[c]; + + if (sc->object_size == 0) + continue; + + if (offset >= sc->pool_start && + offset < sc->pool_start + sc->pool_size) + return c; + } + + return -1; +} + +static void init_size_classes(struct ssm_pool * pool) +{ + struct _ssm_size_class * sc; + struct _ssm_shard * shard; + pthread_mutexattr_t mattr; + pthread_condattr_t cattr; + uint8_t * region; + size_t offset; + int c; + int s; + size_t i; + + assert(pool != NULL); + + /* Check if already initialized */ + if (LOAD(&pool->hdr->initialized) != 0) + return; + + pthread_mutexattr_init(&mattr); + pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED); +#ifdef HAVE_ROBUST_MUTEX + pthread_mutexattr_setrobust(&mattr, PTHREAD_MUTEX_ROBUST); +#endif + pthread_mutexattr_setprotocol(&mattr, PTHREAD_PRIO_INHERIT); + + pthread_condattr_init(&cattr); + pthread_condattr_setpshared(&cattr, PTHREAD_PROCESS_SHARED); +#ifndef __APPLE__ + pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK); +#endif + offset = 0; + + for (c = 0; c < SSM_POOL_MAX_CLASSES; c++) { + if (ssm_sc_cfg[c].blocks == 0) + continue; + + sc = &pool->hdr->size_classes[c]; + + sc->object_size = ssm_sc_cfg[c].size; + sc->pool_start = offset; + sc->pool_size = ssm_sc_cfg[c].size * ssm_sc_cfg[c].blocks; + sc->object_count = ssm_sc_cfg[c].blocks; + + /* Initialize all shards */ + for (s = 0; s < SSM_POOL_SHARDS; s++) { + shard = &sc->shards[s]; + + STORE(&shard->free_list.head_offset, 0); + STORE(&shard->free_list.count, 0); + STORE(&shard->free_count, 0); + + pthread_mutex_init(&shard->mtx, &mattr); + pthread_cond_init(&shard->cond, &cattr); + } + + /* Lazy distribution: put all blocks in shard 0 initially */ + region = pool->shm_base + offset; + + for (i = 0; i < sc->object_count; ++i) { + struct ssm_pk_buff * blk; + + blk = (struct ssm_pk_buff *) + (region + i * sc->object_size); + + STORE(&blk->refcount, 0); + blk->allocator_pid = 0; + STORE(&blk->next_offset, 0); + + list_add_head(&sc->shards[0].free_list, blk, + pool->pool_base); + FETCH_ADD(&sc->shards[0].free_count, 1); + } + + offset += sc->pool_size; + } + + /* Mark as initialized - acts as memory barrier */ + STORE(&pool->hdr->initialized, 1); + + pthread_mutexattr_destroy(&mattr); + pthread_condattr_destroy(&cattr); +} + +/* + * Reclaim all blocks allocated by a specific pid in a size class. + * Called with shard mutex held. + */ +static size_t reclaim_pid_from_sc(struct _ssm_size_class * sc, + struct _ssm_shard * shard, + void * pool_base, + pid_t pid) +{ + uint8_t * region; + size_t i; + size_t recovered = 0; + struct ssm_pk_buff * blk; + + region = (uint8_t *) pool_base + sc->pool_start; + + for (i = 0; i < sc->object_count; ++i) { + blk = (struct ssm_pk_buff *)(region + i * sc->object_size); + + if (blk->allocator_pid == pid && LOAD(&blk->refcount) > 0) { + STORE(&blk->refcount, 0); + blk->allocator_pid = 0; + list_add_head(&shard->free_list, blk, pool_base); + FETCH_ADD(&shard->free_count, 1); + recovered++; + } + } + + return recovered; +} + +void ssm_pool_reclaim_orphans(struct ssm_pool * pool, + pid_t pid) +{ + size_t sc_idx; + + if (pool == NULL || pid <= 0) + return; + + for (sc_idx = 0; sc_idx < SSM_POOL_MAX_CLASSES; sc_idx++) { + struct _ssm_size_class * sc; + struct _ssm_shard * shard; + + sc = &pool->hdr->size_classes[sc_idx]; + if (sc->object_count == 0) + continue; + + /* Reclaim to shard 0 for simplicity */ + shard = &sc->shards[0]; + robust_mutex_lock(&shard->mtx); + reclaim_pid_from_sc(sc, shard, pool->pool_base, pid); + pthread_mutex_unlock(&shard->mtx); + } +} + +static __inline__ +struct ssm_pk_buff * try_alloc_from_shard(struct _ssm_shard * shard, + void * base) +{ + struct ssm_pk_buff * blk; + + robust_mutex_lock(&shard->mtx); + + if (LOAD(&shard->free_count) > 0) { + blk = list_remove_head(&shard->free_list, base); + if (blk != NULL) { + FETCH_SUB(&shard->free_count, 1); + return blk; /* Caller must unlock */ + } + FETCH_SUB(&shard->free_count, 1); + } + + pthread_mutex_unlock(&shard->mtx); + return NULL; +} + +static __inline__ ssize_t init_block(struct ssm_pool * pool, + struct _ssm_size_class * sc, + struct _ssm_shard * shard, + struct ssm_pk_buff * blk, + size_t len, + uint8_t ** ptr, + struct ssm_pk_buff ** spb) +{ + STORE(&blk->refcount, 1); + blk->allocator_pid = getpid(); + blk->size = (uint32_t) (sc->object_size - + sizeof(struct ssm_pk_buff)); + blk->pk_head = SSM_PK_BUFF_HEADSPACE; + blk->pk_tail = blk->pk_head + (uint32_t) len; + blk->off = (uint32_t) PTR_TO_OFFSET(pool->pool_base, blk); + + pthread_mutex_unlock(&shard->mtx); + + *spb = blk; + if (ptr != NULL) + *ptr = blk->data + blk->pk_head; + + return blk->off; +} + +/* Non-blocking allocation from size class */ +static ssize_t alloc_from_sc(struct ssm_pool * pool, + int idx, + size_t len, + uint8_t ** ptr, + struct ssm_pk_buff ** spb) +{ + struct _ssm_size_class * sc; + struct ssm_pk_buff * blk; + int local; + int s; + + assert(pool != NULL); + assert(idx >= 0 && idx < SSM_POOL_MAX_CLASSES); + assert(spb != NULL); + + sc = &pool->hdr->size_classes[idx]; + local = GET_SHARD_FOR_PID(getpid()); + + for (s = 0; s < SSM_POOL_SHARDS; s++) { + struct _ssm_shard * shard; + int idx; + + idx = (local + s) % SSM_POOL_SHARDS; + shard = &sc->shards[idx]; + + blk = try_alloc_from_shard(shard, pool->pool_base); + if (blk != NULL) + return init_block(pool, sc, shard, blk, len, ptr, spb); + } + + return -EAGAIN; +} + +/* Blocking allocation from size class */ +static ssize_t alloc_from_sc_b(struct ssm_pool * pool, + int idx, + size_t len, + uint8_t ** ptr, + struct ssm_pk_buff ** spb, + const struct timespec * abstime) +{ + struct _ssm_size_class * sc; + struct _ssm_shard * shard; + struct ssm_pk_buff * blk = NULL; + int local; + int s; + int ret = 0; + + assert(pool != NULL); + assert(idx >= 0 && idx < SSM_POOL_MAX_CLASSES); + assert(spb != NULL); + + sc = &pool->hdr->size_classes[idx]; + local = GET_SHARD_FOR_PID(getpid()); + + while (blk == NULL && ret != ETIMEDOUT) { + /* Try non-blocking allocation from any shard */ + for (s = 0; s < SSM_POOL_SHARDS && blk == NULL; s++) { + shard = &sc->shards[(local + s) % SSM_POOL_SHARDS]; + blk = try_alloc_from_shard(shard, pool->pool_base); + } + + if (blk != NULL) + break; + + /* Nothing available, wait for signal */ + shard = &sc->shards[local]; + robust_mutex_lock(&shard->mtx); + ret = robust_wait(&shard->cond, &shard->mtx, abstime); + pthread_mutex_unlock(&shard->mtx); + } + + if (ret == ETIMEDOUT) + return -ETIMEDOUT; + + return init_block(pool, sc, shard, blk, len, ptr, spb); +} + +/* Global Shared Packet Pool */ +static char * gspp_filename(void) +{ + char * str; + char * test_suffix; + + test_suffix = getenv("OUROBOROS_TEST_POOL_SUFFIX"); + if (test_suffix != NULL) { + str = malloc(strlen(SSM_POOL_NAME) + strlen(test_suffix) + 1); + if (str == NULL) + return NULL; + sprintf(str, "%s%s", SSM_POOL_NAME, test_suffix); + } else { + str = malloc(strlen(SSM_POOL_NAME) + 1); + if (str == NULL) + return NULL; + sprintf(str, "%s", SSM_POOL_NAME); + } + + return str; +} + +void ssm_pool_close(struct ssm_pool * pool) +{ + assert(pool != NULL); + + munmap(pool->shm_base, SSM_FILE_SIZE); + free(pool); +} + +void ssm_pool_destroy(struct ssm_pool * pool) +{ + char * fn; + + assert(pool != NULL); + + if (getpid() != pool->hdr->pid && kill(pool->hdr->pid, 0) == 0) { + free(pool); + return; + } + + ssm_pool_close(pool); + + fn = gspp_filename(); + if (fn == NULL) + return; + + shm_unlink(fn); + free(fn); +} + +#define MM_FLAGS (PROT_READ | PROT_WRITE) + +static struct ssm_pool * pool_create(int flags) +{ + struct ssm_pool * pool; + int fd; + uint8_t * shm_base; + char * fn; + + fn = gspp_filename(); + if (fn == NULL) + goto fail_fn; + + pool = malloc(sizeof *pool); + if (pool == NULL) + goto fail_rdrb; + + fd = shm_open(fn, flags, 0666); + if (fd == -1) + goto fail_open; + + if ((flags & O_CREAT) && ftruncate(fd, SSM_FILE_SIZE) < 0) + goto fail_truncate; + + shm_base = mmap(NULL, SSM_FILE_SIZE, MM_FLAGS, MAP_SHARED, fd, 0); + if (shm_base == MAP_FAILED) + goto fail_truncate; + + pool->shm_base = shm_base; + pool->pool_base = shm_base; + pool->hdr = (struct _ssm_pool_hdr *) (shm_base + SSM_POOL_TOTAL_SIZE); + + if (flags & O_CREAT) + pool->hdr->mapped_addr = shm_base; + + close(fd); + + free(fn); + + return pool; + + fail_truncate: + close(fd); + if (flags & O_CREAT) + shm_unlink(fn); + fail_open: + free(pool); + fail_rdrb: + free(fn); + fail_fn: + return NULL; +} + +struct ssm_pool * ssm_pool_create(void) +{ + struct ssm_pool * pool; + mode_t mask; + pthread_mutexattr_t mattr; + pthread_condattr_t cattr; + + mask = umask(0); + + pool = pool_create(O_CREAT | O_EXCL | O_RDWR); + + umask(mask); + + if (pool == NULL) + goto fail_rdrb; + + if (pthread_mutexattr_init(&mattr)) + goto fail_mattr; + + pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED); +#ifdef HAVE_ROBUST_MUTEX + pthread_mutexattr_setrobust(&mattr, PTHREAD_MUTEX_ROBUST); +#endif + if (pthread_mutex_init(&pool->hdr->mtx, &mattr)) + goto fail_mutex; + + if (pthread_condattr_init(&cattr)) + goto fail_cattr; + + pthread_condattr_setpshared(&cattr, PTHREAD_PROCESS_SHARED); +#ifndef __APPLE__ + pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK); +#endif + if (pthread_cond_init(&pool->hdr->healthy, &cattr)) + goto fail_healthy; + + pool->hdr->pid = getpid(); + /* Will be set by init_size_classes */ + STORE(&pool->hdr->initialized, 0); + + init_size_classes(pool); + + pthread_mutexattr_destroy(&mattr); + pthread_condattr_destroy(&cattr); + + return pool; + + fail_healthy: + pthread_condattr_destroy(&cattr); + fail_cattr: + pthread_mutex_destroy(&pool->hdr->mtx); + fail_mutex: + pthread_mutexattr_destroy(&mattr); + fail_mattr: + ssm_pool_destroy(pool); + fail_rdrb: + return NULL; +} + +struct ssm_pool * ssm_pool_open(void) +{ + struct ssm_pool * pool; + + pool = pool_create(O_RDWR); + if (pool != NULL) + init_size_classes(pool); + + return pool; +} + +void ssm_pool_purge(void) +{ + char * fn; + + fn = gspp_filename(); + if (fn == NULL) + return; + + shm_unlink(fn); + free(fn); +} + +int ssm_pool_mlock(struct ssm_pool * pool) +{ + assert(pool != NULL); + + return mlock(pool->shm_base, SSM_FILE_SIZE); +} + +ssize_t ssm_pool_alloc(struct ssm_pool * pool, + size_t count, + uint8_t ** ptr, + struct ssm_pk_buff ** spb) +{ + int idx; + + assert(pool != NULL); + assert(spb != NULL); + + idx = select_size_class(count); + if (idx >= 0) + return alloc_from_sc(pool, idx, count, ptr, spb); + + return -EMSGSIZE; +} + +ssize_t ssm_pool_alloc_b(struct ssm_pool * pool, + size_t count, + uint8_t ** ptr, + struct ssm_pk_buff ** spb, + const struct timespec * abstime) +{ + int idx; + + assert(pool != NULL); + assert(spb != NULL); + + idx = select_size_class(count); + if (idx >= 0) + return alloc_from_sc_b(pool, idx, count, ptr, spb, abstime); + + return -EMSGSIZE; +} + +ssize_t ssm_pool_read(uint8_t ** dst, + struct ssm_pool * pool, + size_t off) +{ + struct ssm_pk_buff * blk; + + assert(dst != NULL); + assert(pool != NULL); + + blk = OFFSET_TO_PTR(pool->pool_base, off); + if (blk == NULL) + return -EINVAL; + + *dst = blk->data + blk->pk_head; + + return (ssize_t) (blk->pk_tail - blk->pk_head); +} + +struct ssm_pk_buff * ssm_pool_get(struct ssm_pool * pool, + size_t off) +{ + struct ssm_pk_buff * blk; + + assert(pool != NULL); + + if (off == 0 || off >= (size_t) SSM_POOL_TOTAL_SIZE) + return NULL; + + blk = OFFSET_TO_PTR(pool->pool_base, off); + if (blk == NULL) + return NULL; + + if (LOAD(&blk->refcount) == 0) + return NULL; + + return blk; +} + +int ssm_pool_remove(struct ssm_pool * pool, + size_t off) +{ + struct ssm_pk_buff * blk; + struct _ssm_size_class * sc; + struct _ssm_shard * shard; + int sc_idx; + int shard_idx; + uint16_t old_ref; + + assert(pool != NULL); + + if (off == 0 || off >= SSM_POOL_TOTAL_SIZE) + return -EINVAL; + + blk = OFFSET_TO_PTR(pool->pool_base, off); + if (blk == NULL) + return -EINVAL; + + sc_idx = find_size_class_for_offset(pool, off); + if (sc_idx < 0) + return -EINVAL; + + sc = &pool->hdr->size_classes[sc_idx]; + + /* Free to allocator's shard (lazy distribution in action) */ + shard_idx = GET_SHARD_FOR_PID(blk->allocator_pid); + shard = &sc->shards[shard_idx]; + + robust_mutex_lock(&shard->mtx); + + old_ref = FETCH_SUB(&blk->refcount, 1); + if (old_ref > 1) { + /* Still referenced */ + pthread_mutex_unlock(&shard->mtx); + return 0; + } + + blk->allocator_pid = 0; +#ifdef CONFIG_OUROBOROS_DEBUG + if (old_ref == 0) { + /* Underflow - double free attempt */ + pthread_mutex_unlock(&shard->mtx); + abort(); + } + + /* Poison fields to detect use-after-free */ + blk->pk_head = 0xDEAD; + blk->pk_tail = 0xBEEF; +#endif + list_add_head(&shard->free_list, blk, pool->pool_base); + FETCH_ADD(&shard->free_count, 1); + + pthread_cond_signal(&shard->cond); + + pthread_mutex_unlock(&shard->mtx); + + return 0; +} + +size_t ssm_pk_buff_get_idx(struct ssm_pk_buff * spb) +{ + assert(spb != NULL); + + return spb->off; +} + +uint8_t * ssm_pk_buff_head(struct ssm_pk_buff * spb) +{ + assert(spb != NULL); + + return spb->data + spb->pk_head; +} + +uint8_t * ssm_pk_buff_tail(struct ssm_pk_buff * spb) +{ + assert(spb != NULL); + + return spb->data + spb->pk_tail; +} + +size_t ssm_pk_buff_len(struct ssm_pk_buff * spb) +{ + assert(spb != NULL); + + return spb->pk_tail - spb->pk_head; +} + +uint8_t * ssm_pk_buff_head_alloc(struct ssm_pk_buff * spb, + size_t size) +{ + assert(spb != NULL); + + if (spb->pk_head < size) + return NULL; + + spb->pk_head -= size; + + return spb->data + spb->pk_head; +} + +uint8_t * ssm_pk_buff_tail_alloc(struct ssm_pk_buff * spb, + size_t size) +{ + uint8_t * buf; + + assert(spb != NULL); + + if (spb->pk_tail + size >= spb->size) + return NULL; + + buf = spb->data + spb->pk_tail; + + spb->pk_tail += size; + + return buf; +} + +uint8_t * ssm_pk_buff_head_release(struct ssm_pk_buff * spb, + size_t size) +{ + uint8_t * buf; + + assert(spb != NULL); + assert(!(size > spb->pk_tail - spb->pk_head)); + + buf = spb->data + spb->pk_head; + + spb->pk_head += size; + + return buf; +} + +uint8_t * ssm_pk_buff_tail_release(struct ssm_pk_buff * spb, + size_t size) +{ + assert(spb != NULL); + assert(!(size > spb->pk_tail - spb->pk_head)); + + spb->pk_tail -= size; + + return spb->data + spb->pk_tail; +} + +void ssm_pk_buff_truncate(struct ssm_pk_buff * spb, + size_t len) +{ + assert(spb != NULL); + assert(len <= spb->size); + + spb->pk_tail = spb->pk_head + len; +} + +int ssm_pk_buff_wait_ack(struct ssm_pk_buff * spb) +{ + assert(spb != NULL); + + FETCH_ADD(&spb->refcount, 1); + + return 0; +} + +int ssm_pk_buff_ack(struct ssm_pk_buff * spb) +{ + assert(spb != NULL); + + FETCH_SUB(&spb->refcount, 1); + + return 0; +} diff --git a/src/lib/ssm/rbuff.c b/src/lib/ssm/rbuff.c new file mode 100644 index 00000000..e18f8ba7 --- /dev/null +++ b/src/lib/ssm/rbuff.c @@ -0,0 +1,449 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2024 + * + * Ring buffer implementations for incoming packets + * + * Dimitri Staessens <dimitri@ouroboros.rocks> + * Sander Vrijders <sander@ouroboros.rocks> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public License + * version 2.1 as published by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#define _POSIX_C_SOURCE 200809L + +#include "config.h" +#include "ssm.h" + +#include <ouroboros/ssm_rbuff.h> +#include <ouroboros/lockfile.h> +#include <ouroboros/errno.h> +#include <ouroboros/fccntl.h> +#include <ouroboros/pthread.h> +#include <ouroboros/time.h> + +#include <assert.h> +#include <fcntl.h> +#include <signal.h> +#include <stdbool.h> +#include <stdint.h> +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <unistd.h> +#include <sys/mman.h> +#include <sys/stat.h> + +#define FN_MAX_CHARS 255 + +#define SSM_RBUFF_FILESIZE ((SSM_RBUFF_SIZE) * sizeof(ssize_t) \ + + 3 * sizeof(size_t) \ + + sizeof(pthread_mutex_t) \ + + 2 * sizeof(pthread_cond_t)) + +#define MODB(x) ((x) & (SSM_RBUFF_SIZE - 1)) + +#define LOAD_RELAXED(ptr) (__atomic_load_n(ptr, __ATOMIC_RELAXED)) +#define LOAD_ACQUIRE(ptr) (__atomic_load_n(ptr, __ATOMIC_ACQUIRE)) +#define STORE_RELEASE(ptr, val) \ + (__atomic_store_n(ptr, val, __ATOMIC_RELEASE)) + +#define HEAD(rb) (rb->shm_base[LOAD_RELAXED(rb->head)]) +#define TAIL(rb) (rb->shm_base[LOAD_RELAXED(rb->tail)]) +#define HEAD_IDX(rb) (LOAD_ACQUIRE(rb->head)) +#define TAIL_IDX(rb) (LOAD_ACQUIRE(rb->tail)) +#define ADVANCE_HEAD(rb) \ + (STORE_RELEASE(rb->head, MODB(LOAD_RELAXED(rb->head) + 1))) +#define ADVANCE_TAIL(rb) \ + (STORE_RELEASE(rb->tail, MODB(LOAD_RELAXED(rb->tail) + 1))) +#define QUEUED(rb) (MODB(HEAD_IDX(rb) - TAIL_IDX(rb))) +#define IS_FULL(rb) (QUEUED(rb) == (SSM_RBUFF_SIZE - 1)) +#define IS_EMPTY(rb) (HEAD_IDX(rb) == TAIL_IDX(rb)) + +struct ssm_rbuff { + ssize_t * shm_base; /* start of shared memory */ + size_t * head; /* start of ringbuffer */ + size_t * tail; + size_t * acl; /* access control */ + pthread_mutex_t * mtx; /* lock for cond vars only */ + pthread_cond_t * add; /* signal when new data */ + pthread_cond_t * del; /* signal when data removed */ + pid_t pid; /* pid of the owner */ + int flow_id; /* flow_id of the flow */ +}; + +#define MM_FLAGS (PROT_READ | PROT_WRITE) + +static struct ssm_rbuff * rbuff_create(pid_t pid, + int flow_id, + int flags) +{ + struct ssm_rbuff * rb; + int fd; + ssize_t * shm_base; + char fn[FN_MAX_CHARS]; + + sprintf(fn, SSM_RBUFF_PREFIX "%d.%d", pid, flow_id); + + rb = malloc(sizeof(*rb)); + if (rb == NULL) + goto fail_malloc; + + fd = shm_open(fn, flags, 0666); + if (fd == -1) + goto fail_open; + + if ((flags & O_CREAT) && ftruncate(fd, SSM_RBUFF_FILESIZE) < 0) + goto fail_truncate; + + shm_base = mmap(NULL, SSM_RBUFF_FILESIZE, MM_FLAGS, MAP_SHARED, fd, 0); + + close(fd); + + rb->shm_base = shm_base; + rb->head = (size_t *) (rb->shm_base + (SSM_RBUFF_SIZE)); + rb->tail = (size_t *) (rb->head + 1); + rb->acl = (size_t *) (rb->tail + 1); + rb->mtx = (pthread_mutex_t *) (rb->acl + 1); + rb->add = (pthread_cond_t *) (rb->mtx + 1); + rb->del = rb->add + 1; + rb->pid = pid; + rb->flow_id = flow_id; + + return rb; + + fail_truncate: + close(fd); + if (flags & O_CREAT) + shm_unlink(fn); + fail_open: + free(rb); + fail_malloc: + return NULL; +} + +static void rbuff_destroy(struct ssm_rbuff * rb) +{ + munmap(rb->shm_base, SSM_RBUFF_FILESIZE); + + free(rb); +} + +struct ssm_rbuff * ssm_rbuff_create(pid_t pid, + int flow_id) +{ + struct ssm_rbuff * rb; + pthread_mutexattr_t mattr; + pthread_condattr_t cattr; + mode_t mask; + + mask = umask(0); + + rb = rbuff_create(pid, flow_id, O_CREAT | O_EXCL | O_RDWR); + + umask(mask); + + if (rb == NULL) + goto fail_rb; + + if (pthread_mutexattr_init(&mattr)) + goto fail_mattr; + + pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED); +#ifdef HAVE_ROBUST_MUTEX + pthread_mutexattr_setrobust(&mattr, PTHREAD_MUTEX_ROBUST); +#endif + if (pthread_mutex_init(rb->mtx, &mattr)) + goto fail_mutex; + + if (pthread_condattr_init(&cattr)) + goto fail_cattr; + + pthread_condattr_setpshared(&cattr, PTHREAD_PROCESS_SHARED); +#ifndef __APPLE__ + pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK); +#endif + if (pthread_cond_init(rb->add, &cattr)) + goto fail_add; + + if (pthread_cond_init(rb->del, &cattr)) + goto fail_del; + + *rb->acl = ACL_RDWR; + *rb->head = 0; + *rb->tail = 0; + + rb->pid = pid; + rb->flow_id = flow_id; + + pthread_mutexattr_destroy(&mattr); + pthread_condattr_destroy(&cattr); + + return rb; + + fail_del: + pthread_cond_destroy(rb->add); + fail_add: + pthread_condattr_destroy(&cattr); + fail_cattr: + pthread_mutex_destroy(rb->mtx); + fail_mutex: + pthread_mutexattr_destroy(&mattr); + fail_mattr: + ssm_rbuff_destroy(rb); + fail_rb: + return NULL; +} + +void ssm_rbuff_destroy(struct ssm_rbuff * rb) +{ + char fn[FN_MAX_CHARS]; + + assert(rb != NULL); + + sprintf(fn, SSM_RBUFF_PREFIX "%d.%d", rb->pid, rb->flow_id); + + ssm_rbuff_close(rb); + + shm_unlink(fn); +} + +struct ssm_rbuff * ssm_rbuff_open(pid_t pid, + int flow_id) +{ + return rbuff_create(pid, flow_id, O_RDWR); +} + +void ssm_rbuff_close(struct ssm_rbuff * rb) +{ + assert(rb); + + rbuff_destroy(rb); +} + +int ssm_rbuff_write(struct ssm_rbuff * rb, + size_t idx) +{ + size_t acl; + bool was_empty; + int ret = 0; + + assert(rb != NULL); + + acl = __atomic_load_n(rb->acl, __ATOMIC_SEQ_CST); + if (acl != ACL_RDWR) { + if (acl & ACL_FLOWDOWN) { + ret = -EFLOWDOWN; + goto fail_acl; + } + if (acl & ACL_RDONLY) { + ret = -ENOTALLOC; + goto fail_acl; + } + } + + robust_mutex_lock(rb->mtx); + + if (IS_FULL(rb)) { + ret = -EAGAIN; + goto fail_mutex; + } + + was_empty = IS_EMPTY(rb); + + HEAD(rb) = (ssize_t) idx; + ADVANCE_HEAD(rb); + + if (was_empty) + pthread_cond_broadcast(rb->add); + + pthread_mutex_unlock(rb->mtx); + + return 0; + + fail_mutex: + pthread_mutex_unlock(rb->mtx); + fail_acl: + return ret; +} + +int ssm_rbuff_write_b(struct ssm_rbuff * rb, + size_t idx, + const struct timespec * abstime) +{ + size_t acl; + int ret = 0; + bool was_empty; + + assert(rb != NULL); + + acl = __atomic_load_n(rb->acl, __ATOMIC_SEQ_CST); + if (acl != ACL_RDWR) { + if (acl & ACL_FLOWDOWN) { + ret = -EFLOWDOWN; + goto fail_acl; + } + if (acl & ACL_RDONLY) { + ret = -ENOTALLOC; + goto fail_acl; + } + } + + robust_mutex_lock(rb->mtx); + + pthread_cleanup_push(__cleanup_mutex_unlock, rb->mtx); + + while (IS_FULL(rb) && ret != -ETIMEDOUT) { + acl = __atomic_load_n(rb->acl, __ATOMIC_SEQ_CST); + if (acl & ACL_FLOWDOWN) { + ret = -EFLOWDOWN; + break; + } + ret = -robust_wait(rb->del, rb->mtx, abstime); + } + + pthread_cleanup_pop(false); + + if (ret != -ETIMEDOUT && ret != -EFLOWDOWN) { + was_empty = IS_EMPTY(rb); + HEAD(rb) = (ssize_t) idx; + ADVANCE_HEAD(rb); + if (was_empty) + pthread_cond_broadcast(rb->add); + } + + pthread_mutex_unlock(rb->mtx); + + fail_acl: + return ret; +} + +static int check_rb_acl(struct ssm_rbuff * rb) +{ + size_t acl; + + assert(rb != NULL); + + acl = __atomic_load_n(rb->acl, __ATOMIC_SEQ_CST); + + if (acl & ACL_FLOWDOWN) + return -EFLOWDOWN; + + if (acl & ACL_FLOWPEER) + return -EFLOWPEER; + + return -EAGAIN; +} + +ssize_t ssm_rbuff_read(struct ssm_rbuff * rb) +{ + ssize_t ret; + + assert(rb != NULL); + + if (IS_EMPTY(rb)) + return check_rb_acl(rb); + + robust_mutex_lock(rb->mtx); + + ret = TAIL(rb); + ADVANCE_TAIL(rb); + + pthread_cond_broadcast(rb->del); + + pthread_mutex_unlock(rb->mtx); + + return ret; +} + +ssize_t ssm_rbuff_read_b(struct ssm_rbuff * rb, + const struct timespec * abstime) +{ + ssize_t idx = -1; + size_t acl; + + assert(rb != NULL); + + acl = __atomic_load_n(rb->acl, __ATOMIC_SEQ_CST); + if (IS_EMPTY(rb) && (acl & ACL_FLOWDOWN)) + return -EFLOWDOWN; + + robust_mutex_lock(rb->mtx); + + pthread_cleanup_push(__cleanup_mutex_unlock, rb->mtx); + + while (IS_EMPTY(rb) && + idx != -ETIMEDOUT && + check_rb_acl(rb) == -EAGAIN) { + idx = -robust_wait(rb->add, rb->mtx, abstime); + } + + pthread_cleanup_pop(false); + + if (!IS_EMPTY(rb)) { + idx = TAIL(rb); + ADVANCE_TAIL(rb); + pthread_cond_broadcast(rb->del); + } else if (idx != -ETIMEDOUT) { + idx = check_rb_acl(rb); + } + + pthread_mutex_unlock(rb->mtx); + + assert(idx != -EAGAIN); + + return idx; +} + +void ssm_rbuff_set_acl(struct ssm_rbuff * rb, + uint32_t flags) +{ + assert(rb != NULL); + + __atomic_store_n(rb->acl, (size_t) flags, __ATOMIC_SEQ_CST); +} + +uint32_t ssm_rbuff_get_acl(struct ssm_rbuff * rb) +{ + assert(rb != NULL); + + return (uint32_t) __atomic_load_n(rb->acl, __ATOMIC_SEQ_CST); +} + +void ssm_rbuff_fini(struct ssm_rbuff * rb) +{ + assert(rb != NULL); + + robust_mutex_lock(rb->mtx); + + pthread_cleanup_push(__cleanup_mutex_unlock, rb->mtx); + + while (!IS_EMPTY(rb)) + robust_wait(rb->del, rb->mtx, NULL); + + pthread_cleanup_pop(true); +} + +size_t ssm_rbuff_queued(struct ssm_rbuff * rb) +{ + assert(rb != NULL); + + return QUEUED(rb); +} + +int ssm_rbuff_mlock(struct ssm_rbuff * rb) +{ + assert(rb != NULL); + + return mlock(rb->shm_base, SSM_RBUFF_FILESIZE); +} diff --git a/src/lib/ssm/ssm.h.in b/src/lib/ssm/ssm.h.in new file mode 100644 index 00000000..d14cd49c --- /dev/null +++ b/src/lib/ssm/ssm.h.in @@ -0,0 +1,146 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2026 + * + * Secure Shared Memory configuration + * + * Dimitri Staessens <dimitri@ouroboros.rocks> + * Sander Vrijders <sander@ouroboros.rocks> + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public License + * version 2.1 as published by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#ifndef OUROBOROS_LIB_SSM_H +#define OUROBOROS_LIB_SSM_H + +#include <stddef.h> +#include <stdint.h> +#include <stdatomic.h> +#include <sys/types.h> + +/* Pool naming configuration */ +#define SSM_PREFIX "@SSM_PREFIX@" +#define SSM_GSMP_SUFFIX "@SSM_GSMP_SUFFIX@" +#define SSM_PPP_SUFFIX "@SSM_PPP_SUFFIX@" + +/* Legacy SSM constants */ +#define SSM_RBUFF_PREFIX "@SSM_RBUFF_PREFIX@" +#define SSM_FLOW_SET_PREFIX "@SSM_FLOW_SET_PREFIX@" +#define SSM_POOL_NAME "@SSM_POOL_NAME@" +#define SSM_POOL_BLOCKS @SSM_POOL_BLOCKS@ +#define SSM_RBUFF_SIZE @SSM_RBUFF_SIZE@ + +/* Packet buffer space reservation */ +#define SSM_PK_BUFF_HEADSPACE @SSM_PK_BUFF_HEADSPACE@ +#define SSM_PK_BUFF_TAILSPACE @SSM_PK_BUFF_TAILSPACE@ + +/* Pool blocks per size class */ +#define SSM_POOL_256_BLOCKS @SSM_POOL_256_BLOCKS@ +#define SSM_POOL_512_BLOCKS @SSM_POOL_512_BLOCKS@ +#define SSM_POOL_1K_BLOCKS @SSM_POOL_1K_BLOCKS@ +#define SSM_POOL_2K_BLOCKS @SSM_POOL_2K_BLOCKS@ +#define SSM_POOL_4K_BLOCKS @SSM_POOL_4K_BLOCKS@ +#define SSM_POOL_16K_BLOCKS @SSM_POOL_16K_BLOCKS@ +#define SSM_POOL_64K_BLOCKS @SSM_POOL_64K_BLOCKS@ +#define SSM_POOL_256K_BLOCKS @SSM_POOL_256K_BLOCKS@ +#define SSM_POOL_1M_BLOCKS @SSM_POOL_1M_BLOCKS@ +#define SSM_POOL_TOTAL_SIZE @SSM_POOL_TOTAL_SIZE@ + +/* Size class configuration */ +#define SSM_POOL_MAX_CLASSES 9 +#define SSM_POOL_SHARDS @SSM_POOL_SHARDS@ + +/* Internal structures - exposed for testing */ +#ifdef __cplusplus +extern "C" { +#endif + +#include <errno.h> +#include <pthread.h> + +#include <ouroboros/pthread.h> + +static __inline__ void robust_mutex_lock(pthread_mutex_t * mtx) +{ +#ifndef HAVE_ROBUST_MUTEX + pthread_mutex_lock(mtx); +#else + if (pthread_mutex_lock(mtx) == EOWNERDEAD) + pthread_mutex_consistent(mtx); +#endif +} + +static __inline__ int robust_wait(pthread_cond_t * cond, + pthread_mutex_t * mtx, + const struct timespec * abstime) +{ + int ret = __timedwait(cond, mtx, abstime); +#ifdef HAVE_ROBUST_MUTEX + if (ret == EOWNERDEAD) + pthread_mutex_consistent(mtx); +#endif + return ret; +} + +/* Packet buffer structure used by pool, rbuff, and tests */ +struct ssm_pk_buff { + uint32_t next_offset; /* List linkage (pool < 4GB) */ + uint16_t refcount; /* Reference count (app + rtx) */ + pid_t allocator_pid; /* For orphan detection */ + uint32_t size; /* Block size (max 1MB) */ + uint32_t pk_head; /* Head offset into data */ + uint32_t pk_tail; /* Tail offset into data */ + uint32_t off; /* Block offset in pool */ + uint8_t data[]; /* Packet data */ +}; + +/* Size class configuration table */ +struct ssm_size_class_cfg { + size_t size; + size_t blocks; +}; + +struct _ssm_list_head { + uint32_t head_offset; + uint32_t count; +}; + +struct _ssm_shard { + pthread_mutex_t mtx; + pthread_cond_t cond; + struct _ssm_list_head free_list; + size_t free_count; +}; + +struct _ssm_size_class { + struct _ssm_shard shards[SSM_POOL_SHARDS]; + size_t object_size; + size_t pool_start; + size_t pool_size; + size_t object_count; +}; + +struct _ssm_pool_hdr { + pthread_mutex_t mtx; + pthread_cond_t healthy; + pid_t pid; + uint32_t initialized; + void * mapped_addr; + struct _ssm_size_class size_classes[SSM_POOL_MAX_CLASSES]; +}; + +#ifdef __cplusplus +} +#endif + +#endif /* OUROBOROS_LIB_SSM_H */ diff --git a/src/lib/ssm/tests/CMakeLists.txt b/src/lib/ssm/tests/CMakeLists.txt new file mode 100644 index 00000000..827f8bf8 --- /dev/null +++ b/src/lib/ssm/tests/CMakeLists.txt @@ -0,0 +1,33 @@ +get_filename_component(PARENT_PATH ${CMAKE_CURRENT_SOURCE_DIR} DIRECTORY) +get_filename_component(PARENT_DIR ${PARENT_PATH} NAME) + +compute_test_prefix() + +create_test_sourcelist(${PARENT_DIR}_tests test_suite.c + # Add new tests here + pool_test.c + pool_sharding_test.c + rbuff_test.c + flow_set_test.c + ) + +add_executable(${PARENT_DIR}_test ${${PARENT_DIR}_tests}) + +disable_test_logging_for_target(${PARENT_DIR}_test) +target_link_libraries(${PARENT_DIR}_test ouroboros-common) + +add_dependencies(build_tests ${PARENT_DIR}_test) + +set(tests_to_run ${${PARENT_DIR}_tests}) +if(CMAKE_VERSION VERSION_LESS "3.29.0") + remove(tests_to_run test_suite.c) +else () + list(POP_FRONT tests_to_run) +endif() + +foreach (test ${tests_to_run}) + get_filename_component(test_name ${test} NAME_WE) + add_test(${TEST_PREFIX}/${test_name} ${C_TEST_PATH}/${PARENT_DIR}_test ${test_name}) + set_property(TEST ${TEST_PREFIX}/${test_name} + PROPERTY ENVIRONMENT "OUROBOROS_TEST_POOL_SUFFIX=.test") +endforeach (test) diff --git a/src/lib/ssm/tests/flow_set_test.c b/src/lib/ssm/tests/flow_set_test.c new file mode 100644 index 00000000..f9084d3c --- /dev/null +++ b/src/lib/ssm/tests/flow_set_test.c @@ -0,0 +1,255 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2024 + * + * Test of the SSM flow set + * + * Dimitri Staessens <dimitri@ouroboros.rocks> + * Sander Vrijders <sander@ouroboros.rocks> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#if defined(__linux__) || defined(__CYGWIN__) +#define _DEFAULT_SOURCE +#else +#define _POSIX_C_SOURCE 200112L +#endif + +#include "config.h" +#include "ssm.h" + +#include <test/test.h> +#include <ouroboros/ssm_flow_set.h> +#include <ouroboros/errno.h> +#include <ouroboros/time.h> + +#include <stdio.h> +#include <unistd.h> +#include <pthread.h> + +static int test_ssm_flow_set_create_destroy(void) +{ + struct ssm_flow_set * set; + pid_t pid; + + TEST_START(); + + pid = getpid(); + + set = ssm_flow_set_create(pid); + if (set == NULL) { + printf("Failed to create flow set.\n"); + goto fail; + } + + ssm_flow_set_destroy(set); + + TEST_SUCCESS(); + return TEST_RC_SUCCESS; +fail: + TEST_FAIL(); + return TEST_RC_FAIL; +} + +static int test_ssm_flow_set_add_del_has(void) +{ + struct ssm_flow_set * set; + pid_t pid; + size_t idx = 0; + int flow_id = 42; + + TEST_START(); + + pid = getpid(); + + set = ssm_flow_set_create(pid); + if (set == NULL) { + printf("Failed to create flow set.\n"); + goto fail; + } + + if (ssm_flow_set_has(set, idx, flow_id)) { + printf("Flow should not be in set initially.\n"); + goto fail_destroy; + } + + if (ssm_flow_set_add(set, idx, flow_id) < 0) { + printf("Failed to add flow to set.\n"); + goto fail_destroy; + } + + if (!ssm_flow_set_has(set, idx, flow_id)) { + printf("Flow should be in set after add.\n"); + goto fail_destroy; + } + + /* Adding same flow again should fail */ + if (ssm_flow_set_add(set, idx, flow_id) != -EPERM) { + printf("Should not be able to add flow twice.\n"); + goto fail_destroy; + } + + ssm_flow_set_del(set, idx, flow_id); + + if (ssm_flow_set_has(set, idx, flow_id)) { + printf("Flow should not be in set after delete.\n"); + goto fail_destroy; + } + + ssm_flow_set_destroy(set); + + TEST_SUCCESS(); + return TEST_RC_SUCCESS; +fail_destroy: + ssm_flow_set_destroy(set); +fail: + TEST_FAIL(); + return TEST_RC_FAIL; +} + +static int test_ssm_flow_set_zero(void) +{ + struct ssm_flow_set * set; + pid_t pid; + size_t idx = 0; + int flow_id1 = 10; + int flow_id2 = 20; + + TEST_START(); + + pid = getpid(); + + set = ssm_flow_set_create(pid); + if (set == NULL) { + printf("Failed to create flow set.\n"); + goto fail; + } + + if (ssm_flow_set_add(set, idx, flow_id1) < 0) { + printf("Failed to add flow1 to set.\n"); + goto fail_destroy; + } + + if (ssm_flow_set_add(set, idx, flow_id2) < 0) { + printf("Failed to add flow2 to set.\n"); + goto fail_destroy; + } + + ssm_flow_set_zero(set, idx); + + if (ssm_flow_set_has(set, idx, flow_id1)) { + printf("Flow1 should not be in set after zero.\n"); + goto fail_destroy; + } + + if (ssm_flow_set_has(set, idx, flow_id2)) { + printf("Flow2 should not be in set after zero.\n"); + goto fail_destroy; + } + + ssm_flow_set_destroy(set); + + TEST_SUCCESS(); + return TEST_RC_SUCCESS; +fail_destroy: + ssm_flow_set_destroy(set); +fail: + TEST_FAIL(); + return TEST_RC_FAIL; +} + +static int test_ssm_flow_set_notify_wait(void) +{ + struct ssm_flow_set * set; + pid_t pid; + size_t idx = 0; + int flow_id = 100; + struct flowevent events[SSM_RBUFF_SIZE]; + struct timespec timeout; + ssize_t ret; + + TEST_START(); + + pid = getpid(); + + set = ssm_flow_set_create(pid); + if (set == NULL) { + printf("Failed to create flow set.\n"); + goto fail; + } + + if (ssm_flow_set_add(set, idx, flow_id) < 0) { + printf("Failed to add flow to set.\n"); + goto fail_destroy; + } + + /* Test immediate timeout when no events */ + clock_gettime(PTHREAD_COND_CLOCK, &timeout); + ret = ssm_flow_set_wait(set, idx, events, &timeout); + if (ret != -ETIMEDOUT) { + printf("Wait should timeout immediately when no events.\n"); + goto fail_destroy; + } + + /* Notify an event */ + ssm_flow_set_notify(set, flow_id, FLOW_PKT); + + /* Should be able to read the event immediately */ + clock_gettime(PTHREAD_COND_CLOCK, &timeout); + ts_add(&timeout, &timeout, &((struct timespec) {1, 0})); + + ret = ssm_flow_set_wait(set, idx, events, &timeout); + if (ret != 1) { + printf("Wait should return 1 event, got %zd.\n", ret); + goto fail_destroy; + } + + if (events[0].flow_id != flow_id) { + printf("Event flow_id mismatch: expected %d, got %d.\n", + flow_id, events[0].flow_id); + goto fail_destroy; + } + + if (events[0].event != FLOW_PKT) { + printf("Event type mismatch: expected %d, got %d.\n", + FLOW_PKT, events[0].event); + goto fail_destroy; + } + + ssm_flow_set_destroy(set); + + TEST_SUCCESS(); + return TEST_RC_SUCCESS; +fail_destroy: + ssm_flow_set_destroy(set); +fail: + TEST_FAIL(); + return TEST_RC_FAIL; +} + +int flow_set_test(int argc, + char ** argv) +{ + int ret = 0; + + (void) argc; + (void) argv; + + ret |= test_ssm_flow_set_create_destroy(); + ret |= test_ssm_flow_set_add_del_has(); + ret |= test_ssm_flow_set_zero(); + ret |= test_ssm_flow_set_notify_wait(); + + return ret; +} diff --git a/src/lib/ssm/tests/pool_sharding_test.c b/src/lib/ssm/tests/pool_sharding_test.c new file mode 100644 index 00000000..72ae1cb7 --- /dev/null +++ b/src/lib/ssm/tests/pool_sharding_test.c @@ -0,0 +1,505 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2026 + * + * Test of the SSM pool sharding with fallback + * + * Dimitri Staessens <dimitri@ouroboros.rocks> + * Sander Vrijders <sander@ouroboros.rocks> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#if defined(__linux__) || defined(__CYGWIN__) +#define _DEFAULT_SOURCE +#else +#define _POSIX_C_SOURCE 200112L +#endif + +#include "config.h" +#include "ssm.h" + +#include <test/test.h> +#include <ouroboros/ssm_pool.h> + +#include <errno.h> +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <unistd.h> +#include <stdbool.h> +#include <sys/wait.h> +#include <sys/types.h> +#include <signal.h> + +#define TEST_SIZE 256 + +/* Helper to get pool header for inspection */ +static struct _ssm_pool_hdr * get_pool_hdr(struct ssm_pool * pool) +{ + /* ssm_pool is opaque, but we know its layout: + * uint8_t * shm_base + * struct _ssm_pool_hdr * hdr + * void * pool_base + */ + struct _ssm_pool_hdr ** hdr_ptr = + (struct _ssm_pool_hdr **)((uint8_t *)pool + sizeof(void *)); + return *hdr_ptr; +} + +static int test_lazy_distribution(void) +{ + struct ssm_pool * pool; + struct _ssm_pool_hdr * hdr; + struct _ssm_size_class * sc; + int i; + int sc_idx; + + TEST_START(); + + ssm_pool_purge(); + + pool = ssm_pool_create(); + if (pool == NULL) { + printf("Failed to create pool.\n"); + goto fail; + } + + hdr = get_pool_hdr(pool); + if (hdr == NULL) { + printf("Failed to get pool header.\n"); + goto fail_pool; + } + + /* Find the first size class with blocks */ + sc_idx = -1; + for (i = 0; i < SSM_POOL_MAX_CLASSES; i++) { + if (hdr->size_classes[i].object_count > 0) { + sc_idx = i; + break; + } + } + + if (sc_idx < 0) { + printf("No size classes configured.\n"); + for (i = 0; i < SSM_POOL_MAX_CLASSES; i++) { + printf(" Class %d: count=%zu\n", i, + hdr->size_classes[i].object_count); + } + goto fail_pool; + } + + sc = &hdr->size_classes[sc_idx]; + + /* Verify all blocks start in shard 0 */ + if (sc->shards[0].free_count == 0) { + printf("Shard 0 should have all blocks initially.\n"); + goto fail_pool; + } + + /* Verify other shards are empty */ + for (i = 1; i < SSM_POOL_SHARDS; i++) { + if (sc->shards[i].free_count != 0) { + printf("Shard %d should be empty, has %zu.\n", + i, sc->shards[i].free_count); + goto fail_pool; + } + } + + ssm_pool_destroy(pool); + + TEST_SUCCESS(); + return TEST_RC_SUCCESS; + + fail_pool: + ssm_pool_destroy(pool); + fail: + TEST_FAIL(); + return TEST_RC_FAIL; +} + +static int test_shard_migration(void) +{ + struct ssm_pool * pool; + struct _ssm_pool_hdr * hdr; + struct _ssm_size_class * sc; + struct ssm_pk_buff * spb; + uint8_t * ptr; + ssize_t off; + int shard_idx; + int sc_idx; + int i; + + TEST_START(); + + ssm_pool_purge(); + + pool = ssm_pool_create(); + if (pool == NULL) { + printf("Failed to create pool.\n"); + goto fail; + } + + hdr = get_pool_hdr(pool); + + /* Find the first size class with blocks */ + sc_idx = -1; + for (i = 0; i < SSM_POOL_MAX_CLASSES; i++) { + if (hdr->size_classes[i].object_count > 0) { + sc_idx = i; + break; + } + } + + if (sc_idx < 0) { + printf("No size classes configured.\n"); + goto fail; + } + + sc = &hdr->size_classes[sc_idx]; + + /* Allocate from this process */ + off = ssm_pool_alloc(pool, TEST_SIZE, &ptr, &spb); + if (off < 0) { + printf("Allocation failed: %zd.\n", off); + goto fail_pool; + } + + /* Free it - should go to this process's shard */ + shard_idx = getpid() % SSM_POOL_SHARDS; + if (ssm_pool_remove(pool, off) != 0) { + printf("Remove failed.\n"); + goto fail_pool; + } + + /* Verify block migrated away from shard 0 or in allocator's shard */ + if (sc->shards[shard_idx].free_count == 0 && + sc->shards[0].free_count == 0) { + printf("Block should have been freed to a shard.\n"); + goto fail_pool; + } + + ssm_pool_destroy(pool); + + TEST_SUCCESS(); + return TEST_RC_SUCCESS; + + fail_pool: + ssm_pool_destroy(pool); + fail: + TEST_FAIL(); + return TEST_RC_FAIL; +} + +static int test_fallback_stealing(void) +{ + struct ssm_pool * pool; + struct _ssm_pool_hdr * hdr; + struct _ssm_size_class * sc; + struct ssm_pk_buff ** spbs; + uint8_t ** ptrs; + size_t total_blocks; + size_t total_free; + size_t i; + int sc_idx; + int c; + + TEST_START(); + + ssm_pool_purge(); + + pool = ssm_pool_create(); + if (pool == NULL) { + printf("Failed to create pool.\n"); + goto fail; + } + + hdr = get_pool_hdr(pool); + + /* Find the first size class with blocks */ + sc_idx = -1; + for (c = 0; c < SSM_POOL_MAX_CLASSES; c++) { + if (hdr->size_classes[c].object_count > 0) { + sc_idx = c; + break; + } + } + + if (sc_idx < 0) { + printf("No size classes configured.\n"); + goto fail; + } + + sc = &hdr->size_classes[sc_idx]; + total_blocks = sc->object_count; + + spbs = malloc(total_blocks * sizeof(struct ssm_pk_buff *)); + ptrs = malloc(total_blocks * sizeof(uint8_t *)); + if (spbs == NULL || ptrs == NULL) { + printf("Failed to allocate test arrays.\n"); + goto fail_pool; + } + + /* Allocate half the blocks from single process */ + for (i = 0; i < total_blocks / 2; i++) { + ssize_t off = ssm_pool_alloc(pool, TEST_SIZE, + &ptrs[i], &spbs[i]); + if (off < 0) { + printf("Allocation %zu failed: %zd.\n", i, off); + free(spbs); + free(ptrs); + goto fail_pool; + } + } + + /* Free them all - they go to local_shard */ + for (i = 0; i < total_blocks / 2; i++) { + size_t off = ssm_pk_buff_get_idx(spbs[i]); + if (ssm_pool_remove(pool, off) != 0) { + printf("Remove %zu failed.\n", i); + free(spbs); + free(ptrs); + goto fail_pool; + } + } + + /* Freed blocks should be in shards (all blocks free again) */ + total_free = 0; + for (i = 0; i < SSM_POOL_SHARDS; i++) { + total_free += sc->shards[i].free_count; + } + + if (total_free != total_blocks) { + printf("Expected %zu free blocks total, got %zu.\n", + total_blocks, total_free); + free(spbs); + free(ptrs); + goto fail_pool; + } + + /* Allocate again - should succeed by taking from shards */ + for (i = 0; i < total_blocks / 2; i++) { + ssize_t off = ssm_pool_alloc(pool, TEST_SIZE, + &ptrs[i], &spbs[i]); + if (off < 0) { + printf("Fallback alloc %zu failed: %zd.\n", i, off); + free(spbs); + free(ptrs); + goto fail_pool; + } + } + + /* Now all allocated blocks are in use again */ + /* Cleanup - free all allocated blocks */ + for (i = 0; i < total_blocks / 2; i++) { + size_t off = ssm_pk_buff_get_idx(spbs[i]); + ssm_pool_remove(pool, off); + } + + free(spbs); + free(ptrs); + ssm_pool_destroy(pool); + + TEST_SUCCESS(); + return TEST_RC_SUCCESS; + + fail_pool: + ssm_pool_destroy(pool); + fail: + TEST_FAIL(); + return TEST_RC_FAIL; +} + +static int test_multiprocess_sharding(void) +{ + struct ssm_pool * pool; + struct _ssm_pool_hdr * hdr; + struct _ssm_size_class * sc; + pid_t children[SSM_POOL_SHARDS]; + int i; + int status; + + TEST_START(); + + ssm_pool_purge(); + + pool = ssm_pool_create(); + if (pool == NULL) { + printf("Failed to create pool.\n"); + goto fail; + } + + /* Fork processes to test different shards */ + for (i = 0; i < SSM_POOL_SHARDS; i++) { + children[i] = fork(); + if (children[i] == -1) { + printf("Fork %d failed.\n", i); + goto fail_children; + } + + if (children[i] == 0) { + /* Child process */ + struct ssm_pool * child_pool; + struct ssm_pk_buff * spb; + uint8_t * ptr; + ssize_t off; + int my_shard; + + child_pool = ssm_pool_open(); + if (child_pool == NULL) + exit(EXIT_FAILURE); + + my_shard = getpid() % SSM_POOL_SHARDS; + (void) my_shard; /* Reserved for future use */ + + /* Each child allocates and frees a block */ + off = ssm_pool_alloc(child_pool, TEST_SIZE, + &ptr, &spb); + if (off < 0) { + ssm_pool_close(child_pool); + exit(EXIT_FAILURE); + } + + /* Small delay to ensure allocation visible */ + usleep(10000); + + if (ssm_pool_remove(child_pool, off) != 0) { + ssm_pool_close(child_pool); + exit(EXIT_FAILURE); + } + + ssm_pool_close(child_pool); + exit(EXIT_SUCCESS); + } + } + + /* Wait for all children */ + for (i = 0; i < SSM_POOL_SHARDS; i++) { + if (waitpid(children[i], &status, 0) == -1) { + printf("Waitpid %d failed.\n", i); + goto fail_children; + } + if (!WIFEXITED(status) || WEXITSTATUS(status) != 0) { + printf("Child %d failed.\n", i); + goto fail_pool; + } + } + + /* Verify blocks distributed across shards */ + hdr = get_pool_hdr(pool); + + /* Find the first size class with blocks */ + sc = NULL; + for (i = 0; i < SSM_POOL_MAX_CLASSES; i++) { + if (hdr->size_classes[i].object_count > 0) { + sc = &hdr->size_classes[i]; + break; + } + } + + if (sc == NULL) { + printf("No size classes configured.\n"); + goto fail_pool; + } + + /* After children allocate and free, blocks should be in shards + * (though exact distribution depends on PID values) + */ + for (i = 0; i < SSM_POOL_SHARDS; i++) { + /* At least some shards should have blocks */ + if (sc->shards[i].free_count > 0) { + break; + } + } + + ssm_pool_destroy(pool); + + TEST_SUCCESS(); + return TEST_RC_SUCCESS; + + fail_children: + /* Kill any remaining children */ + for (i = 0; i < SSM_POOL_SHARDS; i++) { + if (children[i] > 0) + kill(children[i], SIGKILL); + } + fail_pool: + ssm_pool_destroy(pool); + fail: + TEST_FAIL(); + return TEST_RC_FAIL; +} + +static int test_exhaustion_with_fallback(void) +{ + struct ssm_pool * pool; + struct ssm_pk_buff * spb; + uint8_t * ptr; + ssize_t off; + + TEST_START(); + + ssm_pool_purge(); + + pool = ssm_pool_create(); + if (pool == NULL) { + printf("Failed to create pool.\n"); + goto fail; + } + + /* Allocate until exhausted across all shards */ + while (true) { + off = ssm_pool_alloc(pool, TEST_SIZE, &ptr, &spb); + if (off < 0) { + if (off == -EAGAIN) + break; + printf("Unexpected error: %zd.\n", off); + goto fail_pool; + } + } + + /* Should fail with -EAGAIN when truly exhausted */ + off = ssm_pool_alloc(pool, TEST_SIZE, &ptr, &spb); + if (off != -EAGAIN) { + printf("Expected -EAGAIN, got %zd.\n", off); + goto fail_pool; + } + + ssm_pool_destroy(pool); + + TEST_SUCCESS(); + return TEST_RC_SUCCESS; + + fail_pool: + ssm_pool_destroy(pool); + fail: + TEST_FAIL(); + return TEST_RC_FAIL; +} + +int pool_sharding_test(int argc, + char ** argv) +{ + int ret = 0; + + (void) argc; + (void) argv; + + ret |= test_lazy_distribution(); + ret |= test_shard_migration(); + ret |= test_fallback_stealing(); + ret |= test_multiprocess_sharding(); + ret |= test_exhaustion_with_fallback(); + + return ret; +} diff --git a/src/lib/ssm/tests/pool_test.c b/src/lib/ssm/tests/pool_test.c new file mode 100644 index 00000000..e298d9ab --- /dev/null +++ b/src/lib/ssm/tests/pool_test.c @@ -0,0 +1,1038 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2024 + * + * Test of the Secure Shared Memory (SSM) system + * + * Dimitri Staessens <dimitri@ouroboros.rocks> + * Sander Vrijders <sander@ouroboros.rocks> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#define _POSIX_C_SOURCE 200809L + +#include "config.h" +#include "ssm.h" + +#include <test/test.h> +#include <ouroboros/ssm_pool.h> +#include <ouroboros/ssm_rbuff.h> + +#include <errno.h> +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <unistd.h> +#include <stdbool.h> +#include <stdatomic.h> +#include <sys/wait.h> +#include <sys/types.h> +#include <signal.h> +#include <time.h> + +#define POOL_256 256 +#define POOL_512 512 +#define POOL_1K 1024 +#define POOL_2K 2048 +#define POOL_4K 4096 +#define POOL_16K 16384 +#define POOL_64K 65536 +#define POOL_256K 262144 +#define POOL_1M 1048576 +#define POOL_2M (2 * 1024 * 1024) + +static int test_ssm_pool_basic_allocation(void) +{ + struct ssm_pool * pool; + uint8_t * ptr; + struct ssm_pk_buff * spb; + ssize_t ret; + + TEST_START(); + + pool = ssm_pool_create(); + if (pool == NULL) + goto fail_create; + + ret = ssm_pool_alloc(pool, POOL_256, &ptr, &spb); + if (ret < 0) { + printf("Alloc failed: %zd.\n", ret); + goto fail_alloc; + } + + if (spb == NULL) { + printf("Spb is NULL.\n"); + goto fail_alloc; + } + + if (ptr == NULL) { + printf("Ptr is NULL.\n"); + goto fail_alloc; + } + + if (ssm_pk_buff_len(spb) != POOL_256) { + printf("Bad length: %zu.\n", ssm_pk_buff_len(spb)); + goto fail_alloc; + } + + ret = ssm_pool_remove(pool, ret); + if (ret != 0) { + printf("Remove failed: %zd.\n", ret); + goto fail_alloc; + } + + ssm_pool_destroy(pool); + + TEST_SUCCESS(); + return TEST_RC_SUCCESS; + + fail_alloc: + ssm_pool_destroy(pool); + fail_create: + TEST_FAIL(); + return TEST_RC_FAIL; +} + +static int test_ssm_pool_multiple_allocations(void) +{ + struct ssm_pool * pool; + uint8_t * ptr1; + uint8_t * ptr2; + uint8_t * ptr3; + struct ssm_pk_buff * spb1; + struct ssm_pk_buff * spb2; + struct ssm_pk_buff * spb3; + ssize_t ret1; + ssize_t ret2; + ssize_t ret3; + + TEST_START(); + + pool = ssm_pool_create(); + if (pool == NULL) + goto fail_create; + + ret1 = ssm_pool_alloc(pool, POOL_256, &ptr1, &spb1); + ret2 = ssm_pool_alloc(pool, POOL_256, &ptr2, &spb2); + ret3 = ssm_pool_alloc(pool, POOL_256, &ptr3, &spb3); + if (ret1 < 0 || ret2 < 0 || ret3 < 0) { + printf("Allocs failed: %zd, %zd, %zd.\n", ret1, ret2, ret3); + goto fail_alloc; + } + + if (spb1 == NULL) { + printf("Spb1 is NULL.\n"); + goto fail_alloc; + } + + if (ptr1 == NULL) { + printf("Ptr1 is NULL.\n"); + goto fail_alloc; + } + + if (spb2 == NULL) { + printf("Spb2 is NULL.\n"); + goto fail_alloc; + } + + if (ptr2 == NULL) { + printf("Ptr2 is NULL.\n"); + goto fail_alloc; + } + + if (spb3 == NULL) { + printf("Spb3 is NULL.\n"); + goto fail_alloc; + } + + if (ptr3 == NULL) { + printf("Ptr3 is NULL.\n"); + goto fail_alloc; + } + + if (ssm_pk_buff_len(spb1) != POOL_256) { + printf("Bad length spb1: %zu.\n", ssm_pk_buff_len(spb1)); + goto fail_alloc; + } + + if (ssm_pk_buff_len(spb2) != POOL_256) { + printf("Bad length spb2: %zu.\n", ssm_pk_buff_len(spb2)); + goto fail_alloc; + } + + if (ssm_pk_buff_len(spb3) != POOL_256) { + printf("Bad length spb3: %zu.\n", ssm_pk_buff_len(spb3)); + goto fail_alloc; + } + + if (ssm_pool_remove(pool, ret2) != 0) { + printf("Remove ret2 failed.\n"); + goto fail_alloc; + } + + if (ssm_pool_remove(pool, ret1) != 0) { + printf("Remove ret1 failed.\n"); + goto fail_alloc; + } + + if (ssm_pool_remove(pool, ret3) != 0) { + printf("Remove ret3 failed.\n"); + goto fail_alloc; + } + + ssm_pool_destroy(pool); + + TEST_SUCCESS(); + return TEST_RC_SUCCESS; + + fail_alloc: + ssm_pool_destroy(pool); + fail_create: + TEST_FAIL(); + return TEST_RC_FAIL; +} + +static int test_ssm_pool_no_fallback_for_large(void) +{ + struct ssm_pool * pool; + uint8_t * ptr; + struct ssm_pk_buff * spb; + ssize_t ret; + + TEST_START(); + + pool = ssm_pool_create(); + if (pool == NULL) + goto fail_create; + + ret = ssm_pool_alloc(pool, POOL_2M, &ptr, &spb); + if (ret >= 0) { + printf("Oversized alloc succeeded: %zd.\n", ret); + goto fail_alloc; + } + + if (ret != -EMSGSIZE) { + printf("Wrong error: %zd.\n", ret); + goto fail_alloc; + } + + ssm_pool_destroy(pool); + + TEST_SUCCESS(); + return TEST_RC_SUCCESS; + + fail_alloc: + ssm_pool_destroy(pool); + fail_create: + TEST_FAIL(); + return TEST_RC_FAIL; +} + +static int test_ssm_pool_blocking_vs_nonblocking(void) +{ + struct ssm_pool * pool; + uint8_t * ptr; + struct ssm_pk_buff * spb; + ssize_t ret; + + TEST_START(); + + pool = ssm_pool_create(); + if (pool == NULL) + goto fail_create; + + ret = ssm_pool_alloc(pool, POOL_2M, &ptr, &spb); + if (ret != -EMSGSIZE) { + printf("Nonblocking oversized: %zd.\n", ret); + goto fail_alloc; + } + + ret = ssm_pool_alloc_b(pool, POOL_2M, &ptr, &spb, NULL); + if (ret != -EMSGSIZE) { + printf("Blocking oversized: %zd.\n", ret); + goto fail_alloc; + } + + ret = ssm_pool_alloc(pool, POOL_256, &ptr, &spb); + if (ret < 0) { + printf("Valid alloc failed: %zd.\n", ret); + goto fail_alloc; + } + + ssm_pool_remove(pool, ret); + ssm_pool_destroy(pool); + + TEST_SUCCESS(); + return TEST_RC_SUCCESS; + + fail_alloc: + ssm_pool_destroy(pool); + fail_create: + TEST_FAIL(); + return TEST_RC_FAIL; +} + +static int test_ssm_pool_stress_test(void) +{ + struct ssm_pool * pool; + uint8_t * ptr; + struct ssm_pk_buff * spb; + ssize_t * indices = NULL; + ssize_t ret; + size_t count = 0; + size_t i; + + TEST_START(); + + pool = ssm_pool_create(); + if (pool == NULL) + goto fail_create; + + indices = malloc(100 * sizeof(*indices)); + if (indices == NULL) { + printf("Malloc failed.\n"); + goto fail_alloc; + } + + for (i = 0; i < 100; i++) { + size_t j; + size_t num; + size_t size; + + num = (i % 100) + 1; + + for (j = 0; j < num && count < 100; j++) { + switch (i % 4) { + case 0: + /* FALLTHRU */ + case 1: + size = POOL_256; + break; + case 2: + /* FALLTHRU */ + case 3: + size = POOL_1K; + break; + default: + size = POOL_256; + break; + } + + ret = ssm_pool_alloc(pool, size, &ptr, &spb); + if (ret < 0) { + printf("Alloc at iter %zu: %zd.\n", i, ret); + goto fail_test; + } + indices[count++] = ret; + } + + for (j = 0; j < count / 2; j++) { + size_t idx = j * 2; + if (idx < count) { + ret = ssm_pool_remove(pool, indices[idx]); + if (ret != 0) { + printf("Remove at iter %zu: %zd.\n", + i, ret); + goto fail_test; + } + memmove(&indices[idx], &indices[idx + 1], + (count - idx - 1) * sizeof(*indices)); + count--; + } + } + + if (i % 10 == 0) { + ret = ssm_pool_alloc(pool, POOL_256, &ptr, &spb); + if (ret < 0) { + printf("Periodic alloc at %zu: %zd.\n", i, ret); + goto fail_test; + } + ssm_pool_remove(pool, ret); + } + } + + for (i = 0; i < count; i++) + ssm_pool_remove(pool, indices[i]); + + free(indices); + ssm_pool_destroy(pool); + + TEST_SUCCESS(); + return TEST_RC_SUCCESS; + + fail_test: + for (i = 0; i < count; i++) + ssm_pool_remove(pool, indices[i]); + free(indices); + fail_alloc: + ssm_pool_destroy(pool); + fail_create: + TEST_FAIL(); + return TEST_RC_FAIL; +} + +static int test_ssm_pool_open_initializes_ssm(void) +{ + struct ssm_pool * creator; + struct ssm_pool * opener; + uint8_t * ptr; + struct ssm_pk_buff * spb; + ssize_t ret; + + TEST_START(); + + creator = ssm_pool_create(); + if (creator == NULL) + goto fail_create; + + ret = ssm_pool_alloc(creator, POOL_256, &ptr, &spb); + if (ret < 0) { + printf("Creator alloc failed: %zd.\n", ret); + goto fail_creator; + } + ssm_pool_remove(creator, ret); + + opener = ssm_pool_open(); + if (opener == NULL) { + printf("Open failed.\n"); + goto fail_creator; + } + + ret = ssm_pool_alloc(opener, POOL_256, &ptr, &spb); + if (ret < 0) { + printf("Opener alloc failed: %zd.\n", ret); + goto fail_opener; + } + + ssm_pool_remove(opener, ret); + ssm_pool_close(opener); + ssm_pool_destroy(creator); + + TEST_SUCCESS(); + return TEST_RC_SUCCESS; + + fail_opener: + ssm_pool_close(opener); + fail_creator: + ssm_pool_destroy(creator); + fail_create: + TEST_FAIL(); + return TEST_RC_FAIL; +} + +static int test_ssm_pool_bounds_checking(void) +{ + struct ssm_pool * pool; + struct ssm_pk_buff * spb; + ssize_t ret; + + TEST_START(); + + pool = ssm_pool_create(); + if (pool == NULL) + goto fail_create; + + ret = ssm_pool_alloc(pool, POOL_256, NULL, &spb); + if (ret < 0) { + printf("alloc failed: %zd.\n", ret); + goto fail_alloc; + } + + spb = ssm_pool_get(pool, 0); + if (spb != NULL) { + printf("Get at offset 0.\n"); + goto fail_alloc; + } + + spb = ssm_pool_get(pool, 100000000UL); + if (spb != NULL) { + printf("Get beyond pool.\n"); + goto fail_alloc; + } + + ret = ssm_pool_remove(pool, 0); + if (ret != -EINVAL) { + printf("Remove at offset 0: %zd.\n", ret); + goto fail_alloc; + } + + ret = ssm_pool_remove(pool, 100000000UL); + if (ret != -EINVAL) { + printf("Remove beyond pool: %zd.\n", ret); + goto fail_alloc; + } + + ssm_pool_destroy(pool); + + TEST_SUCCESS(); + return TEST_RC_SUCCESS; + + fail_alloc: + ssm_pool_destroy(pool); + fail_create: + TEST_FAIL(); + return TEST_RC_FAIL; +} + +static int test_ssm_pool_inter_process_communication(void) +{ + struct ssm_pool * pool; + struct ssm_rbuff * rb; + struct ssm_pk_buff * spb; + uint8_t * ptr; + uint8_t * data; + const char * msg = "inter-process test"; + size_t len; + ssize_t idx; + pid_t pid; + int status; + + TEST_START(); + + len = strlen(msg) + 1; + + pool = ssm_pool_create(); + if (pool == NULL) + goto fail_create; + + rb = ssm_rbuff_create(getpid(), 1); + if (rb == NULL) { + printf("Rbuff create failed.\n"); + goto fail_pool; + } + + pid = fork(); + if (pid < 0) { + printf("Fork failed.\n"); + goto fail_rbuff; + } + + if (pid == 0) { + idx = ssm_rbuff_read_b(rb, NULL); + if (idx < 0) { + printf("Child: rbuff read: %zd.\n", idx); + exit(1); + } + + spb = ssm_pool_get(pool, idx); + if (spb == NULL) { + printf("Child: pool get failed.\n"); + exit(1); + } + + data = ssm_pk_buff_head(spb); + if (data == NULL) { + printf("Child: data is NULL.\n"); + ssm_pool_remove(pool, idx); + exit(1); + } + + if (strcmp((char *)data, msg) != 0) { + printf("Child: data mismatch.\n"); + ssm_pool_remove(pool, idx); + exit(1); + } + + ssm_pool_remove(pool, idx); + exit(0); + } + + idx = ssm_pool_alloc(pool, len, &ptr, &spb); + if (idx < 0) { + printf("Parent: pool alloc: %zd.\n", idx); + goto fail_child; + } + + memcpy(ptr, msg, len); + + if (ssm_rbuff_write(rb, idx) < 0) { + printf("Parent: rbuff write failed.\n"); + ssm_pool_remove(pool, idx); + goto fail_child; + } + + if (waitpid(pid, &status, 0) < 0) { + printf("Parent: waitpid failed.\n"); + ssm_pool_remove(pool, idx); + goto fail_rbuff; + } + + if (!WIFEXITED(status) || WEXITSTATUS(status) != 0) { + printf("Child failed.\n"); + ssm_pool_remove(pool, idx); + goto fail_rbuff; + } + + ssm_rbuff_destroy(rb); + ssm_pool_destroy(pool); + + TEST_SUCCESS(); + return TEST_RC_SUCCESS; + + fail_child: + waitpid(pid, &status, 0); + fail_rbuff: + ssm_rbuff_destroy(rb); + fail_pool: + ssm_pool_destroy(pool); + fail_create: + TEST_FAIL(); + return TEST_RC_FAIL; +} + +static int test_ssm_pool_read_operation(void) +{ + struct ssm_pool * pool; + struct ssm_pk_buff * spb; + uint8_t * wptr; + uint8_t * rptr; + const char * data = "ssm_pool_read test"; + size_t len; + ssize_t idx; + ssize_t ret; + + TEST_START(); + + len = strlen(data) + 1; + + pool = ssm_pool_create(); + if (pool == NULL) + goto fail_create; + + idx = ssm_pool_alloc(pool, len, &wptr, &spb); + if (idx < 0) { + printf("alloc failed: %zd.\n", idx); + goto fail_alloc; + } + + memcpy(wptr, data, len); + + ret = ssm_pool_read(&rptr, pool, idx); + if (ret < 0) { + printf("Read failed: %zd.\n", ret); + goto fail_read; + } + + if (rptr == NULL) { + printf("NULL pointer.\n"); + goto fail_read; + } + + if (strcmp((char *)rptr, data) != 0) { + printf("Data mismatch.\n"); + goto fail_read; + } + + ssm_pool_remove(pool, idx); + ssm_pool_destroy(pool); + + TEST_SUCCESS(); + return TEST_RC_SUCCESS; + + fail_read: + ssm_pool_remove(pool, idx); + fail_alloc: + ssm_pool_destroy(pool); + fail_create: + TEST_FAIL(); + return TEST_RC_FAIL; +} + +static int test_ssm_pool_mlock_operation(void) +{ + struct ssm_pool * pool; + int ret; + + TEST_START(); + + pool = ssm_pool_create(); + if (pool == NULL) + goto fail_create; + + ret = ssm_pool_mlock(pool); + if (ret < 0) + printf("Mlock failed: %d (may need privileges).\n", ret); + + ssm_pool_destroy(pool); + + TEST_SUCCESS(); + return TEST_RC_SUCCESS; + + fail_create: + TEST_FAIL(); + return TEST_RC_FAIL; +} + +static int test_ssm_pk_buff_operations(void) +{ + struct ssm_pool * pool; + struct ssm_pk_buff * spb; + uint8_t * ptr; + uint8_t * head; + uint8_t * tail; + const char * data = "packet buffer test"; + size_t dlen; + size_t len; + ssize_t idx; + + TEST_START(); + + dlen = strlen(data); + + pool = ssm_pool_create(); + if (pool == NULL) + goto fail_create; + + idx = ssm_pool_alloc(pool, POOL_256, &ptr, &spb); + if (idx < 0) { + printf("alloc failed: %zd.\n", idx); + goto fail_alloc; + } + + head = ssm_pk_buff_head(spb); + if (head != ptr) { + printf("Head mismatch.\n"); + goto fail_ops; + } + + len = ssm_pk_buff_len(spb); + if (len != POOL_256) { + printf("Bad length: %zu.\n", len); + goto fail_ops; + } + + tail = ssm_pk_buff_tail(spb); + if (tail != ptr + len) { + printf("Tail mismatch.\n"); + goto fail_ops; + } + + memcpy(head, data, dlen); + + tail = ssm_pk_buff_tail_alloc(spb, 32); + if (tail == NULL) { + printf("Tail_alloc failed.\n"); + goto fail_ops; + } + + if (ssm_pk_buff_len(spb) != POOL_256 + 32) { + printf("Length after tail_alloc: %zu.\n", + ssm_pk_buff_len(spb)); + goto fail_ops; + } + + if (memcmp(head, data, dlen) != 0) { + printf("Data corrupted.\n"); + goto fail_ops; + } + + tail = ssm_pk_buff_tail_release(spb, 32); + if (tail == NULL) { + printf("Tail_release failed.\n"); + goto fail_ops; + } + + if (ssm_pk_buff_len(spb) != POOL_256) { + printf("Length after tail_release: %zu.\n", + ssm_pk_buff_len(spb)); + goto fail_ops; + } + + ssm_pool_remove(pool, idx); + ssm_pool_destroy(pool); + + TEST_SUCCESS(); + return TEST_RC_SUCCESS; + + fail_ops: + ssm_pool_remove(pool, idx); + fail_alloc: + ssm_pool_destroy(pool); + fail_create: + TEST_FAIL(); + return TEST_RC_FAIL; +} + +#define OVERHEAD (offsetof(struct ssm_pk_buff, data) + \ + SSM_PK_BUFF_HEADSPACE + SSM_PK_BUFF_TAILSPACE) +static int test_ssm_pool_size_class_boundaries(void) +{ + struct ssm_pool * pool; + struct ssm_pk_buff * spb; + uint8_t * ptr; + size_t sizes[] = { + 1, + POOL_512 - OVERHEAD, + POOL_512 - OVERHEAD + 1, + POOL_1K - OVERHEAD, + POOL_1K - OVERHEAD + 1, + POOL_2K - OVERHEAD, + POOL_2K - OVERHEAD + 1, + POOL_4K - OVERHEAD, + POOL_4K - OVERHEAD + 1, + POOL_16K - OVERHEAD, + POOL_16K - OVERHEAD + 1, + POOL_64K - OVERHEAD, + POOL_64K - OVERHEAD + 1, + POOL_256K - OVERHEAD, + POOL_256K - OVERHEAD + 1, + POOL_1M - OVERHEAD, + }; + size_t expected_classes[] = { + 512, 512, 1024, 1024, 2048, 2048, 4096, 4096, 16384, + 16384, 65536, 65536, 262144, 262144, 1048576, 1048576 + }; + size_t i; + ssize_t idx; + + TEST_START(); + + pool = ssm_pool_create(); + if (pool == NULL) + goto fail_create; + + for (i = 0; i < sizeof(sizes) / sizeof(sizes[0]); i++) { + struct ssm_pk_buff * hdr; + size_t actual_class; + + idx = ssm_pool_alloc(pool, sizes[i], &ptr, &spb); + if (idx < 0) { + printf("Alloc at %zu failed: %zd.\n", sizes[i], idx); + goto fail_alloc; + } + + if (ssm_pk_buff_len(spb) != sizes[i]) { + printf("Length mismatch at %zu: %zu.\n", + sizes[i], ssm_pk_buff_len(spb)); + ssm_pool_remove(pool, idx); + goto fail_alloc; + } + + /* Verify correct size class was used + * hdr->size is the data array size (object_size - header) */ + hdr = spb; + actual_class = hdr->size + offsetof(struct ssm_pk_buff, data); + if (actual_class != expected_classes[i]) { + printf("Wrong class for len=%zu: want %zu, got %zu.\n", + sizes[i], expected_classes[i], actual_class); + ssm_pool_remove(pool, idx); + goto fail_alloc; + } + + memset(ptr, i & 0xFF, sizes[i]); + + ssm_pool_remove(pool, idx); + } + + ssm_pool_destroy(pool); + + TEST_SUCCESS(); + return TEST_RC_SUCCESS; + + fail_alloc: + ssm_pool_destroy(pool); + fail_create: + TEST_FAIL(); + return TEST_RC_FAIL; +} + +static int test_ssm_pool_exhaustion(void) +{ + struct ssm_pool * pool; + struct ssm_pk_buff * spb; + uint8_t * ptr; + ssize_t * indices; + size_t count = 0; + size_t i; + ssize_t ret; + + TEST_START(); + + pool = ssm_pool_create(); + if (pool == NULL) + goto fail_create; + + indices = malloc(2048 * sizeof(*indices)); + if (indices == NULL) { + printf("Malloc failed.\n"); + goto fail_alloc; + } + + for (i = 0; i < 2048; i++) { + ret = ssm_pool_alloc(pool, POOL_256, &ptr, &spb); + if (ret < 0) { + if (ret == -EAGAIN) + break; + printf("Alloc error: %zd.\n", ret); + goto fail_test; + } + indices[count++] = ret; + } + + if (count == 0) { + printf("No allocs succeeded.\n"); + goto fail_test; + } + + ret = ssm_pool_alloc(pool, POOL_256, &ptr, &spb); + if (ret >= 0) { + ssm_pool_remove(pool, ret); + } else if (ret != -EAGAIN) { + printf("Unexpected error: %zd.\n", ret); + goto fail_test; + } + + for (i = 0; i < count; i++) + ssm_pool_remove(pool, indices[i]); + + ret = ssm_pool_alloc(pool, POOL_256, &ptr, &spb); + if (ret < 0) { + printf("Alloc after free failed: %zd.\n", ret); + goto fail_test; + } + ssm_pool_remove(pool, ret); + + free(indices); + ssm_pool_destroy(pool); + + TEST_SUCCESS(); + return TEST_RC_SUCCESS; + + fail_test: + for (i = 0; i < count; i++) + ssm_pool_remove(pool, indices[i]); + free(indices); + fail_alloc: + ssm_pool_destroy(pool); + fail_create: + TEST_FAIL(); + return TEST_RC_FAIL; +} + +static int test_ssm_pool_reclaim_orphans(void) +{ + struct ssm_pool * pool; + uint8_t * ptr1; + uint8_t * ptr2; + uint8_t * ptr3; + struct ssm_pk_buff * spb1; + struct ssm_pk_buff * spb2; + struct ssm_pk_buff * spb3; + ssize_t ret1; + ssize_t ret2; + ssize_t ret3; + pid_t my_pid; + pid_t fake_pid = 99999; + + TEST_START(); + + pool = ssm_pool_create(); + if (pool == NULL) + goto fail_create; + + my_pid = getpid(); + + /* Allocate some blocks */ + ret1 = ssm_pool_alloc(pool, POOL_256, &ptr1, &spb1); + ret2 = ssm_pool_alloc(pool, POOL_512, &ptr2, &spb2); + ret3 = ssm_pool_alloc(pool, POOL_1K, &ptr3, &spb3); + if (ret1 < 0 || ret2 < 0 || ret3 < 0) { + printf("Allocs failed: %zd, %zd, %zd.\n", ret1, ret2, ret3); + goto fail_alloc; + } + + /* Simulate blocks from another process by changing allocator_pid */ + spb1->allocator_pid = fake_pid; + spb2->allocator_pid = fake_pid; + /* Keep spb3 with our pid */ + + /* Reclaim orphans from fake_pid */ + ssm_pool_reclaim_orphans(pool, fake_pid); + + /* Verify spb1 and spb2 have refcount 0 (reclaimed) */ + if (spb1->refcount != 0) { + printf("spb1 refcount should be 0, got %u.\n", spb1->refcount); + goto fail_test; + } + + if (spb2->refcount != 0) { + printf("spb2 refcount should be 0, got %u.\n", spb2->refcount); + goto fail_test; + } + + /* Verify spb3 still has refcount 1 (not reclaimed) */ + if (spb3->refcount != 1) { + printf("spb3 refcount should be 1, got %u.\n", spb3->refcount); + goto fail_test; + } + + /* Clean up */ + ssm_pool_remove(pool, ret3); + + /* Try allocating again - should get blocks from reclaimed pool */ + ret1 = ssm_pool_alloc(pool, POOL_256, &ptr1, &spb1); + if (ret1 < 0) { + printf("Alloc after reclaim failed: %zd.\n", ret1); + goto fail_test; + } + + /* Verify new allocation has our pid */ + if (spb1->allocator_pid != my_pid) { + printf("New block has wrong pid: %d vs %d.\n", + spb1->allocator_pid, my_pid); + goto fail_test; + } + + ssm_pool_remove(pool, ret1); + ssm_pool_destroy(pool); + + TEST_SUCCESS(); + return TEST_RC_SUCCESS; + + fail_test: + ssm_pool_remove(pool, ret3); + fail_alloc: + ssm_pool_destroy(pool); + fail_create: + TEST_FAIL(); + return TEST_RC_FAIL; +} + +int pool_test(int argc, + char ** argv) +{ + int ret = 0; + + (void) argc; + (void) argv; + + ssm_pool_purge(); + + ret |= test_ssm_pool_basic_allocation(); + ret |= test_ssm_pool_multiple_allocations(); + ret |= test_ssm_pool_no_fallback_for_large(); + ret |= test_ssm_pool_blocking_vs_nonblocking(); + ret |= test_ssm_pool_stress_test(); + ret |= test_ssm_pool_open_initializes_ssm(); + ret |= test_ssm_pool_bounds_checking(); + ret |= test_ssm_pool_inter_process_communication(); + ret |= test_ssm_pool_read_operation(); + ret |= test_ssm_pool_mlock_operation(); + ret |= test_ssm_pk_buff_operations(); + ret |= test_ssm_pool_size_class_boundaries(); + ret |= test_ssm_pool_exhaustion(); + ret |= test_ssm_pool_reclaim_orphans(); + + return ret; +} diff --git a/src/lib/ssm/tests/rbuff_test.c b/src/lib/ssm/tests/rbuff_test.c new file mode 100644 index 00000000..6e1cb5ec --- /dev/null +++ b/src/lib/ssm/tests/rbuff_test.c @@ -0,0 +1,675 @@ +/* + * Ouroboros - Copyright (C) 2016 - 2024 + * + * Test of the SSM notification ring buffer + * + * Dimitri Staessens <dimitri@ouroboros.rocks> + * Sander Vrijders <sander@ouroboros.rocks> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 as + * published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., http://www.fsf.org/about/contact/. + */ + +#if defined(__linux__) || defined(__CYGWIN__) +#define _DEFAULT_SOURCE +#else +#define _POSIX_C_SOURCE 200112L +#endif + +#include "config.h" +#include "ssm.h" + +#include <test/test.h> +#include <ouroboros/ssm_rbuff.h> +#include <ouroboros/errno.h> +#include <ouroboros/time.h> + +#include <errno.h> +#include <stdio.h> +#include <unistd.h> +#include <pthread.h> + +static int test_ssm_rbuff_create_destroy(void) +{ + struct ssm_rbuff * rb; + + TEST_START(); + + rb = ssm_rbuff_create(getpid(), 1); + if (rb == NULL) { + printf("Failed to create rbuff.\n"); + goto fail; + } + + ssm_rbuff_destroy(rb); + + TEST_SUCCESS(); + return TEST_RC_SUCCESS; + + fail: + TEST_FAIL(); + return TEST_RC_FAIL; +} + +static int test_ssm_rbuff_write_read(void) +{ + struct ssm_rbuff * rb; + ssize_t idx; + + TEST_START(); + + rb = ssm_rbuff_create(getpid(), 2); + if (rb == NULL) { + printf("Failed to create rbuff.\n"); + goto fail; + } + + if (ssm_rbuff_write(rb, 42) < 0) { + printf("Failed to write value.\n"); + goto fail_rb; + } + + if (ssm_rbuff_queued(rb) != 1) { + printf("Queue length should be 1, got %zu.\n", + ssm_rbuff_queued(rb)); + goto fail_rb; + } + + idx = ssm_rbuff_read(rb); + if (idx != 42) { + printf("Expected 42, got %zd.\n", idx); + goto fail_rb; + } + + if (ssm_rbuff_queued(rb) != 0) { + printf("Queue should be empty, got %zu.\n", + ssm_rbuff_queued(rb)); + goto fail_rb; + } + + ssm_rbuff_destroy(rb); + + TEST_SUCCESS(); + return TEST_RC_SUCCESS; + + fail_rb: + ssm_rbuff_destroy(rb); + fail: + TEST_FAIL(); + return TEST_RC_FAIL; +} + +static int test_ssm_rbuff_read_empty(void) +{ + struct ssm_rbuff * rb; + ssize_t ret; + + TEST_START(); + + rb = ssm_rbuff_create(getpid(), 3); + if (rb == NULL) { + printf("Failed to create rbuff.\n"); + goto fail; + } + + ret = ssm_rbuff_read(rb); + if (ret != -EAGAIN) { + printf("Expected -EAGAIN, got %zd.\n", ret); + goto fail_rb; + } + + ssm_rbuff_destroy(rb); + + TEST_SUCCESS(); + return TEST_RC_SUCCESS; + + fail_rb: + ssm_rbuff_destroy(rb); + fail: + TEST_FAIL(); + return TEST_RC_FAIL; +} + +static int test_ssm_rbuff_fill_drain(void) +{ + struct ssm_rbuff * rb; + size_t i; + ssize_t ret; + + TEST_START(); + + rb = ssm_rbuff_create(getpid(), 4); + if (rb == NULL) { + printf("Failed to create rbuff.\n"); + goto fail; + } + + for (i = 0; i < SSM_RBUFF_SIZE - 1; ++i) { + if (ssm_rbuff_queued(rb) != i) { + printf("Expected %zu queued, got %zu.\n", + i, ssm_rbuff_queued(rb)); + goto fail_rb; + } + if (ssm_rbuff_write(rb, i) < 0) { + printf("Failed to write at index %zu.\n", i); + goto fail_rb; + } + } + + if (ssm_rbuff_queued(rb) != SSM_RBUFF_SIZE - 1) { + printf("Expected %d queued, got %zu.\n", + SSM_RBUFF_SIZE - 1, ssm_rbuff_queued(rb)); + goto fail_rb; + } + + ret = ssm_rbuff_write(rb, 999); + if (ret != -EAGAIN) { + printf("Expected -EAGAIN on full buffer, got %zd.\n", ret); + goto fail_rb; + } + + for (i = 0; i < SSM_RBUFF_SIZE - 1; ++i) { + ret = ssm_rbuff_read(rb); + if (ret != (ssize_t) i) { + printf("Expected %zu, got %zd.\n", i, ret); + goto fail_rb; + } + } + + if (ssm_rbuff_queued(rb) != 0) { + printf("Expected empty queue, got %zu.\n", + ssm_rbuff_queued(rb)); + goto fail_rb; + } + + ssm_rbuff_destroy(rb); + + TEST_SUCCESS(); + return TEST_RC_SUCCESS; + + fail_rb: + while (ssm_rbuff_read(rb) >= 0) + ; + ssm_rbuff_destroy(rb); + fail: + TEST_FAIL(); + return TEST_RC_FAIL; +} + +static int test_ssm_rbuff_acl(void) +{ + struct ssm_rbuff * rb; + uint32_t acl; + + TEST_START(); + + rb = ssm_rbuff_create(getpid(), 5); + if (rb == NULL) { + printf("Failed to create rbuff.\n"); + goto fail; + } + + acl = ssm_rbuff_get_acl(rb); + if (acl != ACL_RDWR) { + printf("Expected ACL_RDWR, got %u.\n", acl); + goto fail_rb; + } + + ssm_rbuff_set_acl(rb, ACL_RDONLY); + acl = ssm_rbuff_get_acl(rb); + if (acl != ACL_RDONLY) { + printf("Expected ACL_RDONLY, got %u.\n", acl); + goto fail_rb; + } + + if (ssm_rbuff_write(rb, 1) != -ENOTALLOC) { + printf("Expected -ENOTALLOC on RDONLY.\n"); + goto fail_rb; + } + + ssm_rbuff_set_acl(rb, ACL_FLOWDOWN); + if (ssm_rbuff_write(rb, 1) != -EFLOWDOWN) { + printf("Expected -EFLOWDOWN on FLOWDOWN.\n"); + goto fail_rb; + } + + if (ssm_rbuff_read(rb) != -EFLOWDOWN) { + printf("Expected -EFLOWDOWN on read with FLOWDOWN.\n"); + goto fail_rb; + } + + ssm_rbuff_destroy(rb); + + TEST_SUCCESS(); + return TEST_RC_SUCCESS; + + fail_rb: + ssm_rbuff_destroy(rb); + fail: + TEST_FAIL(); + return TEST_RC_FAIL; +} + +static int test_ssm_rbuff_open_close(void) +{ + struct ssm_rbuff * rb1; + struct ssm_rbuff * rb2; + pid_t pid; + + TEST_START(); + + pid = getpid(); + + rb1 = ssm_rbuff_create(pid, 6); + if (rb1 == NULL) { + printf("Failed to create rbuff.\n"); + goto fail; + } + + if (ssm_rbuff_write(rb1, 123) < 0) { + printf("Failed to write value.\n"); + goto fail_rb1; + } + + rb2 = ssm_rbuff_open(pid, 6); + if (rb2 == NULL) { + printf("Failed to open existing rbuff.\n"); + goto fail_rb1; + } + + if (ssm_rbuff_queued(rb2) != 1) { + printf("Expected 1 queued in opened rbuff, got %zu.\n", + ssm_rbuff_queued(rb2)); + goto fail_rb2; + } + + if (ssm_rbuff_read(rb2) != 123) { + printf("Failed to read from opened rbuff.\n"); + goto fail_rb2; + } + + ssm_rbuff_close(rb2); + ssm_rbuff_destroy(rb1); + + TEST_SUCCESS(); + return TEST_RC_SUCCESS; + + fail_rb2: + ssm_rbuff_close(rb2); + fail_rb1: + ssm_rbuff_destroy(rb1); + fail: + TEST_FAIL(); + return TEST_RC_FAIL; +} + +struct thread_args { + struct ssm_rbuff * rb; + int iterations; + int delay_us; +}; + +static void * writer_thread(void * arg) +{ + struct thread_args * args = (struct thread_args *) arg; + struct timespec delay = {0, 0}; + int i; + + delay.tv_nsec = args->delay_us * 1000L; + + for (i = 0; i < args->iterations; ++i) { + while (ssm_rbuff_write(args->rb, i) < 0) + nanosleep(&delay, NULL); + } + + return NULL; +} + +static void * reader_thread(void * arg) +{ + struct thread_args * args = (struct thread_args *) arg; + struct timespec delay = {0, 0}; + int i; + ssize_t val; + + delay.tv_nsec = args->delay_us * 1000L; + + for (i = 0; i < args->iterations; ++i) { + val = ssm_rbuff_read(args->rb); + while (val < 0) { + nanosleep(&delay, NULL); + val = ssm_rbuff_read(args->rb); + } + if (val != i) { + printf("Expected %d, got %zd.\n", i, val); + return (void *) -1; + } + } + + return NULL; +} + +static void * blocking_writer_thread(void * arg) +{ + struct thread_args * args = (struct thread_args *) arg; + int i; + + for (i = 0; i < args->iterations; ++i) { + if (ssm_rbuff_write_b(args->rb, i, NULL) < 0) + return (void *) -1; + } + + return NULL; +} + +static void * blocking_reader_thread(void * arg) +{ + struct thread_args * args = (struct thread_args *) arg; + int i; + ssize_t val; + + for (i = 0; i < args->iterations; ++i) { + val = ssm_rbuff_read_b(args->rb, NULL); + if (val < 0 || val != i) { + printf("Expected %d, got %zd.\n", i, val); + return (void *) -1; + } + } + + return NULL; +} + +static int test_ssm_rbuff_blocking(void) +{ + struct ssm_rbuff * rb; + pthread_t wthread; + pthread_t rthread; + struct thread_args args; + struct timespec delay = {0, 10 * MILLION}; + void * ret_w; + void * ret_r; + + TEST_START(); + + rb = ssm_rbuff_create(getpid(), 8); + if (rb == NULL) { + printf("Failed to create rbuff.\n"); + goto fail; + } + + args.rb = rb; + args.iterations = 50; + args.delay_us = 0; + + if (pthread_create(&rthread, NULL, blocking_reader_thread, &args)) { + printf("Failed to create reader thread.\n"); + goto fail_rthread; + } + + nanosleep(&delay, NULL); + + if (pthread_create(&wthread, NULL, blocking_writer_thread, &args)) { + printf("Failed to create writer thread.\n"); + pthread_cancel(rthread); + goto fail_wthread; + } + + pthread_join(wthread, &ret_w); + pthread_join(rthread, &ret_r); + + if (ret_w != NULL || ret_r != NULL) { + printf("Thread returned error.\n"); + goto fail_ret; + } + + ssm_rbuff_destroy(rb); + + TEST_SUCCESS(); + return TEST_RC_SUCCESS; + + fail_ret: + fail_wthread: + pthread_join(rthread, NULL); + fail_rthread: + ssm_rbuff_destroy(rb); + fail: + TEST_FAIL(); + return TEST_RC_FAIL; +} + +static int test_ssm_rbuff_blocking_timeout(void) +{ + struct ssm_rbuff * rb; + struct timespec abs_timeout; + struct timespec interval = {0, 100 * MILLION}; + struct timespec start; + struct timespec end; + ssize_t ret; + long elapsed_ms; + size_t i; + + TEST_START(); + + rb = ssm_rbuff_create(getpid(), 9); + if (rb == NULL) { + printf("Failed to create rbuff.\n"); + goto fail; + } + + clock_gettime(PTHREAD_COND_CLOCK, &start); + ts_add(&start, &interval, &abs_timeout); + + ret = ssm_rbuff_read_b(rb, &abs_timeout); + + clock_gettime(PTHREAD_COND_CLOCK, &end); + + if (ret != -ETIMEDOUT) { + printf("Expected -ETIMEDOUT, got %zd.\n", ret); + goto fail_rb; + } + + elapsed_ms = (end.tv_sec - start.tv_sec) * 1000L + + (end.tv_nsec - start.tv_nsec) / 1000000L; + + if (elapsed_ms < 90 || elapsed_ms > 200) { + printf("Timeout took %ld ms, expected ~100 ms.\n", + elapsed_ms); + goto fail_rb; + } + + for (i = 0; i < SSM_RBUFF_SIZE - 1; ++i) { + if (ssm_rbuff_write(rb, i) < 0) { + printf("Failed to fill buffer.\n"); + goto fail_rb; + } + } + + clock_gettime(PTHREAD_COND_CLOCK, &start); + ts_add(&start, &interval, &abs_timeout); + + ret = ssm_rbuff_write_b(rb, 999, &abs_timeout); + + clock_gettime(PTHREAD_COND_CLOCK, &end); + + if (ret != -ETIMEDOUT) { + printf("Expected -ETIMEDOUT on full buffer, got %zd.\n", + ret); + goto fail_rb; + } + + elapsed_ms = (end.tv_sec - start.tv_sec) * 1000L + + (end.tv_nsec - start.tv_nsec) / 1000000L; + + if (elapsed_ms < 90 || elapsed_ms > 200) { + printf("Write timeout took %ld ms, expected ~100 ms.\n", + elapsed_ms); + goto fail_rb; + } + + while (ssm_rbuff_read(rb) >= 0) + ; + + ssm_rbuff_destroy(rb); + + TEST_SUCCESS(); + return TEST_RC_SUCCESS; + + fail_rb: + while (ssm_rbuff_read(rb) >= 0) + ; + ssm_rbuff_destroy(rb); + fail: + TEST_FAIL(); + return TEST_RC_FAIL; +} + +static int test_ssm_rbuff_blocking_flowdown(void) +{ + struct ssm_rbuff * rb; + struct timespec abs_timeout; + struct timespec now; + struct timespec interval = {5, 0}; + ssize_t ret; + size_t i; + + TEST_START(); + + rb = ssm_rbuff_create(getpid(), 10); + if (rb == NULL) { + printf("Failed to create rbuff.\n"); + goto fail; + } + + clock_gettime(PTHREAD_COND_CLOCK, &now); + ts_add(&now, &interval, &abs_timeout); + + ssm_rbuff_set_acl(rb, ACL_FLOWDOWN); + + ret = ssm_rbuff_read_b(rb, &abs_timeout); + if (ret != -EFLOWDOWN) { + printf("Expected -EFLOWDOWN, got %zd.\n", ret); + goto fail_rb; + } + + ssm_rbuff_set_acl(rb, ACL_RDWR); + + for (i = 0; i < SSM_RBUFF_SIZE - 1; ++i) { + if (ssm_rbuff_write(rb, i) < 0) { + printf("Failed to fill buffer.\n"); + goto fail_rb; + } + } + + clock_gettime(PTHREAD_COND_CLOCK, &now); + ts_add(&now, &interval, &abs_timeout); + + ssm_rbuff_set_acl(rb, ACL_FLOWDOWN); + + ret = ssm_rbuff_write_b(rb, 999, &abs_timeout); + if (ret != -EFLOWDOWN) { + printf("Expected -EFLOWDOWN on write, got %zd.\n", ret); + goto fail_rb; + } + + ssm_rbuff_set_acl(rb, ACL_RDWR); + while (ssm_rbuff_read(rb) >= 0) + ; + + ssm_rbuff_destroy(rb); + + TEST_SUCCESS(); + return TEST_RC_SUCCESS; + + fail_rb: + while (ssm_rbuff_read(rb) >= 0) + ; + ssm_rbuff_destroy(rb); + fail: + TEST_FAIL(); + return TEST_RC_FAIL; +} + +static int test_ssm_rbuff_threaded(void) +{ + struct ssm_rbuff * rb; + pthread_t wthread; + pthread_t rthread; + struct thread_args args; + void * ret_w; + void * ret_r; + + TEST_START(); + + rb = ssm_rbuff_create(getpid(), 7); + if (rb == NULL) { + printf("Failed to create rbuff.\n"); + goto fail; + } + + args.rb = rb; + args.iterations = 100; + args.delay_us = 100; + + if (pthread_create(&wthread, NULL, writer_thread, &args)) { + printf("Failed to create writer thread.\n"); + goto fail_rb; + } + + if (pthread_create(&rthread, NULL, reader_thread, &args)) { + printf("Failed to create reader thread.\n"); + pthread_cancel(wthread); + pthread_join(wthread, NULL); + goto fail_rb; + } + + pthread_join(wthread, &ret_w); + pthread_join(rthread, &ret_r); + + if (ret_w != NULL || ret_r != NULL) { + printf("Thread returned error.\n"); + goto fail_rb; + } + + ssm_rbuff_destroy(rb); + + TEST_SUCCESS(); + return TEST_RC_SUCCESS; + + fail_rb: + ssm_rbuff_destroy(rb); + fail: + TEST_FAIL(); + return TEST_RC_FAIL; +} + +int rbuff_test(int argc, + char ** argv) +{ + int ret = 0; + + (void) argc; + (void) argv; + + ret |= test_ssm_rbuff_create_destroy(); + ret |= test_ssm_rbuff_write_read(); + ret |= test_ssm_rbuff_read_empty(); + ret |= test_ssm_rbuff_fill_drain(); + ret |= test_ssm_rbuff_acl(); + ret |= test_ssm_rbuff_open_close(); + ret |= test_ssm_rbuff_threaded(); + ret |= test_ssm_rbuff_blocking(); + ret |= test_ssm_rbuff_blocking_timeout(); + ret |= test_ssm_rbuff_blocking_flowdown(); + + return ret; +} |
