summaryrefslogtreecommitdiff
path: root/src/lib/shm_ap_rbuff.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/lib/shm_ap_rbuff.c')
-rw-r--r--src/lib/shm_ap_rbuff.c88
1 files changed, 82 insertions, 6 deletions
diff --git a/src/lib/shm_ap_rbuff.c b/src/lib/shm_ap_rbuff.c
index 6cc9590e..e6665362 100644
--- a/src/lib/shm_ap_rbuff.c
+++ b/src/lib/shm_ap_rbuff.c
@@ -348,15 +348,15 @@ int shm_ap_rbuff_peek_idx(struct shm_ap_rbuff * rb)
return -1;
}
- ret = (rb->shm_base + *rb->ptr_tail)->index;
+ ret = tail_el_ptr(rb)->index;
pthread_mutex_unlock(rb->lock);
return ret;
}
-int shm_ap_rbuff_peek_b(struct shm_ap_rbuff * rb,
- const struct timespec * timeout)
+static int shm_ap_rbuff_peek_b_all(struct shm_ap_rbuff * rb,
+ const struct timespec * timeout)
{
struct timespec abstime;
int ret = 0;
@@ -397,7 +397,7 @@ int shm_ap_rbuff_peek_b(struct shm_ap_rbuff * rb,
}
if (ret != ETIMEDOUT)
- ret = (rb->shm_base + *rb->ptr_tail)->port_id;
+ ret = tail_el_ptr(rb)->port_id;
else
ret = -ETIMEDOUT;
@@ -406,6 +406,82 @@ int shm_ap_rbuff_peek_b(struct shm_ap_rbuff * rb,
return ret;
}
+int shm_ap_rbuff_peek_b(struct shm_ap_rbuff * rb,
+ bool * set,
+ const struct timespec * timeout)
+{
+ struct timespec abstime;
+ int ret;
+
+ if (set == NULL)
+ return shm_ap_rbuff_peek_b_all(rb, timeout);
+
+#ifdef __APPLE__
+ pthread_mutex_lock(rb->lock);
+#else
+ if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) {
+ LOG_DBG("Recovering dead mutex.");
+ pthread_mutex_consistent(rb->lock);
+ }
+#endif
+ if (timeout != NULL) {
+ clock_gettime(PTHREAD_COND_CLOCK, &abstime);
+ ts_add(&abstime, timeout, &abstime);
+ }
+
+ pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock,
+ (void *) rb->lock);
+
+ while ((shm_rbuff_empty(rb) || !set[tail_el_ptr(rb)->port_id])
+ && (ret != ETIMEDOUT)) {
+ while (shm_rbuff_empty(rb)) {
+ if (timeout != NULL)
+ ret = pthread_cond_timedwait(rb->add,
+ rb->lock,
+ &abstime);
+ else
+ ret = pthread_cond_wait(rb->add, rb->lock);
+
+#ifndef __APPLE__
+ if (ret == EOWNERDEAD) {
+ LOG_DBG("Recovering dead mutex.");
+ pthread_mutex_consistent(rb->lock);
+ }
+#endif
+ if (ret == ETIMEDOUT)
+ break;
+ }
+
+ while (!set[tail_el_ptr(rb)->port_id]) {
+ if (timeout != NULL)
+ ret = pthread_cond_timedwait(rb->del,
+ rb->lock,
+ &abstime);
+ else
+ ret = pthread_cond_wait(rb->del, rb->lock);
+
+#ifndef __APPLE__
+ if (ret == EOWNERDEAD) {
+ LOG_DBG("Recovering dead mutex.");
+ pthread_mutex_consistent(rb->lock);
+ }
+#endif
+ if (ret == ETIMEDOUT)
+ break;
+ }
+ }
+
+ if (ret != ETIMEDOUT)
+ ret = tail_el_ptr(rb)->port_id;
+ else
+ ret = -ETIMEDOUT;
+
+ pthread_cleanup_pop(true);
+
+ return ret;
+}
+
+
struct rb_entry * shm_ap_rbuff_read(struct shm_ap_rbuff * rb)
{
struct rb_entry * e = NULL;
@@ -471,8 +547,8 @@ ssize_t shm_ap_rbuff_read_port(struct shm_ap_rbuff * rb, int port_id)
return idx;
}
-ssize_t shm_ap_rbuff_read_port_b(struct shm_ap_rbuff * rb,
- int port_id,
+ssize_t shm_ap_rbuff_read_port_b(struct shm_ap_rbuff * rb,
+ int port_id,
const struct timespec * timeout)
{
struct timespec abstime;