diff options
author | Dimitri Staessens <dimitri@ouroboros.rocks> | 2022-03-26 20:28:56 +0100 |
---|---|---|
committer | Sander Vrijders <sander@ouroboros.rocks> | 2022-03-30 15:05:05 +0200 |
commit | 56654f2cd1813d87d32695f126939bbfaad52385 (patch) | |
tree | 548a48888869e553e95e6e1ce37dc0cfaed0ab56 | |
parent | 643c285c20abab5dadaa5c1929d978b725911b5d (diff) | |
download | ouroboros-56654f2cd1813d87d32695f126939bbfaad52385.tar.gz ouroboros-56654f2cd1813d87d32695f126939bbfaad52385.zip |
lib: Add rbuff support for FLOWPEER state
This allows setting the FLOWPEER state on a flow to signal a peer is
unresponsive.
Signed-off-by: Dimitri Staessens <dimitri@ouroboros.rocks>
Signed-off-by: Sander Vrijders <sander@ouroboros.rocks>
-rw-r--r-- | include/ouroboros/shm_rbuff.h | 1 | ||||
-rw-r--r-- | src/lib/shm_rbuff_ll.c | 12 | ||||
-rw-r--r-- | src/lib/shm_rbuff_pthr.c | 35 |
3 files changed, 33 insertions, 15 deletions
diff --git a/include/ouroboros/shm_rbuff.h b/include/ouroboros/shm_rbuff.h index e853e487..6e77cd07 100644 --- a/include/ouroboros/shm_rbuff.h +++ b/include/ouroboros/shm_rbuff.h @@ -31,6 +31,7 @@ #define ACL_RDWR 0000 #define ACL_RDONLY 0001 #define ACL_FLOWDOWN 0002 +#define ACL_FLOWPEER 0004 struct shm_rbuff; diff --git a/src/lib/shm_rbuff_ll.c b/src/lib/shm_rbuff_ll.c index 880d81dc..2eed9b74 100644 --- a/src/lib/shm_rbuff_ll.c +++ b/src/lib/shm_rbuff_ll.c @@ -143,9 +143,15 @@ ssize_t shm_rbuff_read(struct shm_rbuff * rb) assert(rb); - if (shm_rbuff_empty(rb)) - return __sync_fetch_and_add(rb->acl, 0) & ACL_FLOWDOWN ? - -EFLOWDOWN : -EAGAIN; + if (shm_rbuff_empty(rb)) { + if (_sync_fetch_and_add(rb->acl, 0) & ACL_FLOWDOWN) + return -EFLOWDOWN; + + if (_sync_fetch_and_add(rb->acl, 0) & ACL_FLOWPEER) + return -EFLOWPEER; + + return -EAGAIN; + } ntail = RB_TAIL; diff --git a/src/lib/shm_rbuff_pthr.c b/src/lib/shm_rbuff_pthr.c index e41d31f8..657b55c4 100644 --- a/src/lib/shm_rbuff_pthr.c +++ b/src/lib/shm_rbuff_pthr.c @@ -141,6 +141,17 @@ int shm_rbuff_write_b(struct shm_rbuff * rb, return ret; } +static int check_rb_acl(struct shm_rbuff * rb) +{ + if (*rb->acl & ACL_FLOWDOWN) + return -EFLOWDOWN; + + if (*rb->acl & ACL_FLOWPEER) + return -EFLOWPEER; + + return -EAGAIN; +} + ssize_t shm_rbuff_read(struct shm_rbuff * rb) { ssize_t ret = 0; @@ -155,7 +166,7 @@ ssize_t shm_rbuff_read(struct shm_rbuff * rb) #endif if (shm_rbuff_empty(rb)) { - ret = *rb->acl & ACL_FLOWDOWN ? -EFLOWDOWN : -EAGAIN; + ret = check_rb_acl(rb); pthread_mutex_unlock(rb->lock); return ret; } @@ -190,9 +201,9 @@ ssize_t shm_rbuff_read_b(struct shm_rbuff * rb, pthread_cleanup_push(__cleanup_mutex_unlock, rb->lock); - while (shm_rbuff_empty(rb) - && (idx != -ETIMEDOUT) - && !(*rb->acl & ACL_FLOWDOWN)) { + while (shm_rbuff_empty(rb) && + idx != -ETIMEDOUT && + check_rb_acl(rb) == -EAGAIN) { if (abstime != NULL) idx = -pthread_cond_timedwait(rb->add, rb->lock, @@ -205,18 +216,18 @@ ssize_t shm_rbuff_read_b(struct shm_rbuff * rb, #endif } - if (idx != -ETIMEDOUT) { - if (*rb->acl & ACL_FLOWDOWN) - idx = -EFLOWDOWN; - else { - idx = *tail_el_ptr(rb); - *rb->tail = (*rb->tail + 1) & ((SHM_RBUFF_SIZE) - 1); - pthread_cond_broadcast(rb->del); - } + if (!shm_rbuff_empty(rb)) { + idx = *tail_el_ptr(rb); + *rb->tail = (*rb->tail + 1) & ((SHM_RBUFF_SIZE) - 1); + pthread_cond_broadcast(rb->del); + } else if (idx != -ETIMEDOUT) { + idx = check_rb_acl(rb); } pthread_cleanup_pop(true); + assert(idx != -EAGAIN); + return idx; } |