diff options
Diffstat (limited to 'src/lib/shm_rbuff_pthr.c')
-rw-r--r-- | src/lib/shm_rbuff_pthr.c | 66 |
1 files changed, 38 insertions, 28 deletions
diff --git a/src/lib/shm_rbuff_pthr.c b/src/lib/shm_rbuff_pthr.c index cedbc7b1..b543fb07 100644 --- a/src/lib/shm_rbuff_pthr.c +++ b/src/lib/shm_rbuff_pthr.c @@ -1,5 +1,5 @@ /* - * Ouroboros - Copyright (C) 2016 - 2021 + * Ouroboros - Copyright (C) 2016 - 2024 * * Ring buffer for incoming packets * @@ -24,12 +24,15 @@ void shm_rbuff_destroy(struct shm_rbuff * rb) { char fn[FN_MAX_CHARS]; - assert(rb); + assert(rb != NULL); #ifdef CONFIG_OUROBOROS_DEBUG pthread_mutex_lock(rb->lock); - assert(shm_rbuff_empty(rb)); + *rb->acl = *rb->acl & ACL_FLOWDOWN; + + pthread_cond_broadcast(rb->del); + pthread_cond_broadcast(rb->add); pthread_mutex_unlock(rb->lock); #endif @@ -45,7 +48,7 @@ int shm_rbuff_write(struct shm_rbuff * rb, { int ret = 0; - assert(rb); + assert(rb != NULL); assert(idx < SHM_BUFFER_SIZE); #ifndef HAVE_ROBUST_MUTEX @@ -88,7 +91,7 @@ int shm_rbuff_write_b(struct shm_rbuff * rb, { int ret = 0; - assert(rb); + assert(rb != NULL); assert(idx < SHM_BUFFER_SIZE); #ifndef HAVE_ROBUST_MUTEX @@ -111,12 +114,7 @@ int shm_rbuff_write_b(struct shm_rbuff * rb, while (!shm_rbuff_free(rb) && ret != -ETIMEDOUT && !(*rb->acl & ACL_FLOWDOWN)) { - if (abstime != NULL) - ret = -pthread_cond_timedwait(rb->del, - rb->lock, - abstime); - else - ret = -pthread_cond_wait(rb->del, rb->lock); + ret = -__timedwait(rb->del, rb->lock, abstime); #ifdef HAVE_ROBUST_MUTEX if (ret == -EOWNERDEAD) pthread_mutex_consistent(rb->lock); @@ -138,11 +136,24 @@ int shm_rbuff_write_b(struct shm_rbuff * rb, return ret; } +static int check_rb_acl(struct shm_rbuff * rb) +{ + assert(rb != NULL); + + if (*rb->acl & ACL_FLOWDOWN) + return -EFLOWDOWN; + + if (*rb->acl & ACL_FLOWPEER) + return -EFLOWPEER; + + return -EAGAIN; +} + ssize_t shm_rbuff_read(struct shm_rbuff * rb) { ssize_t ret = 0; - assert(rb); + assert(rb != NULL); #ifndef HAVE_ROBUST_MUTEX pthread_mutex_lock(rb->lock); @@ -152,7 +163,7 @@ ssize_t shm_rbuff_read(struct shm_rbuff * rb) #endif if (shm_rbuff_empty(rb)) { - ret = *rb->acl & ACL_FLOWDOWN ? -EFLOWDOWN : -EAGAIN; + ret = check_rb_acl(rb); pthread_mutex_unlock(rb->lock); return ret; } @@ -171,7 +182,7 @@ ssize_t shm_rbuff_read_b(struct shm_rbuff * rb, { ssize_t idx = -1; - assert(rb); + assert(rb != NULL); #ifndef HAVE_ROBUST_MUTEX pthread_mutex_lock(rb->lock); @@ -187,36 +198,35 @@ ssize_t shm_rbuff_read_b(struct shm_rbuff * rb, pthread_cleanup_push(__cleanup_mutex_unlock, rb->lock); - while (shm_rbuff_empty(rb) - && (idx != -ETIMEDOUT) - && !(*rb->acl & ACL_FLOWDOWN)) { - if (abstime != NULL) - idx = -pthread_cond_timedwait(rb->add, - rb->lock, - abstime); - else - idx = -pthread_cond_wait(rb->add, rb->lock); + while (shm_rbuff_empty(rb) && + idx != -ETIMEDOUT && + check_rb_acl(rb) == -EAGAIN) { + idx = -__timedwait(rb->add, rb->lock, abstime); #ifdef HAVE_ROBUST_MUTEX if (idx == -EOWNERDEAD) pthread_mutex_consistent(rb->lock); #endif } - if (idx != -ETIMEDOUT) { + if (!shm_rbuff_empty(rb)) { idx = *tail_el_ptr(rb); *rb->tail = (*rb->tail + 1) & ((SHM_RBUFF_SIZE) - 1); pthread_cond_broadcast(rb->del); + } else if (idx != -ETIMEDOUT) { + idx = check_rb_acl(rb); } pthread_cleanup_pop(true); + assert(idx != -EAGAIN); + return idx; } void shm_rbuff_set_acl(struct shm_rbuff * rb, uint32_t flags) { - assert(rb); + assert(rb != NULL); #ifndef HAVE_ROBUST_MUTEX pthread_mutex_lock(rb->lock); @@ -236,7 +246,7 @@ uint32_t shm_rbuff_get_acl(struct shm_rbuff * rb) { uint32_t flags; - assert(rb); + assert(rb != NULL); #ifndef HAVE_ROBUST_MUTEX pthread_mutex_lock(rb->lock); @@ -253,7 +263,7 @@ uint32_t shm_rbuff_get_acl(struct shm_rbuff * rb) void shm_rbuff_fini(struct shm_rbuff * rb) { - assert(rb); + assert(rb != NULL); #ifndef HAVE_ROBUST_MUTEX pthread_mutex_lock(rb->lock); @@ -277,7 +287,7 @@ size_t shm_rbuff_queued(struct shm_rbuff * rb) { size_t ret; - assert(rb); + assert(rb != NULL); #ifndef HAVE_ROBUST_MUTEX pthread_mutex_lock(rb->lock); |