From 116cda0ae03bc4e7b8571cf1658775c13c03c68e Mon Sep 17 00:00:00 2001 From: dimitri staessens Date: Sun, 4 Sep 2016 18:11:53 +0200 Subject: lib: dev: Provide a set of fds to flow_select The flow_select call now takes as a parameter a flow_set_t, which specifies a set of flow descriptors that will unblock the select call when an SDU for one of them arrives. The select call has been moved to its own header. --- src/lib/shm_ap_rbuff.c | 88 ++++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 82 insertions(+), 6 deletions(-) (limited to 'src/lib/shm_ap_rbuff.c') diff --git a/src/lib/shm_ap_rbuff.c b/src/lib/shm_ap_rbuff.c index 6cc9590e..e6665362 100644 --- a/src/lib/shm_ap_rbuff.c +++ b/src/lib/shm_ap_rbuff.c @@ -348,15 +348,15 @@ int shm_ap_rbuff_peek_idx(struct shm_ap_rbuff * rb) return -1; } - ret = (rb->shm_base + *rb->ptr_tail)->index; + ret = tail_el_ptr(rb)->index; pthread_mutex_unlock(rb->lock); return ret; } -int shm_ap_rbuff_peek_b(struct shm_ap_rbuff * rb, - const struct timespec * timeout) +static int shm_ap_rbuff_peek_b_all(struct shm_ap_rbuff * rb, + const struct timespec * timeout) { struct timespec abstime; int ret = 0; @@ -397,7 +397,7 @@ int shm_ap_rbuff_peek_b(struct shm_ap_rbuff * rb, } if (ret != ETIMEDOUT) - ret = (rb->shm_base + *rb->ptr_tail)->port_id; + ret = tail_el_ptr(rb)->port_id; else ret = -ETIMEDOUT; @@ -406,6 +406,82 @@ int shm_ap_rbuff_peek_b(struct shm_ap_rbuff * rb, return ret; } +int shm_ap_rbuff_peek_b(struct shm_ap_rbuff * rb, + bool * set, + const struct timespec * timeout) +{ + struct timespec abstime; + int ret; + + if (set == NULL) + return shm_ap_rbuff_peek_b_all(rb, timeout); + +#ifdef __APPLE__ + pthread_mutex_lock(rb->lock); +#else + if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) { + LOG_DBG("Recovering dead mutex."); + pthread_mutex_consistent(rb->lock); + } +#endif + if (timeout != NULL) { + clock_gettime(PTHREAD_COND_CLOCK, &abstime); + ts_add(&abstime, timeout, &abstime); + } + + pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock, + (void *) rb->lock); + + while ((shm_rbuff_empty(rb) || !set[tail_el_ptr(rb)->port_id]) + && (ret != ETIMEDOUT)) { + while (shm_rbuff_empty(rb)) { + if (timeout != NULL) + ret = pthread_cond_timedwait(rb->add, + rb->lock, + &abstime); + else + ret = pthread_cond_wait(rb->add, rb->lock); + +#ifndef __APPLE__ + if (ret == EOWNERDEAD) { + LOG_DBG("Recovering dead mutex."); + pthread_mutex_consistent(rb->lock); + } +#endif + if (ret == ETIMEDOUT) + break; + } + + while (!set[tail_el_ptr(rb)->port_id]) { + if (timeout != NULL) + ret = pthread_cond_timedwait(rb->del, + rb->lock, + &abstime); + else + ret = pthread_cond_wait(rb->del, rb->lock); + +#ifndef __APPLE__ + if (ret == EOWNERDEAD) { + LOG_DBG("Recovering dead mutex."); + pthread_mutex_consistent(rb->lock); + } +#endif + if (ret == ETIMEDOUT) + break; + } + } + + if (ret != ETIMEDOUT) + ret = tail_el_ptr(rb)->port_id; + else + ret = -ETIMEDOUT; + + pthread_cleanup_pop(true); + + return ret; +} + + struct rb_entry * shm_ap_rbuff_read(struct shm_ap_rbuff * rb) { struct rb_entry * e = NULL; @@ -471,8 +547,8 @@ ssize_t shm_ap_rbuff_read_port(struct shm_ap_rbuff * rb, int port_id) return idx; } -ssize_t shm_ap_rbuff_read_port_b(struct shm_ap_rbuff * rb, - int port_id, +ssize_t shm_ap_rbuff_read_port_b(struct shm_ap_rbuff * rb, + int port_id, const struct timespec * timeout) { struct timespec abstime; -- cgit v1.2.3