diff options
Diffstat (limited to 'src/lib/shm_rdrbuff.c')
| -rw-r--r-- | src/lib/shm_rdrbuff.c | 266 |
1 files changed, 110 insertions, 156 deletions
diff --git a/src/lib/shm_rdrbuff.c b/src/lib/shm_rdrbuff.c index 12e29bef..7ad1bd2e 100644 --- a/src/lib/shm_rdrbuff.c +++ b/src/lib/shm_rdrbuff.c @@ -1,10 +1,10 @@ /* - * Ouroboros - Copyright (C) 2016 - 2018 + * Ouroboros - Copyright (C) 2016 - 2024 * * Random Deletion Ring Buffer for Data Units * - * Dimitri Staessens <dimitri.staessens@ugent.be> - * Sander Vrijders <sander.vrijders@ugent.be> + * Dimitri Staessens <dimitri@ouroboros.rocks> + * Sander Vrijders <sander@ouroboros.rocks> * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public License @@ -25,53 +25,37 @@ #include "config.h" #include <ouroboros/errno.h> +#include <ouroboros/pthread.h> #include <ouroboros/shm_rdrbuff.h> -#include <ouroboros/shm_du_buff.h> -#include <ouroboros/time_utils.h> -#include <pthread.h> -#include <sys/mman.h> +#include <assert.h> #include <fcntl.h> -#include <unistd.h> +#include <signal.h> +#include <stdbool.h> +#include <stdio.h> #include <stdlib.h> #include <string.h> -#include <stdio.h> -#include <signal.h> +#include <unistd.h> +#include <sys/mman.h> #include <sys/stat.h> -#include <stdbool.h> -#include <assert.h> #define SHM_BLOCKS_SIZE ((SHM_BUFFER_SIZE) * SHM_RDRB_BLOCK_SIZE) #define SHM_FILE_SIZE (SHM_BLOCKS_SIZE + 2 * sizeof(size_t) \ + sizeof(pthread_mutex_t) + 2 * sizeof(pthread_cond_t) \ + sizeof(pid_t)) - -#ifndef SHM_RDRB_MULTI_BLOCK -#define WAIT_BLOCKS 1 -#else -#define WAIT_BLOCKS ((SHM_BUFFER_SIZE) >> 4) -#if WAIT_BLOCKS == 0 -#undef WAIT_BLOCKS -#define WAIT_BLOCKS 1 -#endif -#endif +#define DU_BUFF_OVERHEAD (DU_BUFF_HEADSPACE + DU_BUFF_TAILSPACE) #define get_head_ptr(rdrb) \ - ((struct shm_du_buff *) (rdrb->shm_base + (*rdrb->head \ - * SHM_RDRB_BLOCK_SIZE))) + idx_to_du_buff_ptr(rdrb, *rdrb->head) #define get_tail_ptr(rdrb) \ - ((struct shm_du_buff *) (rdrb->shm_base + (*rdrb->tail \ - * SHM_RDRB_BLOCK_SIZE))) + idx_to_du_buff_ptr(rdrb, *rdrb->tail) #define idx_to_du_buff_ptr(rdrb, idx) \ ((struct shm_du_buff *) (rdrb->shm_base + idx * SHM_RDRB_BLOCK_SIZE)) -#define block_ptr_to_idx(rdrb, sdb) \ - (((uint8_t *)sdb - rdrb->shm_base) / SHM_RDRB_BLOCK_SIZE) - #define shm_rdrb_used(rdrb) \ - ((*rdrb->head + (SHM_BUFFER_SIZE) - *rdrb->tail) \ + (((*rdrb->head + (SHM_BUFFER_SIZE) - *rdrb->tail) + 1) \ & ((SHM_BUFFER_SIZE) - 1)) #define shm_rdrb_free(rdrb, i) \ @@ -80,11 +64,6 @@ #define shm_rdrb_empty(rdrb) \ (*rdrb->tail == *rdrb->head) -enum shm_du_buff_flags { - SDB_VALID = 0, - SDB_NULL -}; - struct shm_du_buff { size_t size; #ifdef SHM_RDRB_MULTI_BLOCK @@ -92,7 +71,7 @@ struct shm_du_buff { #endif size_t du_head; size_t du_tail; - size_t flags; + size_t refs; size_t idx; }; @@ -101,8 +80,7 @@ struct shm_rdrbuff { size_t * head; /* start of ringbuffer head */ size_t * tail; /* start of ringbuffer tail */ pthread_mutex_t * lock; /* lock all free space in shm */ - pthread_cond_t * full; /* flag when full */ - pthread_cond_t * healthy; /* flag when SDU is read */ + pthread_cond_t * healthy; /* flag when packet is read */ pid_t * pid; /* pid of the irmd owner */ }; @@ -111,16 +89,25 @@ static void garbage_collect(struct shm_rdrbuff * rdrb) #ifdef SHM_RDRB_MULTI_BLOCK struct shm_du_buff * sdb; while (!shm_rdrb_empty(rdrb) && - (sdb = get_tail_ptr(rdrb))->flags == SDB_NULL) + (sdb = get_tail_ptr(rdrb))->refs == 0) *rdrb->tail = (*rdrb->tail + sdb->blocks) & ((SHM_BUFFER_SIZE) - 1); #else - while (!shm_rdrb_empty(rdrb) && get_tail_ptr(rdrb)->flags == SDB_NULL) + while (!shm_rdrb_empty(rdrb) && get_tail_ptr(rdrb)->refs == 0) *rdrb->tail = (*rdrb->tail + 1) & ((SHM_BUFFER_SIZE) - 1); #endif pthread_cond_broadcast(rdrb->healthy); } +#ifdef HAVE_ROBUST_MUTEX +static void sanitize(struct shm_rdrbuff * rdrb) +{ + --get_head_ptr(rdrb)->refs; + garbage_collect(rdrb); + pthread_mutex_consistent(rdrb->lock); +} +#endif + static char * rdrb_filename(void) { char * str; @@ -148,8 +135,10 @@ void shm_rdrbuff_destroy(struct shm_rdrbuff * rdrb) assert(rdrb); - if (getpid() != *rdrb->pid && kill(*rdrb->pid, 0) == 0) + if (getpid() != *rdrb->pid && kill(*rdrb->pid, 0) == 0) { + free(rdrb); return; + } shm_rdrbuff_close(rdrb); @@ -182,7 +171,7 @@ static struct shm_rdrbuff * rdrb_create(int flags) if (fd == -1) goto fail_open; - if ((flags & O_CREAT) && ftruncate(fd, SHM_FILE_SIZE - 1) < 0) + if ((flags & O_CREAT) && ftruncate(fd, SHM_FILE_SIZE) < 0) goto fail_truncate; shm_base = mmap(NULL, SHM_FILE_SIZE, MM_FLAGS, MAP_SHARED, fd, 0); @@ -195,8 +184,7 @@ static struct shm_rdrbuff * rdrb_create(int flags) rdrb->head = (size_t *) ((uint8_t *) rdrb->shm_base + SHM_BLOCKS_SIZE); rdrb->tail = rdrb->head + 1; rdrb->lock = (pthread_mutex_t *) (rdrb->tail + 1); - rdrb->full = (pthread_cond_t *) (rdrb->lock + 1); - rdrb->healthy = rdrb->full + 1; + rdrb->healthy = (pthread_cond_t *) (rdrb->lock + 1); rdrb->pid = (pid_t *) (rdrb->healthy + 1); free(shm_rdrb_fn); @@ -215,7 +203,7 @@ static struct shm_rdrbuff * rdrb_create(int flags) return NULL; } -struct shm_rdrbuff * shm_rdrbuff_create() +struct shm_rdrbuff * shm_rdrbuff_create(void) { struct shm_rdrbuff * rdrb; mode_t mask; @@ -248,9 +236,6 @@ struct shm_rdrbuff * shm_rdrbuff_create() #ifndef __APPLE__ pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK); #endif - if (pthread_cond_init(rdrb->full, &cattr)) - goto fail_full; - if (pthread_cond_init(rdrb->healthy, &cattr)) goto fail_healthy; @@ -265,8 +250,6 @@ struct shm_rdrbuff * shm_rdrbuff_create() return rdrb; fail_healthy: - pthread_cond_destroy(rdrb->full); - fail_full: pthread_condattr_destroy(&cattr); fail_cattr: pthread_mutex_destroy(rdrb->lock); @@ -275,59 +258,14 @@ struct shm_rdrbuff * shm_rdrbuff_create() fail_mattr: shm_rdrbuff_destroy(rdrb); fail_rdrb: - return NULL; + return NULL; } -struct shm_rdrbuff * shm_rdrbuff_open() +struct shm_rdrbuff * shm_rdrbuff_open(void) { return rdrb_create(O_RDWR); } -int shm_rdrbuff_wait_full(struct shm_rdrbuff * rdrb, - struct timespec * timeo) -{ - struct timespec abstime; - - if (timeo != NULL) { - clock_gettime(PTHREAD_COND_CLOCK, &abstime); - ts_add(&abstime, timeo, &abstime); - } - -#ifndef HAVE_ROBUST_MUTEX - pthread_mutex_lock(rdrb->lock); -#else - if (pthread_mutex_lock(rdrb->lock) == EOWNERDEAD) - pthread_mutex_consistent(rdrb->lock); -#endif - - while (shm_rdrb_free(rdrb, WAIT_BLOCKS)) { -#ifndef HAVE_ROBUST_MUTEX - if (pthread_cond_timedwait(rdrb->full, - rdrb->lock, - &abstime) == ETIMEDOUT) { - pthread_mutex_unlock(rdrb->lock); - return -ETIMEDOUT; - } -#else - int ret = pthread_cond_timedwait(rdrb->full, - rdrb->lock, - &abstime); - if (ret == EOWNERDEAD) - pthread_mutex_consistent(rdrb->lock); - if (ret == ETIMEDOUT) { - pthread_mutex_unlock(rdrb->lock); - return -ETIMEDOUT; - } -#endif - } - - garbage_collect(rdrb); - - pthread_mutex_unlock(rdrb->lock); - - return 0; -} - void shm_rdrbuff_purge(void) { char * shm_rdrb_fn; @@ -340,14 +278,13 @@ void shm_rdrbuff_purge(void) free(shm_rdrb_fn); } -ssize_t shm_rdrbuff_write(struct shm_rdrbuff * rdrb, - size_t headspace, - size_t tailspace, - const uint8_t * data, - size_t len) +ssize_t shm_rdrbuff_alloc(struct shm_rdrbuff * rdrb, + size_t len, + uint8_t ** ptr, + struct shm_du_buff ** psdb) { struct shm_du_buff * sdb; - size_t size = headspace + len + tailspace; + size_t size = DU_BUFF_OVERHEAD + len; #ifdef SHM_RDRB_MULTI_BLOCK size_t blocks = 0; size_t padblocks = 0; @@ -355,23 +292,24 @@ ssize_t shm_rdrbuff_write(struct shm_rdrbuff * rdrb, ssize_t sz = size + sizeof(*sdb); assert(rdrb); + assert(psdb); #ifndef SHM_RDRB_MULTI_BLOCK if (sz > SHM_RDRB_BLOCK_SIZE) return -EMSGSIZE; +#else + while (sz > 0) { + sz -= SHM_RDRB_BLOCK_SIZE; + ++blocks; + } #endif #ifndef HAVE_ROBUST_MUTEX pthread_mutex_lock(rdrb->lock); #else if (pthread_mutex_lock(rdrb->lock) == EOWNERDEAD) - pthread_mutex_consistent(rdrb->lock); + sanitize(rdrb); #endif #ifdef SHM_RDRB_MULTI_BLOCK - while (sz > 0) { - sz -= SHM_RDRB_BLOCK_SIZE; - ++blocks; - } - if (blocks + *rdrb->head > (SHM_BUFFER_SIZE)) padblocks = (SHM_BUFFER_SIZE) - *rdrb->head; @@ -379,7 +317,6 @@ ssize_t shm_rdrbuff_write(struct shm_rdrbuff * rdrb, #else if (!shm_rdrb_free(rdrb, 1)) { #endif - pthread_cond_broadcast(rdrb->full); pthread_mutex_unlock(rdrb->lock); return -EAGAIN; } @@ -389,7 +326,7 @@ ssize_t shm_rdrbuff_write(struct shm_rdrbuff * rdrb, sdb = get_head_ptr(rdrb); sdb->size = 0; sdb->blocks = padblocks; - sdb->flags = SDB_NULL; + sdb->refs = 0; sdb->du_head = 0; sdb->du_tail = 0; sdb->idx = *rdrb->head; @@ -398,7 +335,7 @@ ssize_t shm_rdrbuff_write(struct shm_rdrbuff * rdrb, } #endif sdb = get_head_ptr(rdrb); - sdb->flags = SDB_VALID; + sdb->refs = 1; sdb->idx = *rdrb->head; #ifdef SHM_RDRB_MULTI_BLOCK sdb->blocks = blocks; @@ -410,66 +347,64 @@ ssize_t shm_rdrbuff_write(struct shm_rdrbuff * rdrb, pthread_mutex_unlock(rdrb->lock); sdb->size = size; - sdb->du_head = headspace; + sdb->du_head = DU_BUFF_HEADSPACE; sdb->du_tail = sdb->du_head + len; - if (data != NULL) - memcpy(((uint8_t *) (sdb + 1)) + headspace, data, len); + *psdb = sdb; + if (ptr != NULL) + *ptr = (uint8_t *) (sdb + 1) + sdb->du_head; return sdb->idx; } -ssize_t shm_rdrbuff_write_b(struct shm_rdrbuff * rdrb, - size_t headspace, - size_t tailspace, - const uint8_t * data, +ssize_t shm_rdrbuff_alloc_b(struct shm_rdrbuff * rdrb, size_t len, + uint8_t ** ptr, + struct shm_du_buff ** psdb, const struct timespec * abstime) { struct shm_du_buff * sdb; - size_t size = headspace + len + tailspace; + size_t size = DU_BUFF_OVERHEAD + len; #ifdef SHM_RDRB_MULTI_BLOCK - size_t blocks = 0; + size_t blocks = 0; size_t padblocks = 0; #endif - ssize_t sz = size + sizeof(*sdb); - int ret = 0; + ssize_t sz = size + sizeof(*sdb); + int ret = 0; assert(rdrb); + assert(psdb); #ifndef SHM_RDRB_MULTI_BLOCK if (sz > SHM_RDRB_BLOCK_SIZE) return -EMSGSIZE; +#else + while (sz > 0) { + sz -= SHM_RDRB_BLOCK_SIZE; + ++blocks; + } #endif #ifndef HAVE_ROBUST_MUTEX pthread_mutex_lock(rdrb->lock); #else if (pthread_mutex_lock(rdrb->lock) == EOWNERDEAD) - pthread_mutex_consistent(rdrb->lock); + sanitize(rdrb); #endif - pthread_cleanup_push((void (*) (void *)) pthread_mutex_unlock, - (void *) rdrb->lock); + pthread_cleanup_push(__cleanup_mutex_unlock, rdrb->lock); #ifdef SHM_RDRB_MULTI_BLOCK - while (sz > 0) { - sz -= SHM_RDRB_BLOCK_SIZE; - ++blocks; - } - if (blocks + *rdrb->head > (SHM_BUFFER_SIZE)) padblocks = (SHM_BUFFER_SIZE) - *rdrb->head; - while (!shm_rdrb_free(rdrb, (blocks + padblocks)) && ret != ETIMEDOUT) { + while (!shm_rdrb_free(rdrb, blocks + padblocks) && ret != ETIMEDOUT) { #else while (!shm_rdrb_free(rdrb, 1) && ret != ETIMEDOUT) { #endif - pthread_cond_broadcast(rdrb->full); - if (abstime != NULL) - ret = pthread_cond_timedwait(rdrb->healthy, - rdrb->lock, - abstime); - else - ret = pthread_cond_wait(rdrb->healthy, rdrb->lock); + ret = __timedwait(rdrb->healthy, rdrb->lock, abstime); +#ifdef SHM_RDRB_MULTI_BLOCK + if (blocks + *rdrb->head > (SHM_BUFFER_SIZE)) + padblocks = (SHM_BUFFER_SIZE) - *rdrb->head; +#endif } if (ret != ETIMEDOUT) { @@ -478,7 +413,7 @@ ssize_t shm_rdrbuff_write_b(struct shm_rdrbuff * rdrb, sdb = get_head_ptr(rdrb); sdb->size = 0; sdb->blocks = padblocks; - sdb->flags = SDB_NULL; + sdb->refs = 0; sdb->du_head = 0; sdb->du_tail = 0; sdb->idx = *rdrb->head; @@ -487,7 +422,7 @@ ssize_t shm_rdrbuff_write_b(struct shm_rdrbuff * rdrb, } #endif sdb = get_head_ptr(rdrb); - sdb->flags = SDB_VALID; + sdb->refs = 1; sdb->idx = *rdrb->head; #ifdef SHM_RDRB_MULTI_BLOCK sdb->blocks = blocks; @@ -504,11 +439,12 @@ ssize_t shm_rdrbuff_write_b(struct shm_rdrbuff * rdrb, return -ETIMEDOUT; sdb->size = size; - sdb->du_head = headspace; + sdb->du_head = DU_BUFF_HEADSPACE; sdb->du_tail = sdb->du_head + len; - if (data != NULL) - memcpy(((uint8_t *) (sdb + 1)) + headspace, data, len); + *psdb = sdb; + if (ptr != NULL) + *ptr = (uint8_t *) (sdb + 1) + sdb->du_head; return sdb->idx; } @@ -541,6 +477,8 @@ struct shm_du_buff * shm_rdrbuff_get(struct shm_rdrbuff * rdrb, int shm_rdrbuff_remove(struct shm_rdrbuff * rdrb, size_t idx) { + struct shm_du_buff * sdb; + assert(rdrb); assert(idx < (SHM_BUFFER_SIZE)); @@ -548,22 +486,18 @@ int shm_rdrbuff_remove(struct shm_rdrbuff * rdrb, pthread_mutex_lock(rdrb->lock); #else if (pthread_mutex_lock(rdrb->lock) == EOWNERDEAD) - pthread_mutex_consistent(rdrb->lock); + sanitize(rdrb); #endif - if (shm_rdrb_empty(rdrb)) { - pthread_mutex_unlock(rdrb->lock); - return -1; - } + /* assert(!shm_rdrb_empty(rdrb)); */ - idx_to_du_buff_ptr(rdrb, idx)->flags = SDB_NULL; + sdb = idx_to_du_buff_ptr(rdrb, idx); - if (idx != *rdrb->tail) { - pthread_mutex_unlock(rdrb->lock); - return 0; + if (sdb->refs == 1) { /* only stack needs it, can be removed */ + sdb->refs = 0; + if (idx == *rdrb->tail) + garbage_collect(rdrb); } - garbage_collect(rdrb); - pthread_mutex_unlock(rdrb->lock); return 0; @@ -590,6 +524,13 @@ uint8_t * shm_du_buff_tail(struct shm_du_buff * sdb) return (uint8_t *) (sdb + 1) + sdb->du_tail; } +size_t shm_du_buff_len(struct shm_du_buff * sdb) +{ + assert(sdb); + + return sdb->du_tail - sdb->du_head; +} + uint8_t * shm_du_buff_head_alloc(struct shm_du_buff * sdb, size_t size) { @@ -636,7 +577,7 @@ uint8_t * shm_du_buff_head_release(struct shm_du_buff * sdb, } uint8_t * shm_du_buff_tail_release(struct shm_du_buff * sdb, - size_t size) + size_t size) { assert(sdb); assert(!(size > sdb->du_tail - sdb->du_head)); @@ -654,3 +595,16 @@ void shm_du_buff_truncate(struct shm_du_buff * sdb, sdb->du_tail = sdb->du_head + len; } + +int shm_du_buff_wait_ack(struct shm_du_buff * sdb) +{ + __sync_add_and_fetch(&sdb->refs, 1); + + return 0; +} + +int shm_du_buff_ack(struct shm_du_buff * sdb) +{ + __sync_sub_and_fetch(&sdb->refs, 1); + return 0; +} |
