summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDimitri Staessens <dimitri@ouroboros.rocks>2022-03-26 20:28:56 +0100
committerSander Vrijders <sander@ouroboros.rocks>2022-03-30 15:05:05 +0200
commit56654f2cd1813d87d32695f126939bbfaad52385 (patch)
tree548a48888869e553e95e6e1ce37dc0cfaed0ab56
parent643c285c20abab5dadaa5c1929d978b725911b5d (diff)
downloadouroboros-56654f2cd1813d87d32695f126939bbfaad52385.tar.gz
ouroboros-56654f2cd1813d87d32695f126939bbfaad52385.zip
lib: Add rbuff support for FLOWPEER state
This allows setting the FLOWPEER state on a flow to signal a peer is unresponsive. Signed-off-by: Dimitri Staessens <dimitri@ouroboros.rocks> Signed-off-by: Sander Vrijders <sander@ouroboros.rocks>
-rw-r--r--include/ouroboros/shm_rbuff.h1
-rw-r--r--src/lib/shm_rbuff_ll.c12
-rw-r--r--src/lib/shm_rbuff_pthr.c35
3 files changed, 33 insertions, 15 deletions
diff --git a/include/ouroboros/shm_rbuff.h b/include/ouroboros/shm_rbuff.h
index e853e487..6e77cd07 100644
--- a/include/ouroboros/shm_rbuff.h
+++ b/include/ouroboros/shm_rbuff.h
@@ -31,6 +31,7 @@
#define ACL_RDWR 0000
#define ACL_RDONLY 0001
#define ACL_FLOWDOWN 0002
+#define ACL_FLOWPEER 0004
struct shm_rbuff;
diff --git a/src/lib/shm_rbuff_ll.c b/src/lib/shm_rbuff_ll.c
index 880d81dc..2eed9b74 100644
--- a/src/lib/shm_rbuff_ll.c
+++ b/src/lib/shm_rbuff_ll.c
@@ -143,9 +143,15 @@ ssize_t shm_rbuff_read(struct shm_rbuff * rb)
assert(rb);
- if (shm_rbuff_empty(rb))
- return __sync_fetch_and_add(rb->acl, 0) & ACL_FLOWDOWN ?
- -EFLOWDOWN : -EAGAIN;
+ if (shm_rbuff_empty(rb)) {
+ if (_sync_fetch_and_add(rb->acl, 0) & ACL_FLOWDOWN)
+ return -EFLOWDOWN;
+
+ if (_sync_fetch_and_add(rb->acl, 0) & ACL_FLOWPEER)
+ return -EFLOWPEER;
+
+ return -EAGAIN;
+ }
ntail = RB_TAIL;
diff --git a/src/lib/shm_rbuff_pthr.c b/src/lib/shm_rbuff_pthr.c
index e41d31f8..657b55c4 100644
--- a/src/lib/shm_rbuff_pthr.c
+++ b/src/lib/shm_rbuff_pthr.c
@@ -141,6 +141,17 @@ int shm_rbuff_write_b(struct shm_rbuff * rb,
return ret;
}
+static int check_rb_acl(struct shm_rbuff * rb)
+{
+ if (*rb->acl & ACL_FLOWDOWN)
+ return -EFLOWDOWN;
+
+ if (*rb->acl & ACL_FLOWPEER)
+ return -EFLOWPEER;
+
+ return -EAGAIN;
+}
+
ssize_t shm_rbuff_read(struct shm_rbuff * rb)
{
ssize_t ret = 0;
@@ -155,7 +166,7 @@ ssize_t shm_rbuff_read(struct shm_rbuff * rb)
#endif
if (shm_rbuff_empty(rb)) {
- ret = *rb->acl & ACL_FLOWDOWN ? -EFLOWDOWN : -EAGAIN;
+ ret = check_rb_acl(rb);
pthread_mutex_unlock(rb->lock);
return ret;
}
@@ -190,9 +201,9 @@ ssize_t shm_rbuff_read_b(struct shm_rbuff * rb,
pthread_cleanup_push(__cleanup_mutex_unlock, rb->lock);
- while (shm_rbuff_empty(rb)
- && (idx != -ETIMEDOUT)
- && !(*rb->acl & ACL_FLOWDOWN)) {
+ while (shm_rbuff_empty(rb) &&
+ idx != -ETIMEDOUT &&
+ check_rb_acl(rb) == -EAGAIN) {
if (abstime != NULL)
idx = -pthread_cond_timedwait(rb->add,
rb->lock,
@@ -205,18 +216,18 @@ ssize_t shm_rbuff_read_b(struct shm_rbuff * rb,
#endif
}
- if (idx != -ETIMEDOUT) {
- if (*rb->acl & ACL_FLOWDOWN)
- idx = -EFLOWDOWN;
- else {
- idx = *tail_el_ptr(rb);
- *rb->tail = (*rb->tail + 1) & ((SHM_RBUFF_SIZE) - 1);
- pthread_cond_broadcast(rb->del);
- }
+ if (!shm_rbuff_empty(rb)) {
+ idx = *tail_el_ptr(rb);
+ *rb->tail = (*rb->tail + 1) & ((SHM_RBUFF_SIZE) - 1);
+ pthread_cond_broadcast(rb->del);
+ } else if (idx != -ETIMEDOUT) {
+ idx = check_rb_acl(rb);
}
pthread_cleanup_pop(true);
+ assert(idx != -EAGAIN);
+
return idx;
}