summaryrefslogtreecommitdiff
path: root/src/lib/shm_rbuff_pthr.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/lib/shm_rbuff_pthr.c')
-rw-r--r--src/lib/shm_rbuff_pthr.c52
1 files changed, 36 insertions, 16 deletions
diff --git a/src/lib/shm_rbuff_pthr.c b/src/lib/shm_rbuff_pthr.c
index 9567762f..fb183d8f 100644
--- a/src/lib/shm_rbuff_pthr.c
+++ b/src/lib/shm_rbuff_pthr.c
@@ -26,6 +26,7 @@
#include <ouroboros/lockfile.h>
#include <ouroboros/time_utils.h>
#include <ouroboros/errno.h>
+#include <ouroboros/fccntl.h>
#include <pthread.h>
#include <sys/mman.h>
@@ -41,8 +42,6 @@
#include <stdbool.h>
#define FN_MAX_CHARS 255
-#define RB_OPEN 0
-#define RB_CLOSED 1
#define SHM_RBUFF_FILE_SIZE ((SHM_BUFFER_SIZE) * sizeof(ssize_t) \
+ 3 * sizeof(size_t) \
@@ -138,7 +137,7 @@ struct shm_rbuff * shm_rbuff_create(pid_t api, int port_id)
pthread_cond_init(rb->add, &cattr);
pthread_cond_init(rb->del, &cattr);
- *rb->acl = RB_OPEN;
+ *rb->acl = ACL_RDWR;
*rb->head = 0;
*rb->tail = 0;
@@ -226,8 +225,11 @@ void shm_rbuff_destroy(struct shm_rbuff * rb)
free(rb);
}
-int shm_rbuff_write(struct shm_rbuff * rb, size_t idx)
+int shm_rbuff_write(struct shm_rbuff * rb,
+ size_t idx)
{
+ int ret = 0;
+
assert(rb);
assert(idx < SHM_BUFFER_SIZE);
@@ -237,14 +239,18 @@ int shm_rbuff_write(struct shm_rbuff * rb, size_t idx)
if (pthread_mutex_lock(rb->lock) == EOWNERDEAD)
pthread_mutex_consistent(rb->lock);
#endif
- if (*rb->acl == RB_CLOSED) {
- pthread_mutex_unlock(rb->lock);
- return -ENOTALLOC;
+
+ if (*rb->acl != ACL_RDWR) {
+ if (*rb->acl & ACL_FLOWDOWN)
+ ret = -EFLOWDOWN;
+ else if (*rb->acl & ACL_RDONLY)
+ ret = -ENOTALLOC;
+ goto err;
}
if (!shm_rbuff_free(rb)) {
- pthread_mutex_unlock(rb->lock);
- return -EAGAIN;
+ ret = -EAGAIN;
+ goto err;
}
if (shm_rbuff_empty(rb))
@@ -256,6 +262,9 @@ int shm_rbuff_write(struct shm_rbuff * rb, size_t idx)
pthread_mutex_unlock(rb->lock);
return 0;
+ err:
+ pthread_mutex_unlock(rb->lock);
+ return ret;
}
ssize_t shm_rbuff_read(struct shm_rbuff * rb)
@@ -270,9 +279,11 @@ ssize_t shm_rbuff_read(struct shm_rbuff * rb)
if (pthread_mutex_lock(rb->lock) == EOWNERDEAD)
pthread_mutex_consistent(rb->lock);
#endif
+
if (shm_rbuff_empty(rb)) {
+ ret = *rb->acl & ACL_FLOWDOWN ? -EFLOWDOWN : -EAGAIN;
pthread_mutex_unlock(rb->lock);
- return -EAGAIN;
+ return ret;
}
ret = *tail_el_ptr(rb);
@@ -297,6 +308,12 @@ ssize_t shm_rbuff_read_b(struct shm_rbuff * rb,
if (pthread_mutex_lock(rb->lock) == EOWNERDEAD)
pthread_mutex_consistent(rb->lock);
#endif
+
+ if (shm_rbuff_empty(rb) && (*rb->acl & ACL_FLOWDOWN)) {
+ pthread_mutex_unlock(rb->lock);
+ return -EFLOWDOWN;
+ }
+
pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock,
(void *) rb->lock);
@@ -324,7 +341,8 @@ ssize_t shm_rbuff_read_b(struct shm_rbuff * rb,
return idx;
}
-void shm_rbuff_block(struct shm_rbuff * rb)
+void shm_rbuff_set_acl(struct shm_rbuff * rb,
+ uint32_t flags)
{
assert(rb);
@@ -334,13 +352,15 @@ void shm_rbuff_block(struct shm_rbuff * rb)
if (pthread_mutex_lock(rb->lock) == EOWNERDEAD)
pthread_mutex_consistent(rb->lock);
#endif
- *rb->acl = RB_CLOSED;
+ *rb->acl = (size_t) flags;
pthread_mutex_unlock(rb->lock);
}
-void shm_rbuff_unblock(struct shm_rbuff * rb)
+uint32_t shm_rbuff_get_acl(struct shm_rbuff * rb)
{
+ uint32_t flags;
+
assert(rb);
#ifndef HAVE_ROBUST_MUTEX
@@ -349,9 +369,11 @@ void shm_rbuff_unblock(struct shm_rbuff * rb)
if (pthread_mutex_lock(rb->lock) == EOWNERDEAD)
pthread_mutex_consistent(rb->lock);
#endif
- *rb->acl = RB_OPEN;
+ flags = (uint32_t) *rb->acl;
pthread_mutex_unlock(rb->lock);
+
+ return flags;
}
void shm_rbuff_fini(struct shm_rbuff * rb)
@@ -364,8 +386,6 @@ void shm_rbuff_fini(struct shm_rbuff * rb)
if (pthread_mutex_lock(rb->lock) == EOWNERDEAD)
pthread_mutex_consistent(rb->lock);
#endif
- assert(*rb->acl == RB_CLOSED);
-
pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock,
(void *) rb->lock);