diff options
Diffstat (limited to 'src/lib/shm_rbuff_ll.c')
| -rw-r--r-- | src/lib/shm_rbuff_ll.c | 96 |
1 files changed, 75 insertions, 21 deletions
diff --git a/src/lib/shm_rbuff_ll.c b/src/lib/shm_rbuff_ll.c index c488f274..46a5314e 100644 --- a/src/lib/shm_rbuff_ll.c +++ b/src/lib/shm_rbuff_ll.c @@ -1,10 +1,10 @@ /* - * Ouroboros - Copyright (C) 2016 - 2018 + * Ouroboros - Copyright (C) 2016 - 2024 * - * Lockless ring buffer for incoming SDUs + * Lockless ring buffer for incoming packets * - * Dimitri Staessens <dimitri.staessens@ugent.be> - * Sander Vrijders <sander.vrijders@ugent.be> + * Dimitri Staessens <dimitri@ouroboros.rocks> + * Sander Vrijders <sander@ouroboros.rocks> * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public License @@ -29,7 +29,12 @@ void shm_rbuff_destroy(struct shm_rbuff * rb) assert(rb); - sprintf(fn, SHM_RBUFF_PREFIX "%d.%d", rb->pid, rb->port_id); + sprintf(fn, SHM_RBUFF_PREFIX "%d.%d", rb->pid, rb->flow_id); + + __sync_bool_compare_and_swap(rb->acl, *rb->acl, ACL_FLOWDOWN); + + pthread_cond_broadcast(rb->del); + pthread_cond_broadcast(rb->add); shm_rbuff_close(rb); @@ -65,7 +70,7 @@ int shm_rbuff_write(struct shm_rbuff * rb, do { ohead = nhead; - nhead = (ohead + 1) & ((SHM_BUFFER_SIZE) - 1); + nhead = (ohead + 1) & ((SHM_RBUFF_SIZE) - 1); nhead = __sync_val_compare_and_swap(rb->head, ohead, nhead); } while (nhead != ohead); @@ -75,6 +80,57 @@ int shm_rbuff_write(struct shm_rbuff * rb, return 0; } +/* FIXME: this is a copy of the pthr implementation */ +int shm_rbuff_write_b(struct shm_rbuff * rb, + size_t idx, + const struct timespec * abstime) +{ + int ret = 0; + + assert(rb); + assert(idx < SHM_BUFFER_SIZE); + +#ifndef HAVE_ROBUST_MUTEX + pthread_mutex_lock(rb->lock); +#else + if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) + pthread_mutex_consistent(rb->lock); +#endif + + if (*rb->acl != ACL_RDWR) { + if (*rb->acl & ACL_FLOWDOWN) + ret = -EFLOWDOWN; + else if (*rb->acl & ACL_RDONLY) + ret = -ENOTALLOC; + goto err; + } + + pthread_cleanup_push(__cleanup_mutex_unlock, rb->lock); + + while (!shm_rbuff_free(rb) && ret != -ETIMEDOUT) { + ret = -__timedwait(rb->add, rb->lock, abstime); +#ifdef HAVE_ROBUST_MUTEX + if (ret == -EOWNERDEAD) + pthread_mutex_consistent(rb->lock); +#endif + } + + if (shm_rbuff_empty(rb)) + pthread_cond_broadcast(rb->add); + + if (ret != -ETIMEDOUT) { + *head_el_ptr(rb) = (ssize_t) idx; + *rb->head = (*rb->head + 1) & ((SHM_RBUFF_SIZE) -1); + } + + pthread_cleanup_pop(true); + + return ret; + err: + pthread_mutex_unlock(rb->lock); + return ret; +} + ssize_t shm_rbuff_read(struct shm_rbuff * rb) { size_t otail; @@ -82,15 +138,21 @@ ssize_t shm_rbuff_read(struct shm_rbuff * rb) assert(rb); - if (shm_rbuff_empty(rb)) - return __sync_fetch_and_add(rb->acl, 0) & ACL_FLOWDOWN ? - -EFLOWDOWN : -EAGAIN; + if (shm_rbuff_empty(rb)) { + if (_sync_fetch_and_add(rb->acl, 0) & ACL_FLOWDOWN) + return -EFLOWDOWN; + + if (_sync_fetch_and_add(rb->acl, 0) & ACL_FLOWPEER) + return -EFLOWPEER; + + return -EAGAIN; + } ntail = RB_TAIL; do { otail = ntail; - ntail = (otail + 1) & ((SHM_BUFFER_SIZE) - 1); + ntail = (otail + 1) & ((SHM_RBUFF_SIZE) - 1); ntail = __sync_val_compare_and_swap(rb->tail, otail, ntail); } while (ntail != otail); @@ -117,16 +179,10 @@ ssize_t shm_rbuff_read_b(struct shm_rbuff * rb, if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) pthread_mutex_consistent(rb->lock); #endif - pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock, - (void *) rb->lock); + pthread_cleanup_push(__cleanup_mutex_unlock, rb->lock); while (shm_rbuff_empty(rb) && (idx != -ETIMEDOUT)) { - if (abstime != NULL) - idx = -pthread_cond_timedwait(rb->add, - rb->lock, - abstime); - else - idx = -pthread_cond_wait(rb->add, rb->lock); + idx = -__timedwait(rb->add, rb->lock, abstime); #ifdef HAVE_ROBUST_MUTEX if (idx == -EOWNERDEAD) pthread_mutex_consistent(rb->lock); @@ -136,7 +192,6 @@ ssize_t shm_rbuff_read_b(struct shm_rbuff * rb, if (idx != -ETIMEDOUT) { /* do a nonblocking read */ idx = shm_rbuff_read(rb); - assert(idx >= 0); } @@ -174,8 +229,7 @@ void shm_rbuff_fini(struct shm_rbuff * rb) pthread_mutex_consistent(rb->lock); #endif - pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock, - (void *) rb->lock); + pthread_cleanup_push(__cleanup_mutex_unlock, rb->lock); while (!shm_rbuff_empty(rb)) #ifndef HAVE_ROBUST_MUTEX |
