diff options
Diffstat (limited to 'src/lib/shm_rbuff_pthr.c')
-rw-r--r-- | src/lib/shm_rbuff_pthr.c | 62 |
1 files changed, 59 insertions, 3 deletions
diff --git a/src/lib/shm_rbuff_pthr.c b/src/lib/shm_rbuff_pthr.c index b4134bf6..5a58605b 100644 --- a/src/lib/shm_rbuff_pthr.c +++ b/src/lib/shm_rbuff_pthr.c @@ -72,7 +72,7 @@ int shm_rbuff_write(struct shm_rbuff * rb, pthread_cond_broadcast(rb->add); *head_el_ptr(rb) = (ssize_t) idx; - *rb->head = (*rb->head + 1) & ((SHM_BUFFER_SIZE) -1); + *rb->head = (*rb->head + 1) & ((SHM_RBUFF_SIZE) - 1); pthread_mutex_unlock(rb->lock); @@ -82,6 +82,62 @@ int shm_rbuff_write(struct shm_rbuff * rb, return ret; } +int shm_rbuff_write_b(struct shm_rbuff * rb, + size_t idx, + const struct timespec * abstime) +{ + int ret = 0; + + assert(rb); + assert(idx < SHM_BUFFER_SIZE); + +#ifndef HAVE_ROBUST_MUTEX + pthread_mutex_lock(rb->lock); +#else + if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) + pthread_mutex_consistent(rb->lock); +#endif + + if (*rb->acl != ACL_RDWR) { + if (*rb->acl & ACL_FLOWDOWN) + ret = -EFLOWDOWN; + else if (*rb->acl & ACL_RDONLY) + ret = -ENOTALLOC; + goto err; + } + + pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock, + (void *) rb->lock); + + while (!shm_rbuff_free(rb) && ret != -ETIMEDOUT) { + if (abstime != NULL) + ret = -pthread_cond_timedwait(rb->add, + rb->lock, + abstime); + else + ret = -pthread_cond_wait(rb->add, rb->lock); +#ifdef HAVE_ROBUST_MUTEX + if (ret == -EOWNERDEAD) + pthread_mutex_consistent(rb->lock); +#endif + } + + if (shm_rbuff_empty(rb)) + pthread_cond_broadcast(rb->add); + + if (ret != -ETIMEDOUT) { + *head_el_ptr(rb) = (ssize_t) idx; + *rb->head = (*rb->head + 1) & ((SHM_RBUFF_SIZE) - 1); + } + + pthread_cleanup_pop(true); + + return ret; + err: + pthread_mutex_unlock(rb->lock); + return ret; +} + ssize_t shm_rbuff_read(struct shm_rbuff * rb) { ssize_t ret = 0; @@ -102,7 +158,7 @@ ssize_t shm_rbuff_read(struct shm_rbuff * rb) } ret = *tail_el_ptr(rb); - *rb->tail = (*rb->tail + 1) & ((SHM_BUFFER_SIZE) - 1); + *rb->tail = (*rb->tail + 1) & ((SHM_RBUFF_SIZE) - 1); pthread_cond_broadcast(rb->del); pthread_mutex_unlock(rb->lock); @@ -147,7 +203,7 @@ ssize_t shm_rbuff_read_b(struct shm_rbuff * rb, if (idx != -ETIMEDOUT) { idx = *tail_el_ptr(rb); - *rb->tail = (*rb->tail + 1) & ((SHM_BUFFER_SIZE) - 1); + *rb->tail = (*rb->tail + 1) & ((SHM_RBUFF_SIZE) - 1); pthread_cond_broadcast(rb->del); } |