diff options
Diffstat (limited to 'src/lib/shm_rbuff_pthr.c')
-rw-r--r-- | src/lib/shm_rbuff_pthr.c | 52 |
1 files changed, 36 insertions, 16 deletions
diff --git a/src/lib/shm_rbuff_pthr.c b/src/lib/shm_rbuff_pthr.c index 9567762f..fb183d8f 100644 --- a/src/lib/shm_rbuff_pthr.c +++ b/src/lib/shm_rbuff_pthr.c @@ -26,6 +26,7 @@ #include <ouroboros/lockfile.h> #include <ouroboros/time_utils.h> #include <ouroboros/errno.h> +#include <ouroboros/fccntl.h> #include <pthread.h> #include <sys/mman.h> @@ -41,8 +42,6 @@ #include <stdbool.h> #define FN_MAX_CHARS 255 -#define RB_OPEN 0 -#define RB_CLOSED 1 #define SHM_RBUFF_FILE_SIZE ((SHM_BUFFER_SIZE) * sizeof(ssize_t) \ + 3 * sizeof(size_t) \ @@ -138,7 +137,7 @@ struct shm_rbuff * shm_rbuff_create(pid_t api, int port_id) pthread_cond_init(rb->add, &cattr); pthread_cond_init(rb->del, &cattr); - *rb->acl = RB_OPEN; + *rb->acl = ACL_RDWR; *rb->head = 0; *rb->tail = 0; @@ -226,8 +225,11 @@ void shm_rbuff_destroy(struct shm_rbuff * rb) free(rb); } -int shm_rbuff_write(struct shm_rbuff * rb, size_t idx) +int shm_rbuff_write(struct shm_rbuff * rb, + size_t idx) { + int ret = 0; + assert(rb); assert(idx < SHM_BUFFER_SIZE); @@ -237,14 +239,18 @@ int shm_rbuff_write(struct shm_rbuff * rb, size_t idx) if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) pthread_mutex_consistent(rb->lock); #endif - if (*rb->acl == RB_CLOSED) { - pthread_mutex_unlock(rb->lock); - return -ENOTALLOC; + + if (*rb->acl != ACL_RDWR) { + if (*rb->acl & ACL_FLOWDOWN) + ret = -EFLOWDOWN; + else if (*rb->acl & ACL_RDONLY) + ret = -ENOTALLOC; + goto err; } if (!shm_rbuff_free(rb)) { - pthread_mutex_unlock(rb->lock); - return -EAGAIN; + ret = -EAGAIN; + goto err; } if (shm_rbuff_empty(rb)) @@ -256,6 +262,9 @@ int shm_rbuff_write(struct shm_rbuff * rb, size_t idx) pthread_mutex_unlock(rb->lock); return 0; + err: + pthread_mutex_unlock(rb->lock); + return ret; } ssize_t shm_rbuff_read(struct shm_rbuff * rb) @@ -270,9 +279,11 @@ ssize_t shm_rbuff_read(struct shm_rbuff * rb) if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) pthread_mutex_consistent(rb->lock); #endif + if (shm_rbuff_empty(rb)) { + ret = *rb->acl & ACL_FLOWDOWN ? -EFLOWDOWN : -EAGAIN; pthread_mutex_unlock(rb->lock); - return -EAGAIN; + return ret; } ret = *tail_el_ptr(rb); @@ -297,6 +308,12 @@ ssize_t shm_rbuff_read_b(struct shm_rbuff * rb, if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) pthread_mutex_consistent(rb->lock); #endif + + if (shm_rbuff_empty(rb) && (*rb->acl & ACL_FLOWDOWN)) { + pthread_mutex_unlock(rb->lock); + return -EFLOWDOWN; + } + pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock, (void *) rb->lock); @@ -324,7 +341,8 @@ ssize_t shm_rbuff_read_b(struct shm_rbuff * rb, return idx; } -void shm_rbuff_block(struct shm_rbuff * rb) +void shm_rbuff_set_acl(struct shm_rbuff * rb, + uint32_t flags) { assert(rb); @@ -334,13 +352,15 @@ void shm_rbuff_block(struct shm_rbuff * rb) if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) pthread_mutex_consistent(rb->lock); #endif - *rb->acl = RB_CLOSED; + *rb->acl = (size_t) flags; pthread_mutex_unlock(rb->lock); } -void shm_rbuff_unblock(struct shm_rbuff * rb) +uint32_t shm_rbuff_get_acl(struct shm_rbuff * rb) { + uint32_t flags; + assert(rb); #ifndef HAVE_ROBUST_MUTEX @@ -349,9 +369,11 @@ void shm_rbuff_unblock(struct shm_rbuff * rb) if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) pthread_mutex_consistent(rb->lock); #endif - *rb->acl = RB_OPEN; + flags = (uint32_t) *rb->acl; pthread_mutex_unlock(rb->lock); + + return flags; } void shm_rbuff_fini(struct shm_rbuff * rb) @@ -364,8 +386,6 @@ void shm_rbuff_fini(struct shm_rbuff * rb) if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) pthread_mutex_consistent(rb->lock); #endif - assert(*rb->acl == RB_CLOSED); - pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock, (void *) rb->lock); |