path: root/src/lib/shm_ap_rbuff.c
author     dimitri staessens <dimitri.staessens@intec.ugent.be>    2016-09-04 18:11:53 +0200
committer  dimitri staessens <dimitri.staessens@intec.ugent.be>    2016-09-06 09:12:27 +0200
commit     116cda0ae03bc4e7b8571cf1658775c13c03c68e (patch)
tree       d15cb04d68a063fc3418d0259c9e779514861fcf /src/lib/shm_ap_rbuff.c
parent     d35685c537e7809d5c4a213fcfa553d8a522bc51 (diff)
download   ouroboros-116cda0ae03bc4e7b8571cf1658775c13c03c68e.tar.gz
           ouroboros-116cda0ae03bc4e7b8571cf1658775c13c03c68e.zip
lib: dev: Provide a set of fds to flow_select
The flow_select call now takes a flow_set_t parameter specifying the set of flow descriptors to watch; the call unblocks as soon as an SDU arrives for one of them. The select call has been moved to its own header.
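For context, a minimal application-side sketch of the reworked call follows. The header name, the flow_set_t helpers (flow_set_create, flow_set_add, flow_set_destroy) and the return value convention are assumptions drawn from this commit message, not code taken from the tree.

/*
 * Hedged sketch only: drives the reworked flow_select() from an application.
 * Helper names, header location and return convention are assumptions.
 */
#include <ouroboros/select.h>   /* assumed new home of flow_select */

#include <stddef.h>
#include <time.h>

int wait_on_two_flows(int fd_a, int fd_b)
{
        struct timespec timeout = { .tv_sec = 1, .tv_nsec = 0 };
        flow_set_t *    set;
        int             ret;

        set = flow_set_create();             /* assumed constructor */
        if (set == NULL)
                return -1;

        /* only SDUs arriving for fd_a or fd_b may unblock the call */
        flow_set_add(set, fd_a);
        flow_set_add(set, fd_b);

        ret = flow_select(set, &timeout);    /* blocks until an SDU or timeout */

        flow_set_destroy(set);               /* assumed destructor */

        return ret;                          /* assumed: ready descriptor, or -ETIMEDOUT */
}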
Diffstat (limited to 'src/lib/shm_ap_rbuff.c')
-rw-r--r--  src/lib/shm_ap_rbuff.c |  88
1 file changed, 82 insertions(+), 6 deletions(-)
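Within the library, the flow set has to be translated into something the shared ring buffer can test per SDU. As the diff below shows, shm_ap_rbuff_peek_b() now takes a bool array indexed by port_id: an entry set to true means an SDU for that port may wake the caller. A hedged sketch of such a caller, with the header path, the array size and the helper name invented purely for illustration:

#include <ouroboros/shm_ap_rbuff.h>   /* assumed location of the ring buffer API */

#include <stdbool.h>
#include <string.h>
#include <time.h>

#define MAX_PORTS 4096   /* assumed upper bound on port_ids, illustration only */

/* hypothetical helper: block until one of the listed ports has an SDU queued */
int wait_for_ports(struct shm_ap_rbuff *   rb,
                   const int *             port_ids,
                   size_t                  n,
                   const struct timespec * timeout)
{
        bool   set[MAX_PORTS];
        size_t i;

        memset(set, 0, sizeof(set));

        /* mark the ports that are allowed to unblock the peek */
        for (i = 0; i < n; ++i)
                set[port_ids[i]] = true;

        /* returns the port_id of the matching SDU at the tail, or -ETIMEDOUT */
        return shm_ap_rbuff_peek_b(rb, set, timeout);
}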
diff --git a/src/lib/shm_ap_rbuff.c b/src/lib/shm_ap_rbuff.c
index 6cc9590e..e6665362 100644
--- a/src/lib/shm_ap_rbuff.c
+++ b/src/lib/shm_ap_rbuff.c
@@ -348,15 +348,15 @@ int shm_ap_rbuff_peek_idx(struct shm_ap_rbuff * rb)
return -1;
}
- ret = (rb->shm_base + *rb->ptr_tail)->index;
+ ret = tail_el_ptr(rb)->index;
pthread_mutex_unlock(rb->lock);
return ret;
}
-int shm_ap_rbuff_peek_b(struct shm_ap_rbuff * rb,
- const struct timespec * timeout)
+static int shm_ap_rbuff_peek_b_all(struct shm_ap_rbuff * rb,
+ const struct timespec * timeout)
{
struct timespec abstime;
int ret = 0;
@@ -397,7 +397,7 @@ int shm_ap_rbuff_peek_b(struct shm_ap_rbuff * rb,
}
if (ret != ETIMEDOUT)
- ret = (rb->shm_base + *rb->ptr_tail)->port_id;
+ ret = tail_el_ptr(rb)->port_id;
else
ret = -ETIMEDOUT;
@@ -406,6 +406,82 @@ int shm_ap_rbuff_peek_b(struct shm_ap_rbuff * rb,
return ret;
}
+int shm_ap_rbuff_peek_b(struct shm_ap_rbuff * rb,
+ bool * set,
+ const struct timespec * timeout)
+{
+ struct timespec abstime;
+ int ret = 0;
+
+ if (set == NULL)
+ return shm_ap_rbuff_peek_b_all(rb, timeout);
+
+#ifdef __APPLE__
+ pthread_mutex_lock(rb->lock);
+#else
+ if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) {
+ LOG_DBG("Recovering dead mutex.");
+ pthread_mutex_consistent(rb->lock);
+ }
+#endif
+ if (timeout != NULL) {
+ clock_gettime(PTHREAD_COND_CLOCK, &abstime);
+ ts_add(&abstime, timeout, &abstime);
+ }
+
+ pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock,
+ (void *) rb->lock);
+
+ while ((shm_rbuff_empty(rb) || !set[tail_el_ptr(rb)->port_id])
+ && (ret != ETIMEDOUT)) {
+ while (shm_rbuff_empty(rb)) {
+ if (timeout != NULL)
+ ret = pthread_cond_timedwait(rb->add,
+ rb->lock,
+ &abstime);
+ else
+ ret = pthread_cond_wait(rb->add, rb->lock);
+
+#ifndef __APPLE__
+ if (ret == EOWNERDEAD) {
+ LOG_DBG("Recovering dead mutex.");
+ pthread_mutex_consistent(rb->lock);
+ }
+#endif
+ if (ret == ETIMEDOUT)
+ break;
+ }
+
+ while (!set[tail_el_ptr(rb)->port_id]) {
+ if (timeout != NULL)
+ ret = pthread_cond_timedwait(rb->del,
+ rb->lock,
+ &abstime);
+ else
+ ret = pthread_cond_wait(rb->del, rb->lock);
+
+#ifndef __APPLE__
+ if (ret == EOWNERDEAD) {
+ LOG_DBG("Recovering dead mutex.");
+ pthread_mutex_consistent(rb->lock);
+ }
+#endif
+ if (ret == ETIMEDOUT)
+ break;
+ }
+ }
+
+ if (ret != ETIMEDOUT)
+ ret = tail_el_ptr(rb)->port_id;
+ else
+ ret = -ETIMEDOUT;
+
+ pthread_cleanup_pop(true);
+
+ return ret;
+}
+
+
struct rb_entry * shm_ap_rbuff_read(struct shm_ap_rbuff * rb)
{
struct rb_entry * e = NULL;
@@ -471,8 +547,8 @@ ssize_t shm_ap_rbuff_read_port(struct shm_ap_rbuff * rb, int port_id)
return idx;
}
-ssize_t shm_ap_rbuff_read_port_b(struct shm_ap_rbuff * rb,
- int port_id,
+ssize_t shm_ap_rbuff_read_port_b(struct shm_ap_rbuff * rb,
+ int port_id,
const struct timespec * timeout)
{
struct timespec abstime;