diff options
| -rw-r--r-- | include/ouroboros/shm_rbuff.h | 1 | ||||
| -rw-r--r-- | src/lib/shm_rbuff_ll.c | 12 | ||||
| -rw-r--r-- | src/lib/shm_rbuff_pthr.c | 35 | 
3 files changed, 33 insertions, 15 deletions
diff --git a/include/ouroboros/shm_rbuff.h b/include/ouroboros/shm_rbuff.h index e853e487..6e77cd07 100644 --- a/include/ouroboros/shm_rbuff.h +++ b/include/ouroboros/shm_rbuff.h @@ -31,6 +31,7 @@  #define ACL_RDWR     0000  #define ACL_RDONLY   0001  #define ACL_FLOWDOWN 0002 +#define ACL_FLOWPEER 0004  struct shm_rbuff; diff --git a/src/lib/shm_rbuff_ll.c b/src/lib/shm_rbuff_ll.c index 880d81dc..2eed9b74 100644 --- a/src/lib/shm_rbuff_ll.c +++ b/src/lib/shm_rbuff_ll.c @@ -143,9 +143,15 @@ ssize_t shm_rbuff_read(struct shm_rbuff * rb)          assert(rb); -        if (shm_rbuff_empty(rb)) -                return __sync_fetch_and_add(rb->acl, 0) & ACL_FLOWDOWN ? -                        -EFLOWDOWN : -EAGAIN; +        if (shm_rbuff_empty(rb)) { +                if (_sync_fetch_and_add(rb->acl, 0) & ACL_FLOWDOWN) +                        return -EFLOWDOWN; + +                if (_sync_fetch_and_add(rb->acl, 0) & ACL_FLOWPEER) +                        return -EFLOWPEER; + +                return -EAGAIN; +        }          ntail = RB_TAIL; diff --git a/src/lib/shm_rbuff_pthr.c b/src/lib/shm_rbuff_pthr.c index e41d31f8..657b55c4 100644 --- a/src/lib/shm_rbuff_pthr.c +++ b/src/lib/shm_rbuff_pthr.c @@ -141,6 +141,17 @@ int shm_rbuff_write_b(struct shm_rbuff *      rb,          return ret;  } +static int check_rb_acl(struct shm_rbuff * rb) +{ +        if (*rb->acl & ACL_FLOWDOWN) +                return -EFLOWDOWN; + +        if (*rb->acl & ACL_FLOWPEER) +                return -EFLOWPEER; + +        return -EAGAIN; +} +  ssize_t shm_rbuff_read(struct shm_rbuff * rb)  {          ssize_t ret = 0; @@ -155,7 +166,7 @@ ssize_t shm_rbuff_read(struct shm_rbuff * rb)  #endif          if (shm_rbuff_empty(rb)) { -                ret = *rb->acl & ACL_FLOWDOWN ? -EFLOWDOWN : -EAGAIN; +                ret = check_rb_acl(rb);                  pthread_mutex_unlock(rb->lock);                  return ret;          } @@ -190,9 +201,9 @@ ssize_t shm_rbuff_read_b(struct shm_rbuff *      rb,          pthread_cleanup_push(__cleanup_mutex_unlock, rb->lock); -        while (shm_rbuff_empty(rb) -               && (idx != -ETIMEDOUT) -               && !(*rb->acl & ACL_FLOWDOWN)) { +        while (shm_rbuff_empty(rb) && +               idx != -ETIMEDOUT && +               check_rb_acl(rb) == -EAGAIN) {                  if (abstime != NULL)                          idx = -pthread_cond_timedwait(rb->add,                                                        rb->lock, @@ -205,18 +216,18 @@ ssize_t shm_rbuff_read_b(struct shm_rbuff *      rb,  #endif          } -        if (idx != -ETIMEDOUT) { -                if (*rb->acl & ACL_FLOWDOWN) -                        idx = -EFLOWDOWN; -                else { -                        idx = *tail_el_ptr(rb); -                        *rb->tail = (*rb->tail + 1) & ((SHM_RBUFF_SIZE) - 1); -                        pthread_cond_broadcast(rb->del); -                } +        if (!shm_rbuff_empty(rb)) { +                idx = *tail_el_ptr(rb); +                *rb->tail = (*rb->tail + 1) & ((SHM_RBUFF_SIZE) - 1); +                pthread_cond_broadcast(rb->del); +        } else if (idx != -ETIMEDOUT) { +                idx = check_rb_acl(rb);          }          pthread_cleanup_pop(true); +        assert(idx != -EAGAIN); +          return idx;  }  | 
