summaryrefslogtreecommitdiff
path: root/src/lib
diff options
context:
space:
mode:
authordimitri staessens <dimitri.staessens@intec.ugent.be>2016-09-04 18:11:53 +0200
committerdimitri staessens <dimitri.staessens@intec.ugent.be>2016-09-06 09:12:27 +0200
commit116cda0ae03bc4e7b8571cf1658775c13c03c68e (patch)
treed15cb04d68a063fc3418d0259c9e779514861fcf /src/lib
parentd35685c537e7809d5c4a213fcfa553d8a522bc51 (diff)
downloadouroboros-116cda0ae03bc4e7b8571cf1658775c13c03c68e.tar.gz
ouroboros-116cda0ae03bc4e7b8571cf1658775c13c03c68e.zip
lib: dev: Provide a set of fds to flow_select
The flow_select call now takes as a parameter a flow_set_t, which specifies a set of flow descriptors that will unblock the select call when an SDU for one of them arrives. The select call has been moved to its own header.
Diffstat (limited to 'src/lib')
-rw-r--r--src/lib/dev.c91
-rw-r--r--src/lib/shm_ap_rbuff.c88
2 files changed, 165 insertions, 14 deletions
diff --git a/src/lib/dev.c b/src/lib/dev.c
index b7de921d..64327dd3 100644
--- a/src/lib/dev.c
+++ b/src/lib/dev.c
@@ -28,9 +28,16 @@
#include <ouroboros/shm_rdrbuff.h>
#include <ouroboros/shm_ap_rbuff.h>
#include <ouroboros/utils.h>
+#include <ouroboros/select.h>
#include <stdlib.h>
#include <string.h>
+#include <stdio.h>
+
+struct flow_set {
+ bool b[IRMD_MAX_FLOWS];
+ pthread_rwlock_t lock;
+};
struct flow {
struct shm_ap_rbuff * rb;
@@ -577,14 +584,6 @@ ssize_t flow_write(int fd, void * buf, size_t count)
return 0;
}
-int flow_select(const struct timespec * timeout)
-{
- int port_id = shm_ap_rbuff_peek_b(ai->rb, timeout);
- if (port_id < 0)
- return port_id;
- return ai->ports[port_id];
-}
-
ssize_t flow_read(int fd, void * buf, size_t count)
{
int idx = -1;
@@ -638,3 +637,79 @@ ssize_t flow_read(int fd, void * buf, size_t count)
return n;
}
+
+/* select functions */
+
+struct flow_set * flow_set_create()
+{
+ struct flow_set * set = malloc(sizeof(*set));
+ if (set == NULL)
+ return NULL;
+
+ if (pthread_rwlock_init(&set->lock, NULL)) {
+ free(set);
+ return NULL;
+ }
+
+ memset(&set->b, 0, sizeof(set->b));
+
+ return set;
+}
+
+void flow_set_zero(struct flow_set * set)
+{
+ pthread_rwlock_wrlock(&set->lock);
+ memset(&set->b, 0, sizeof(set->b));
+ pthread_rwlock_unlock(&set->lock);
+}
+
+void flow_set_add(struct flow_set * set, int fd)
+{
+ pthread_rwlock_wrlock(&set->lock);
+ set->b[ai->flows[fd].port_id] = true;
+ pthread_rwlock_unlock(&set->lock);
+}
+
+void flow_set_del(struct flow_set * set, int fd)
+{
+ pthread_rwlock_wrlock(&set->lock);
+ set->b[ai->flows[fd].port_id] = false;
+ pthread_rwlock_unlock(&set->lock);
+}
+
+bool flow_set_has(struct flow_set * set, int fd)
+{
+ bool ret;
+ pthread_rwlock_rdlock(&set->lock);
+ ret = set->b[ai->flows[fd].port_id];
+ pthread_rwlock_unlock(&set->lock);
+ return ret;
+}
+
+void flow_set_destroy(struct flow_set * set)
+{
+ pthread_rwlock_destroy(&set->lock);
+ free(set);
+}
+
+static void flow_set_cpy(bool * dst, struct flow_set * src)
+{
+ pthread_rwlock_rdlock(&src->lock);
+ memcpy(dst, src->b, IRMD_MAX_FLOWS);
+ pthread_rwlock_unlock(&src->lock);
+}
+
+int flow_select(struct flow_set * set, const struct timespec * timeout)
+{
+ int port_id;
+ bool b[IRMD_MAX_FLOWS];
+ if (set == NULL) {
+ port_id = shm_ap_rbuff_peek_b(ai->rb, NULL, timeout);
+ } else {
+ flow_set_cpy(b, set);
+ port_id = shm_ap_rbuff_peek_b(ai->rb, (bool *) b, timeout);
+ }
+ if (port_id < 0)
+ return port_id;
+ return ai->ports[port_id];
+}
diff --git a/src/lib/shm_ap_rbuff.c b/src/lib/shm_ap_rbuff.c
index 6cc9590e..e6665362 100644
--- a/src/lib/shm_ap_rbuff.c
+++ b/src/lib/shm_ap_rbuff.c
@@ -348,15 +348,15 @@ int shm_ap_rbuff_peek_idx(struct shm_ap_rbuff * rb)
return -1;
}
- ret = (rb->shm_base + *rb->ptr_tail)->index;
+ ret = tail_el_ptr(rb)->index;
pthread_mutex_unlock(rb->lock);
return ret;
}
-int shm_ap_rbuff_peek_b(struct shm_ap_rbuff * rb,
- const struct timespec * timeout)
+static int shm_ap_rbuff_peek_b_all(struct shm_ap_rbuff * rb,
+ const struct timespec * timeout)
{
struct timespec abstime;
int ret = 0;
@@ -397,7 +397,7 @@ int shm_ap_rbuff_peek_b(struct shm_ap_rbuff * rb,
}
if (ret != ETIMEDOUT)
- ret = (rb->shm_base + *rb->ptr_tail)->port_id;
+ ret = tail_el_ptr(rb)->port_id;
else
ret = -ETIMEDOUT;
@@ -406,6 +406,82 @@ int shm_ap_rbuff_peek_b(struct shm_ap_rbuff * rb,
return ret;
}
+int shm_ap_rbuff_peek_b(struct shm_ap_rbuff * rb,
+ bool * set,
+ const struct timespec * timeout)
+{
+ struct timespec abstime;
+ int ret;
+
+ if (set == NULL)
+ return shm_ap_rbuff_peek_b_all(rb, timeout);
+
+#ifdef __APPLE__
+ pthread_mutex_lock(rb->lock);
+#else
+ if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) {
+ LOG_DBG("Recovering dead mutex.");
+ pthread_mutex_consistent(rb->lock);
+ }
+#endif
+ if (timeout != NULL) {
+ clock_gettime(PTHREAD_COND_CLOCK, &abstime);
+ ts_add(&abstime, timeout, &abstime);
+ }
+
+ pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock,
+ (void *) rb->lock);
+
+ while ((shm_rbuff_empty(rb) || !set[tail_el_ptr(rb)->port_id])
+ && (ret != ETIMEDOUT)) {
+ while (shm_rbuff_empty(rb)) {
+ if (timeout != NULL)
+ ret = pthread_cond_timedwait(rb->add,
+ rb->lock,
+ &abstime);
+ else
+ ret = pthread_cond_wait(rb->add, rb->lock);
+
+#ifndef __APPLE__
+ if (ret == EOWNERDEAD) {
+ LOG_DBG("Recovering dead mutex.");
+ pthread_mutex_consistent(rb->lock);
+ }
+#endif
+ if (ret == ETIMEDOUT)
+ break;
+ }
+
+ while (!set[tail_el_ptr(rb)->port_id]) {
+ if (timeout != NULL)
+ ret = pthread_cond_timedwait(rb->del,
+ rb->lock,
+ &abstime);
+ else
+ ret = pthread_cond_wait(rb->del, rb->lock);
+
+#ifndef __APPLE__
+ if (ret == EOWNERDEAD) {
+ LOG_DBG("Recovering dead mutex.");
+ pthread_mutex_consistent(rb->lock);
+ }
+#endif
+ if (ret == ETIMEDOUT)
+ break;
+ }
+ }
+
+ if (ret != ETIMEDOUT)
+ ret = tail_el_ptr(rb)->port_id;
+ else
+ ret = -ETIMEDOUT;
+
+ pthread_cleanup_pop(true);
+
+ return ret;
+}
+
+
struct rb_entry * shm_ap_rbuff_read(struct shm_ap_rbuff * rb)
{
struct rb_entry * e = NULL;
@@ -471,8 +547,8 @@ ssize_t shm_ap_rbuff_read_port(struct shm_ap_rbuff * rb, int port_id)
return idx;
}
-ssize_t shm_ap_rbuff_read_port_b(struct shm_ap_rbuff * rb,
- int port_id,
+ssize_t shm_ap_rbuff_read_port_b(struct shm_ap_rbuff * rb,
+ int port_id,
const struct timespec * timeout)
{
struct timespec abstime;