summaryrefslogtreecommitdiff
path: root/src/lib/shm_rbuff_ll.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/lib/shm_rbuff_ll.c')
-rw-r--r--src/lib/shm_rbuff_ll.c30
1 files changed, 16 insertions, 14 deletions
diff --git a/src/lib/shm_rbuff_ll.c b/src/lib/shm_rbuff_ll.c
index ec0199c0..b77c4374 100644
--- a/src/lib/shm_rbuff_ll.c
+++ b/src/lib/shm_rbuff_ll.c
@@ -26,6 +26,7 @@
#include <ouroboros/lockfile.h>
#include <ouroboros/time_utils.h>
#include <ouroboros/errno.h>
+#include <ouroboros/fccntl.h>
#include <pthread.h>
#include <sys/mman.h>
@@ -41,8 +42,6 @@
#include <stdbool.h>
#define FN_MAX_CHARS 255
-#define RB_OPEN 0
-#define RB_CLOSED 1
#define SHM_RBUFF_FILE_SIZE ((SHM_BUFFER_SIZE) * sizeof(ssize_t) \
+ 3 * sizeof(size_t) \
@@ -141,7 +140,7 @@ struct shm_rbuff * shm_rbuff_create(pid_t api, int port_id)
pthread_cond_init(rb->add, &cattr);
pthread_cond_init(rb->del, &cattr);
- *rb->acl = RB_OPEN;
+ *rb->acl = ACL_RDWR;
*rb->head = 0;
*rb->tail = 0;
@@ -227,14 +226,17 @@ int shm_rbuff_write(struct shm_rbuff * rb,
{
size_t ohead;
size_t nhead;
-
- bool was_empty = false;
+ bool was_empty = false;
assert(rb);
assert(idx < SHM_BUFFER_SIZE);
- if (__sync_fetch_and_add(rb->acl, 0)) /* CLOSED */
- return -ENOTALLOC;
+ if (__sync_fetch_and_add(rb->acl, 0) != ACL_RDWR) {
+ if (__sync_fetch_and_add(rb->acl, 0) & ACL_FLOWDOWN)
+ return -EFLOWDOWN;
+ else if (__sync_fetch_and_add(rb->acl, 0) & ACL_RDONLY)
+ return -ENOTALLOC;
+ }
if (!shm_rbuff_free(rb))
return -EAGAIN;
@@ -266,7 +268,8 @@ ssize_t shm_rbuff_read(struct shm_rbuff * rb)
assert(rb);
if (shm_rbuff_empty(rb))
- return -EAGAIN;
+ return __sync_fetch_and_add(rb->acl, 0) & ACL_FLOWDOWN ?
+ -EFLOWDOWN : -EAGAIN;
ntail = RB_TAIL;
@@ -327,26 +330,25 @@ ssize_t shm_rbuff_read_b(struct shm_rbuff * rb,
return idx;
}
-void shm_rbuff_block(struct shm_rbuff * rb)
+void shm_rbuff_set_acl(struct shm_rbuff * rb,
+ uint32_t flags)
{
assert(rb);
- __sync_bool_compare_and_swap(rb->acl, RB_OPEN, RB_CLOSED);
+ __sync_bool_compare_and_swap(rb->acl, *rb->acl, flags);
}
-void shm_rbuff_unblock(struct shm_rbuff * rb)
+uint32_t shm_rbuff_get_acl(struct shm_rbuff * rb)
{
assert(rb);
- __sync_bool_compare_and_swap(rb->acl, RB_CLOSED, RB_OPEN);
+ return __sync_fetch_and_add(rb->acl, 0);
}
void shm_rbuff_fini(struct shm_rbuff * rb)
{
assert(rb);
- assert(__sync_fetch_and_add(rb->acl, 0) == RB_CLOSED);
-
if (shm_rbuff_empty(rb))
return;