From a6e8978fd9b5786607438689f8cd8b8efb8ef77e Mon Sep 17 00:00:00 2001
From: dimitri staessens <dimitri.staessens@intec.ugent.be>
Date: Wed, 3 Aug 2016 00:19:06 +0200
Subject: lib: dev: Add select-like call

This adds a flow_select() call that will sleep until an SDU can be
read on a flow. It returns the file descriptor for which an SDU is
ready. It takes as optional argument a timespec struct to specify a
timeout.
---
 include/ouroboros/dev.h          |   2 +
 include/ouroboros/shm_ap_rbuff.h |   3 +
 include/ouroboros/shm_du_map.h   |  13 ++-
 src/lib/dev.c                    |  13 ++-
 src/lib/shm_ap_rbuff.c           | 172 +++++++++++++++++++++++++--------------
 src/lib/shm_du_map.c             |   6 +-
 6 files changed, 136 insertions(+), 73 deletions(-)

diff --git a/include/ouroboros/dev.h b/include/ouroboros/dev.h
index eb779953..91d5c7de 100644
--- a/include/ouroboros/dev.h
+++ b/include/ouroboros/dev.h
@@ -22,6 +22,7 @@
 
 #include <unistd.h>
 #include <stdint.h>
+#include <sys/time.h>
 
 #include <ouroboros/qos.h>
 #include <ouroboros/flow.h>
@@ -53,5 +54,6 @@ int     flow_dealloc(int fd);
 int     flow_cntl(int fd, int cmd, int oflags);
 ssize_t flow_write(int fd, void * buf, size_t count);
 ssize_t flow_read(int fd, void * buf, size_t count);
+int     flow_select(const struct timespec * timeout);
 
 #endif
diff --git a/include/ouroboros/shm_ap_rbuff.h b/include/ouroboros/shm_ap_rbuff.h
index 78926869..257a289d 100644
--- a/include/ouroboros/shm_ap_rbuff.h
+++ b/include/ouroboros/shm_ap_rbuff.h
@@ -25,6 +25,7 @@
 #define OUROBOROS_SHM_AP_RBUFF_H
 
 #include <sys/types.h>
+#include <sys/time.h>
 #include <stdbool.h>
 
 struct shm_ap_rbuff;
@@ -41,6 +42,8 @@ void                  shm_ap_rbuff_destroy(struct shm_ap_rbuff * rb);
 int                   shm_ap_rbuff_write(struct shm_ap_rbuff * rb,
                                          struct rb_entry * e);
 struct rb_entry *     shm_ap_rbuff_read(struct shm_ap_rbuff * rb);
+int                   shm_ap_rbuff_peek(struct shm_ap_rbuff * rb,
+                                        const struct timespec * timeout);
 ssize_t               shm_ap_rbuff_read_port(struct shm_ap_rbuff * rb,
                                              int port_id);
 pid_t                 shm_ap_rbuff_get_api(struct shm_ap_rbuff * rb);
diff --git a/include/ouroboros/shm_du_map.h b/include/ouroboros/shm_du_map.h
index 11fe35c6..b9c56cf8 100644
--- a/include/ouroboros/shm_du_map.h
+++ b/include/ouroboros/shm_du_map.h
@@ -39,13 +39,12 @@ void                shm_du_map_destroy(struct shm_du_map * dum);
 void *              shm_du_map_sanitize(void * o);
 
 /* returns the index of the buffer in the DU map */
-ssize_t  shm_du_map_write(struct shm_du_map * dum,
-                          pid_t               dst_api,
-                          size_t              headspace,
-                          size_t              tailspace,
-                          uint8_t *           data,
-                          size_t              data_len);
-
+ssize_t   shm_du_map_write(struct shm_du_map * dum,
+                           pid_t               dst_api,
+                           size_t              headspace,
+                           size_t              tailspace,
+                           uint8_t *           data,
+                           size_t              data_len);
 int       shm_du_map_read(uint8_t **          dst,
                           struct shm_du_map * dum,
                           ssize_t             idx);
diff --git a/src/lib/dev.c b/src/lib/dev.c
index f13c8423..22e77169 100644
--- a/src/lib/dev.c
+++ b/src/lib/dev.c
@@ -130,17 +130,14 @@ void ap_fini(void)
         free(_ap_instance);
 }
 
-#if 0
 static int port_id_to_fd(int port_id)
 {
         int i;
         for (i = 0; i < AP_MAX_FLOWS; ++i)
-                if (_ap_instance->flows[i].port_id == port_id
-                        && _ap_instance->flows[i].state != FLOW_NULL)
+                if (_ap_instance->flows[i].port_id == port_id)
                         return i;
         return -1;
 }
-#endif
 
 int flow_accept(char ** ae_name)
 {
@@ -523,6 +520,14 @@ ssize_t flow_write(int fd, void * buf, size_t count)
         return 0;
 }
 
+int flow_select(const struct timespec * timeout)
+{
+        int port_id = shm_ap_rbuff_peek(_ap_instance->rb, timeout);
+        if (port_id < 0)
+                return port_id;
+        return port_id_to_fd(port_id);
+}
+
 ssize_t flow_read(int fd, void * buf, size_t count)
 {
         int idx = -1;
diff --git a/src/lib/shm_ap_rbuff.c b/src/lib/shm_ap_rbuff.c
index 84f7617a..be4cd0c2 100644
--- a/src/lib/shm_ap_rbuff.c
+++ b/src/lib/shm_ap_rbuff.c
@@ -28,6 +28,7 @@
 #include <ouroboros/logs.h>
 #include <ouroboros/shm_ap_rbuff.h>
 #include <ouroboros/lockfile.h>
+#include <ouroboros/time_utils.h>
 
 #include <pthread.h>
 #include <sys/mman.h>
@@ -39,6 +40,8 @@
 #include <signal.h>
 #include <sys/stat.h>
 
+#define PTHREAD_COND_CLOCK CLOCK_MONOTONIC
+
 #define SHM_RBUFF_FILE_SIZE (SHM_RBUFF_SIZE * sizeof(struct rb_entry)          \
                              + 2 * sizeof(size_t) + sizeof(pthread_mutex_t)    \
                              + sizeof (pthread_cond_t))
@@ -54,7 +57,7 @@ struct shm_ap_rbuff {
         struct rb_entry * shm_base;    /* start of entry */
         size_t *          ptr_head;    /* start of ringbuffer head */
         size_t *          ptr_tail;    /* start of ringbuffer tail */
-        pthread_mutex_t * shm_mutex;   /* lock all free space in shm */
+        pthread_mutex_t * lock;        /* lock all free space in shm */
         pthread_cond_t *  work;        /* threads will wait for a signal */
         pid_t             api;         /* api to which this rb belongs */
         int               fd;
@@ -73,31 +76,31 @@ struct shm_ap_rbuff * shm_ap_rbuff_create()
 
         rb = malloc(sizeof(*rb));
         if (rb == NULL) {
-                LOG_DBGF("Could not allocate struct.");
+                LOG_DBG("Could not allocate struct.");
                 return NULL;
         }
 
         shm_fd = shm_open(fn, O_CREAT | O_EXCL | O_RDWR, 0666);
         if (shm_fd == -1) {
-                LOG_DBGF("Failed creating ring buffer.");
+                LOG_DBG("Failed creating ring buffer.");
                 free(rb);
                 return NULL;
         }
 
         if (fchmod(shm_fd, 0666)) {
-                LOG_DBGF("Failed to chmod shared memory.");
+                LOG_DBG("Failed to chmod shared memory.");
                 free(rb);
                 return NULL;
         }
 
         if (ftruncate(shm_fd, SHM_RBUFF_FILE_SIZE - 1) < 0) {
-                LOG_DBGF("Failed to extend ringbuffer.");
+                LOG_DBG("Failed to extend ringbuffer.");
                 free(rb);
                 return NULL;
         }
 
         if (write(shm_fd, "", 1) != 1) {
-                LOG_DBGF("Failed to finalise extension of ringbuffer.");
+                LOG_DBG("Failed to finalise extension of ringbuffer.");
                 free(rb);
                 return NULL;
         }
@@ -110,30 +113,31 @@ struct shm_ap_rbuff * shm_ap_rbuff_create()
                         0);
 
         if (shm_base == MAP_FAILED) {
-                LOG_DBGF("Failed to map shared memory.");
+                LOG_DBG("Failed to map shared memory.");
                 if (close(shm_fd) == -1)
-                        LOG_DBGF("Failed to close invalid shm.");
+                        LOG_DBG("Failed to close invalid shm.");
 
                 if (shm_unlink(fn) == -1)
-                        LOG_DBGF("Failed to remove invalid shm.");
+                        LOG_DBG("Failed to remove invalid shm.");
 
                 free(rb);
                 return NULL;
         }
 
-        rb->shm_base  = shm_base;
-        rb->ptr_head  = (size_t *) (rb->shm_base + SHM_RBUFF_SIZE);
-        rb->ptr_tail  = rb->ptr_head + 1;
-        rb->shm_mutex = (pthread_mutex_t *) (rb->ptr_tail + 1);
-        rb->work      = (pthread_cond_t *) (rb->shm_mutex + 1);
+        rb->shm_base = shm_base;
+        rb->ptr_head = (size_t *) (rb->shm_base + SHM_RBUFF_SIZE);
+        rb->ptr_tail = rb->ptr_head + 1;
+        rb->lock     = (pthread_mutex_t *) (rb->ptr_tail + 1);
+        rb->work     = (pthread_cond_t *) (rb->lock + 1);
 
         pthread_mutexattr_init(&mattr);
         pthread_mutexattr_setrobust(&mattr, PTHREAD_MUTEX_ROBUST);
         pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED);
-        pthread_mutex_init(rb->shm_mutex, &mattr);
+        pthread_mutex_init(rb->lock, &mattr);
 
         pthread_condattr_init(&cattr);
         pthread_condattr_setpshared(&cattr, PTHREAD_PROCESS_SHARED);
+        pthread_condattr_setclock(&cattr, PTHREAD_COND_CLOCK);
         pthread_cond_init(rb->work, &cattr);
 
         *rb->ptr_head = 0;
@@ -156,13 +160,13 @@ struct shm_ap_rbuff * shm_ap_rbuff_open(pid_t api)
 
         rb = malloc(sizeof(*rb));
         if (rb == NULL) {
-                LOG_DBGF("Could not allocate struct.");
+                LOG_DBG("Could not allocate struct.");
                 return NULL;
         }
 
         shm_fd = shm_open(fn, O_RDWR, 0666);
         if (shm_fd == -1) {
-                LOG_DBGF("%d failed opening shared memory %s.", getpid(), fn);
+                LOG_DBG("%d failed opening shared memory %s.", getpid(), fn);
                 return NULL;
         }
 
@@ -174,22 +178,22 @@ struct shm_ap_rbuff * shm_ap_rbuff_open(pid_t api)
                         0);
 
         if (shm_base == MAP_FAILED) {
-                LOG_DBGF("Failed to map shared memory.");
+                LOG_DBG("Failed to map shared memory.");
                 if (close(shm_fd) == -1)
-                        LOG_DBGF("Failed to close invalid shm.");
+                        LOG_DBG("Failed to close invalid shm.");
 
                 if (shm_unlink(fn) == -1)
-                        LOG_DBGF("Failed to remove invalid shm.");
+                        LOG_DBG("Failed to remove invalid shm.");
 
                 free(rb);
                 return NULL;
         }
 
-        rb->shm_base  = shm_base;
-        rb->ptr_head  = (size_t *) (rb->shm_base + SHM_RBUFF_SIZE);
-        rb->ptr_tail  = rb->ptr_head + 1;
-        rb->shm_mutex = (pthread_mutex_t *) (rb->ptr_tail + 1);
-        rb->work      = (pthread_cond_t *) (rb->shm_mutex + 1);
+        rb->shm_base = shm_base;
+        rb->ptr_head = (size_t *) (rb->shm_base + SHM_RBUFF_SIZE);
+        rb->ptr_tail = rb->ptr_head + 1;
+        rb->lock     = (pthread_mutex_t *) (rb->ptr_tail + 1);
+        rb->work     = (pthread_cond_t *) (rb->lock + 1);
 
         rb->fd = shm_fd;
         rb->api = api;
@@ -199,15 +203,15 @@ struct shm_ap_rbuff * shm_ap_rbuff_open(pid_t api)
 void shm_ap_rbuff_close(struct shm_ap_rbuff * rb)
 {
         if (rb == NULL) {
-                LOG_DBGF("Bogus input. Bugging out.");
+                LOG_DBG("Bogus input. Bugging out.");
                 return;
         }
 
         if (close(rb->fd) < 0)
-                LOG_DBGF("Couldn't close shared memory.");
+                LOG_DBG("Couldn't close shared memory.");
 
         if (munmap(rb->shm_base, SHM_RBUFF_FILE_SIZE) == -1)
-                LOG_DBGF("Couldn't unmap shared memory.");
+                LOG_DBG("Couldn't unmap shared memory.");
 
         free(rb);
 }
@@ -218,7 +222,7 @@ void shm_ap_rbuff_destroy(struct shm_ap_rbuff * rb)
         struct lockfile * lf = NULL;
 
         if (rb == NULL) {
-                LOG_DBGF("Bogus input. Bugging out.");
+                LOG_DBG("Bogus input. Bugging out.");
                 return;
         }
 
@@ -227,7 +231,7 @@ void shm_ap_rbuff_destroy(struct shm_ap_rbuff * rb)
                 if (lf == NULL)
                         return;
                 if (lockfile_owner(lf) == getpid()) {
-                        LOG_DBGF("Ringbuffer %d destroyed by IRMd %d.",
+                        LOG_DBG("Ringbuffer %d destroyed by IRMd %d.",
                                  rb->api, getpid());
                         lockfile_close(lf);
                 } else {
@@ -238,16 +242,19 @@ void shm_ap_rbuff_destroy(struct shm_ap_rbuff * rb)
                 }
         }
 
+        pthread_mutex_destroy(rb->lock);
+        pthread_cond_destroy(rb->work);
+
         if (close(rb->fd) < 0)
-                LOG_DBGF("Couldn't close shared memory.");
+                LOG_DBG("Couldn't close shared memory.");
 
         sprintf(fn, SHM_AP_RBUFF_PREFIX "%d", rb->api);
 
         if (munmap(rb->shm_base, SHM_RBUFF_FILE_SIZE) == -1)
-                LOG_DBGF("Couldn't unmap shared memory.");
+                LOG_DBG("Couldn't unmap shared memory.");
 
         if (shm_unlink(fn) == -1)
-                LOG_DBGF("Failed to unlink shm.");
+                LOG_DBG("Failed to unlink shm.");
 
         free(rb);
 }
@@ -257,13 +264,13 @@ int shm_ap_rbuff_write(struct shm_ap_rbuff * rb, struct rb_entry * e)
         if (rb == NULL || e == NULL)
                 return -1;
 
-        if (pthread_mutex_lock(rb->shm_mutex) == EOWNERDEAD) {
-                LOG_DBGF("Recovering dead mutex.");
-                pthread_mutex_consistent(rb->shm_mutex);
+        if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) {
+                LOG_DBG("Recovering dead mutex.");
+                pthread_mutex_consistent(rb->lock);
         }
 
         if (!shm_rbuff_free(rb)) {
-                pthread_mutex_unlock(rb->shm_mutex);
+                pthread_mutex_unlock(rb->lock);
                 return -1;
         }
 
@@ -273,11 +280,59 @@ int shm_ap_rbuff_write(struct shm_ap_rbuff * rb, struct rb_entry * e)
         *head_el_ptr = *e;
         *rb->ptr_head = (*rb->ptr_head + 1) & (SHM_RBUFF_SIZE -1);
 
-        pthread_mutex_unlock(rb->shm_mutex);
+        pthread_mutex_unlock(rb->lock);
 
         return 0;
 }
 
+int shm_ap_rbuff_peek(struct shm_ap_rbuff * rb,
+                      const struct timespec * timeout)
+{
+        struct timespec abstime;
+        int ret = 0;
+
+        if (rb == NULL)
+                return -EINVAL;
+
+        if (timeout != NULL) {
+                clock_gettime(PTHREAD_COND_CLOCK, &abstime);
+                ts_add(&abstime, timeout, &abstime);
+        }
+
+        pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock,
+                             (void *) rb->lock);
+
+        if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) {
+                LOG_DBG("Recovering dead mutex.");
+                pthread_mutex_consistent(rb->lock);
+        }
+
+        while (shm_rbuff_empty(rb)) {
+                if (timeout != NULL)
+                        ret = pthread_cond_timedwait(rb->work,
+                                                     rb->lock,
+                                                     &abstime);
+                else
+                        ret = pthread_cond_wait(rb->work, rb->lock);
+
+                if (ret == EOWNERDEAD) {
+                        LOG_DBG("Recovering dead mutex.");
+                        pthread_mutex_consistent(rb->lock);
+                }
+
+                if (ret == ETIMEDOUT) {
+                        pthread_mutex_unlock(rb->lock);
+                        return -ret;
+                }
+        }
+
+        ret = (rb->shm_base + *rb->ptr_tail)->port_id;
+
+        pthread_cleanup_pop(true);
+
+        return ret;
+}
+
 struct rb_entry * shm_ap_rbuff_read(struct shm_ap_rbuff * rb)
 {
         struct rb_entry * e = NULL;
@@ -286,26 +341,25 @@ struct rb_entry * shm_ap_rbuff_read(struct shm_ap_rbuff * rb)
                 return NULL;
 
         pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock,
-                             (void *) rb->shm_mutex);
+                             (void *) rb->lock);
 
-        if (pthread_mutex_lock(rb->shm_mutex) == EOWNERDEAD) {
-                LOG_DBGF("Recovering dead mutex.");
-                pthread_mutex_consistent(rb->shm_mutex);
+        if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) {
+                LOG_DBG("Recovering dead mutex.");
+                pthread_mutex_consistent(rb->lock);
         }
 
-        while (tail_el_ptr->port_id < 0)
+        while (!shm_rbuff_empty(rb) && tail_el_ptr->port_id < 0)
                 *rb->ptr_tail = (*rb->ptr_tail + 1) & (SHM_RBUFF_SIZE -1);
 
         while (shm_rbuff_empty(rb))
-                if (pthread_cond_wait(rb->work, rb->shm_mutex)
-                    == EOWNERDEAD) {
-                LOG_DBGF("Recovering dead mutex.");
-                pthread_mutex_consistent(rb->shm_mutex);
+                if (pthread_cond_wait(rb->work, rb->lock) == EOWNERDEAD) {
+                LOG_DBG("Recovering dead mutex.");
+                pthread_mutex_consistent(rb->lock);
         }
 
         e = malloc(sizeof(*e));
         if (e == NULL) {
-                pthread_mutex_unlock(rb->shm_mutex);
+                pthread_mutex_unlock(rb->lock);
                 return NULL;
         }
 
@@ -313,7 +367,7 @@ struct rb_entry * shm_ap_rbuff_read(struct shm_ap_rbuff * rb)
 
         *rb->ptr_tail = (*rb->ptr_tail + 1) & (SHM_RBUFF_SIZE -1);
 
-        pthread_cleanup_pop(1);
+        pthread_cleanup_pop(true);
 
         return e;
 }
@@ -322,13 +376,13 @@ ssize_t shm_ap_rbuff_read_port(struct shm_ap_rbuff * rb, int port_id)
 {
         ssize_t idx = -1;
 
-        if (pthread_mutex_lock(rb->shm_mutex) == EOWNERDEAD) {
-                LOG_DBGF("Recovering dead mutex.");
-                pthread_mutex_consistent(rb->shm_mutex);
+        if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) {
+                LOG_DBG("Recovering dead mutex.");
+                pthread_mutex_consistent(rb->lock);
         }
 
         if (shm_rbuff_empty(rb)) {
-                pthread_mutex_unlock(rb->shm_mutex);
+                pthread_mutex_unlock(rb->lock);
                 return -1;
         }
 
@@ -336,7 +390,7 @@ ssize_t shm_ap_rbuff_read_port(struct shm_ap_rbuff * rb, int port_id)
                 *rb->ptr_tail = (*rb->ptr_tail + 1) & (SHM_RBUFF_SIZE -1);
 
         if (tail_el_ptr->port_id != port_id) {
-                pthread_mutex_unlock(rb->shm_mutex);
+                pthread_mutex_unlock(rb->lock);
                 return -1;
         }
 
@@ -344,7 +398,7 @@ ssize_t shm_ap_rbuff_read_port(struct shm_ap_rbuff * rb, int port_id)
 
         *rb->ptr_tail = (*rb->ptr_tail + 1) & (SHM_RBUFF_SIZE -1);
 
-        pthread_mutex_unlock(rb->shm_mutex);
+        pthread_mutex_unlock(rb->lock);
 
         return idx;
 }
@@ -355,9 +409,9 @@ pid_t shm_ap_rbuff_get_api(struct shm_ap_rbuff *rb)
         if (rb == NULL)
                 return -1;
 
-        pthread_mutex_lock(rb->shm_mutex);
+        pthread_mutex_lock(rb->lock);
         api = rb->api;
-        pthread_mutex_unlock(rb->shm_mutex);
+        pthread_mutex_unlock(rb->lock);
 
         return api;
 }
@@ -367,8 +421,8 @@ void shm_ap_rbuff_reset(struct shm_ap_rbuff * rb)
         if (rb == NULL)
                 return;
 
-        pthread_mutex_lock(rb->shm_mutex);
+        pthread_mutex_lock(rb->lock);
         *rb->ptr_tail = 0;
         *rb->ptr_head = 0;
-        pthread_mutex_unlock(rb->shm_mutex);
+        pthread_mutex_unlock(rb->lock);
 }
diff --git a/src/lib/shm_du_map.c b/src/lib/shm_du_map.c
index 6dc4a1bd..b090bb74 100644
--- a/src/lib/shm_du_map.c
+++ b/src/lib/shm_du_map.c
@@ -416,7 +416,7 @@ ssize_t shm_du_map_write(struct shm_du_map * dum,
         int                  sz = size + sizeof *sdb;
 #endif
         uint8_t *            write_pos;
-        ssize_t              index = -1;
+        ssize_t              idx = -1;
 
         if (dum == NULL || data == NULL) {
                 LOG_DBGF("Bogus input, bugging out.");
@@ -475,7 +475,7 @@ ssize_t shm_du_map_write(struct shm_du_map * dum,
 
         memcpy(write_pos, data, len);
 
-        index = *dum->ptr_head;
+        idx = *dum->ptr_head;
 #ifdef SHM_DU_MAP_MULTI_BLOCK
         *dum->ptr_head = (*dum->ptr_head + blocks) & (SHM_BLOCKS_IN_MAP - 1);
 #else
@@ -483,7 +483,7 @@ ssize_t shm_du_map_write(struct shm_du_map * dum,
 #endif
         pthread_mutex_unlock(dum->shm_mutex);
 
-        return index;
+        return idx;
 }
 
 int shm_du_map_read(uint8_t **          dst,
-- 
cgit v1.2.3