| author | Dimitri Staessens <dimitri@ouroboros.rocks> | 2025-12-24 10:23:02 +0100 |
|---|---|---|
| committer | Sander Vrijders <sander@ouroboros.rocks> | 2026-01-07 14:33:39 +0100 |
| commit | cef9a910aa9c2315aa4d4ab6c196b078ca2fad90 (patch) | |
| tree | c72302670b228688a3293f5a9c55d0597479b76b /src/lib/shm_rbuff.c | |
| parent | 48c294105f5123dc876fbad199ec1e0166d82a18 (diff) | |
| download | ouroboros-cef9a910aa9c2315aa4d4ab6c196b078ca2fad90.tar.gz ouroboros-cef9a910aa9c2315aa4d4ab6c196b078ca2fad90.zip | |
lib: Remove the not-so lockless rbuff_ll
The "lockless" rbuff was mixing paradigms as it still has mutexes and
condvars to avoid spinning on blocking behaviour. This was a bad
idea. We'll add proper lockless implementations later.
Signed-off-by: Dimitri Staessens <dimitri@ouroboros.rocks>
Signed-off-by: Sander Vrijders <sander@ouroboros.rocks>
Diffstat (limited to 'src/lib/shm_rbuff.c')
| -rw-r--r-- | src/lib/shm_rbuff.c | 281 |
1 file changed, 277 insertions, 4 deletions
diff --git a/src/lib/shm_rbuff.c b/src/lib/shm_rbuff.c
index 22cff41c..bee52b4e 100644
--- a/src/lib/shm_rbuff.c
+++ b/src/lib/shm_rbuff.c
@@ -194,6 +194,29 @@ struct shm_rbuff * shm_rbuff_create(pid_t pid,
         return NULL;
 }
 
+void shm_rbuff_destroy(struct shm_rbuff * rb)
+{
+        char fn[FN_MAX_CHARS];
+
+        assert(rb != NULL);
+
+#ifdef CONFIG_OUROBOROS_DEBUG
+        pthread_mutex_lock(rb->lock);
+
+        *rb->acl = *rb->acl & ACL_FLOWDOWN;
+
+        pthread_cond_broadcast(rb->add);
+        pthread_cond_broadcast(rb->del);
+
+        pthread_mutex_unlock(rb->lock);
+#endif
+        sprintf(fn, SHM_RBUFF_PREFIX "%d.%d", rb->pid, rb->flow_id);
+
+        shm_rbuff_close(rb);
+
+        shm_unlink(fn);
+}
+
 struct shm_rbuff * shm_rbuff_open(pid_t pid,
                                   int flow_id)
 {
@@ -207,9 +230,259 @@ void shm_rbuff_close(struct shm_rbuff * rb)
         rbuff_destroy(rb);
 }
 
-#if (defined(SHM_RBUFF_LOCKLESS) && \
-     (defined(__GNUC__) || defined (__clang__)))
-#include "shm_rbuff_ll.c"
+int shm_rbuff_write(struct shm_rbuff * rb,
+                    size_t idx)
+{
+        int ret = 0;
+
+        assert(rb != NULL);
+        assert(idx < SHM_BUFFER_SIZE);
+
+#ifndef HAVE_ROBUST_MUTEX
+        pthread_mutex_lock(rb->lock);
+#else
+        if (pthread_mutex_lock(rb->lock) == EOWNERDEAD)
+                pthread_mutex_consistent(rb->lock);
+#endif
+
+        if (*rb->acl != ACL_RDWR) {
+                if (*rb->acl & ACL_FLOWDOWN)
+                        ret = -EFLOWDOWN;
+                else if (*rb->acl & ACL_RDONLY)
+                        ret = -ENOTALLOC;
+                goto err;
+        }
+
+        if (!shm_rbuff_free(rb)) {
+                ret = -EAGAIN;
+                goto err;
+        }
+
+        if (shm_rbuff_empty(rb))
+                pthread_cond_broadcast(rb->add);
+
+        *head_el_ptr(rb) = (ssize_t) idx;
+        *rb->head = (*rb->head + 1) & ((SHM_RBUFF_SIZE) - 1);
+
+        pthread_mutex_unlock(rb->lock);
+
+        return 0;
+ err:
+        pthread_mutex_unlock(rb->lock);
+        return ret;
+}
+
+int shm_rbuff_write_b(struct shm_rbuff * rb,
+                      size_t idx,
+                      const struct timespec * abstime)
+{
+        int ret = 0;
+
+        assert(rb != NULL);
+        assert(idx < SHM_BUFFER_SIZE);
+
+#ifndef HAVE_ROBUST_MUTEX
+        pthread_mutex_lock(rb->lock);
+#else
+        if (pthread_mutex_lock(rb->lock) == EOWNERDEAD)
+                pthread_mutex_consistent(rb->lock);
+#endif
+
+        if (*rb->acl != ACL_RDWR) {
+                if (*rb->acl & ACL_FLOWDOWN)
+                        ret = -EFLOWDOWN;
+                else if (*rb->acl & ACL_RDONLY)
+                        ret = -ENOTALLOC;
+                goto err;
+        }
+
+        pthread_cleanup_push(__cleanup_mutex_unlock, rb->lock);
+
+        while (!shm_rbuff_free(rb)
+               && ret != -ETIMEDOUT
+               && !(*rb->acl & ACL_FLOWDOWN)) {
+                ret = -__timedwait(rb->del, rb->lock, abstime);
+#ifdef HAVE_ROBUST_MUTEX
+                if (ret == -EOWNERDEAD)
+                        pthread_mutex_consistent(rb->lock);
+#endif
+        }
+
+        if (ret != -ETIMEDOUT) {
+                if (shm_rbuff_empty(rb))
+                        pthread_cond_broadcast(rb->add);
+                *head_el_ptr(rb) = (ssize_t) idx;
+                *rb->head = (*rb->head + 1) & ((SHM_RBUFF_SIZE) - 1);
+        }
+
+        pthread_cleanup_pop(true);
+
+        return ret;
+ err:
+        pthread_mutex_unlock(rb->lock);
+        return ret;
+}
+
+static int check_rb_acl(struct shm_rbuff * rb)
+{
+        assert(rb != NULL);
+
+        if (*rb->acl & ACL_FLOWDOWN)
+                return -EFLOWDOWN;
+
+        if (*rb->acl & ACL_FLOWPEER)
+                return -EFLOWPEER;
+
+        return -EAGAIN;
+}
+
+ssize_t shm_rbuff_read(struct shm_rbuff * rb)
+{
+        ssize_t ret = 0;
+
+        assert(rb != NULL);
+
+#ifndef HAVE_ROBUST_MUTEX
+        pthread_mutex_lock(rb->lock);
+#else
+        if (pthread_mutex_lock(rb->lock) == EOWNERDEAD)
+                pthread_mutex_consistent(rb->lock);
+#endif
+
+        if (shm_rbuff_empty(rb)) {
+                ret = check_rb_acl(rb);
+                pthread_mutex_unlock(rb->lock);
+                return ret;
+        }
+
+        ret = *tail_el_ptr(rb);
+        *rb->tail = (*rb->tail + 1) & ((SHM_RBUFF_SIZE) - 1);
+        pthread_cond_broadcast(rb->del);
+
+        pthread_mutex_unlock(rb->lock);
+
+        return ret;
+}
+
+ssize_t shm_rbuff_read_b(struct shm_rbuff * rb,
+                         const struct timespec * abstime)
+{
+        ssize_t idx = -1;
+
+        assert(rb != NULL);
+
+#ifndef HAVE_ROBUST_MUTEX
+        pthread_mutex_lock(rb->lock);
+#else
+        if (pthread_mutex_lock(rb->lock) == EOWNERDEAD)
+                pthread_mutex_consistent(rb->lock);
+#endif
+
+        if (shm_rbuff_empty(rb) && (*rb->acl & ACL_FLOWDOWN)) {
+                pthread_mutex_unlock(rb->lock);
+                return -EFLOWDOWN;
+        }
+
+        pthread_cleanup_push(__cleanup_mutex_unlock, rb->lock);
+
+        while (shm_rbuff_empty(rb) &&
+               idx != -ETIMEDOUT &&
+               check_rb_acl(rb) == -EAGAIN) {
+                idx = -__timedwait(rb->add, rb->lock, abstime);
+#ifdef HAVE_ROBUST_MUTEX
+                if (idx == -EOWNERDEAD)
+                        pthread_mutex_consistent(rb->lock);
+#endif
+        }
+
+        if (!shm_rbuff_empty(rb)) {
+                idx = *tail_el_ptr(rb);
+                *rb->tail = (*rb->tail + 1) & ((SHM_RBUFF_SIZE) - 1);
+                pthread_cond_broadcast(rb->del);
+        } else if (idx != -ETIMEDOUT) {
+                idx = check_rb_acl(rb);
+        }
+
+        pthread_cleanup_pop(true);
+
+        assert(idx != -EAGAIN);
+
+        return idx;
+}
+
+void shm_rbuff_set_acl(struct shm_rbuff * rb,
+                       uint32_t flags)
+{
+        assert(rb != NULL);
+
+#ifndef HAVE_ROBUST_MUTEX
+        pthread_mutex_lock(rb->lock);
+#else
+        if (pthread_mutex_lock(rb->lock) == EOWNERDEAD)
+                pthread_mutex_consistent(rb->lock);
+#endif
+        *rb->acl = (size_t) flags;
+
+        pthread_mutex_unlock(rb->lock);
+}
+
+uint32_t shm_rbuff_get_acl(struct shm_rbuff * rb)
+{
+        uint32_t flags;
+
+        assert(rb != NULL);
+
+#ifndef HAVE_ROBUST_MUTEX
+        pthread_mutex_lock(rb->lock);
+#else
+        if (pthread_mutex_lock(rb->lock) == EOWNERDEAD)
+                pthread_mutex_consistent(rb->lock);
+#endif
+        flags = (uint32_t) *rb->acl;
+
+        pthread_mutex_unlock(rb->lock);
+
+        return flags;
+}
+
+void shm_rbuff_fini(struct shm_rbuff * rb)
+{
+        assert(rb != NULL);
+
+#ifndef HAVE_ROBUST_MUTEX
+        pthread_mutex_lock(rb->lock);
+#else
+        if (pthread_mutex_lock(rb->lock) == EOWNERDEAD)
+                pthread_mutex_consistent(rb->lock);
+#endif
+        pthread_cleanup_push(__cleanup_mutex_unlock, rb->lock);
+
+        while (!shm_rbuff_empty(rb))
+#ifndef HAVE_ROBUST_MUTEX
+                pthread_cond_wait(rb->add, rb->lock);
+#else
+                if (pthread_cond_wait(rb->add, rb->lock) == EOWNERDEAD)
+                        pthread_mutex_consistent(rb->lock);
+#endif
+        pthread_cleanup_pop(true);
+}
+
+size_t shm_rbuff_queued(struct shm_rbuff * rb)
+{
+        size_t ret;
+
+        assert(rb != NULL);
+
+#ifndef HAVE_ROBUST_MUTEX
+        pthread_mutex_lock(rb->lock);
 #else
-#include "shm_rbuff_pthr.c"
+        if (pthread_mutex_lock(rb->lock) == EOWNERDEAD)
+                pthread_mutex_consistent(rb->lock);
 #endif
+
+        ret = shm_rbuff_used(rb);
+
+        pthread_mutex_unlock(rb->lock);
+
+        return ret;
+}
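For readers skimming the diff, the retained implementation follows the classic blocking bounded-buffer pattern: writers and readers share one mutex and two condition variables, sleep on a condvar instead of spinning when the ring is full or empty, and broadcast on the empty/non-empty and freed-slot transitions. The sketch below shows that pattern in isolation; it is not Ouroboros code, the names (`ring`, `ring_put`, `ring_get`, `RING_SIZE`) are made up for illustration, and it leaves out the ACL flags, timed waits, robust-mutex recovery, and shared-memory plumbing that `shm_rbuff.c` has to handle.

```c
/*
 * Minimal sketch (not Ouroboros code): a blocking bounded ring buffer
 * built on one pthread mutex and two condition variables, mirroring the
 * broadcast-on-transition pattern used by shm_rbuff.c above.
 * All names here (ring, ring_put, ring_get, RING_SIZE) are hypothetical.
 */
#include <pthread.h>
#include <stddef.h>
#include <stdio.h>

#define RING_SIZE 8 /* power of two, so (x & (RING_SIZE - 1)) wraps the index */

struct ring {
        size_t          head;              /* next slot to write      */
        size_t          tail;              /* next slot to read       */
        int             elems[RING_SIZE];
        pthread_mutex_t lock;
        pthread_cond_t  add;               /* "an element was added"  */
        pthread_cond_t  del;               /* "a slot was freed"      */
};

static size_t ring_used(const struct ring * r)
{
        return (r->head - r->tail) & (RING_SIZE - 1);
}

static void ring_put(struct ring * r, int val)
{
        pthread_mutex_lock(&r->lock);

        /* Sleep on the condvar while full instead of spinning. */
        while (ring_used(r) == RING_SIZE - 1)
                pthread_cond_wait(&r->del, &r->lock);

        /* Empty -> non-empty transition: wake any blocked readers. */
        if (ring_used(r) == 0)
                pthread_cond_broadcast(&r->add);

        r->elems[r->head] = val;
        r->head = (r->head + 1) & (RING_SIZE - 1);

        pthread_mutex_unlock(&r->lock);
}

static int ring_get(struct ring * r)
{
        int val;

        pthread_mutex_lock(&r->lock);

        /* Sleep on the condvar while empty instead of spinning. */
        while (ring_used(r) == 0)
                pthread_cond_wait(&r->add, &r->lock);

        val = r->elems[r->tail];
        r->tail = (r->tail + 1) & (RING_SIZE - 1);

        /* A slot was freed: wake any blocked writers. */
        pthread_cond_broadcast(&r->del);

        pthread_mutex_unlock(&r->lock);

        return val;
}

static void * producer(void * o)
{
        struct ring * r = (struct ring *) o;
        int           i;

        for (i = 0; i < 32; ++i)
                ring_put(r, i);

        return NULL;
}

int main(void)
{
        struct ring r = { 0 };
        pthread_t   thr;
        int         i;

        pthread_mutex_init(&r.lock, NULL);
        pthread_cond_init(&r.add, NULL);
        pthread_cond_init(&r.del, NULL);

        pthread_create(&thr, NULL, producer, &r);

        for (i = 0; i < 32; ++i)
                printf("%d\n", ring_get(&r));

        pthread_join(thr, NULL);

        return 0;
}
```

Build with `cc -pthread`; the producer thread fills the ring while the main thread drains it, each blocking on a condition variable whenever the ring is full or empty rather than busy-waiting.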
