diff options
Diffstat (limited to 'src/lib')
| -rw-r--r-- | src/lib/dev.c | 91 | ||||
| -rw-r--r-- | src/lib/shm_ap_rbuff.c | 88 | 
2 files changed, 165 insertions, 14 deletions
| diff --git a/src/lib/dev.c b/src/lib/dev.c index b7de921d..64327dd3 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -28,9 +28,16 @@  #include <ouroboros/shm_rdrbuff.h>  #include <ouroboros/shm_ap_rbuff.h>  #include <ouroboros/utils.h> +#include <ouroboros/select.h>  #include <stdlib.h>  #include <string.h> +#include <stdio.h> + +struct flow_set { +        bool b[IRMD_MAX_FLOWS]; +        pthread_rwlock_t lock; +};  struct flow {          struct shm_ap_rbuff * rb; @@ -577,14 +584,6 @@ ssize_t flow_write(int fd, void * buf, size_t count)          return 0;  } -int flow_select(const struct timespec * timeout) -{ -        int port_id = shm_ap_rbuff_peek_b(ai->rb, timeout); -        if (port_id < 0) -                return port_id; -        return ai->ports[port_id]; -} -  ssize_t flow_read(int fd, void * buf, size_t count)  {          int idx = -1; @@ -638,3 +637,79 @@ ssize_t flow_read(int fd, void * buf, size_t count)          return n;  } + +/* select functions */ + +struct flow_set * flow_set_create() +{ +        struct flow_set * set = malloc(sizeof(*set)); +        if (set == NULL) +                return NULL; + +        if (pthread_rwlock_init(&set->lock, NULL)) { +                free(set); +                return NULL; +        } + +        memset(&set->b, 0, sizeof(set->b)); + +        return set; +} + +void flow_set_zero(struct flow_set * set) +{ +        pthread_rwlock_wrlock(&set->lock); +        memset(&set->b, 0, sizeof(set->b)); +        pthread_rwlock_unlock(&set->lock); +} + +void flow_set_add(struct flow_set * set, int fd) +{ +        pthread_rwlock_wrlock(&set->lock); +        set->b[ai->flows[fd].port_id] = true; +        pthread_rwlock_unlock(&set->lock); +} + +void flow_set_del(struct flow_set * set, int fd) +{ +        pthread_rwlock_wrlock(&set->lock); +        set->b[ai->flows[fd].port_id] = false; +        pthread_rwlock_unlock(&set->lock); +} + +bool flow_set_has(struct flow_set * set, int fd) +{ +        bool ret; +        pthread_rwlock_rdlock(&set->lock); +        ret = set->b[ai->flows[fd].port_id]; +        pthread_rwlock_unlock(&set->lock); +        return ret; +} + +void flow_set_destroy(struct flow_set * set) +{ +        pthread_rwlock_destroy(&set->lock); +        free(set); +} + +static void flow_set_cpy(bool * dst, struct flow_set * src) +{ +        pthread_rwlock_rdlock(&src->lock); +        memcpy(dst, src->b, IRMD_MAX_FLOWS); +        pthread_rwlock_unlock(&src->lock); +} + +int flow_select(struct flow_set * set, const struct timespec * timeout) +{ +        int port_id; +        bool b[IRMD_MAX_FLOWS]; +        if (set == NULL) { +                port_id = shm_ap_rbuff_peek_b(ai->rb, NULL, timeout); +        } else { +                flow_set_cpy(b, set); +                port_id = shm_ap_rbuff_peek_b(ai->rb, (bool *) b, timeout); +        } +        if (port_id < 0) +                return port_id; +        return ai->ports[port_id]; +} diff --git a/src/lib/shm_ap_rbuff.c b/src/lib/shm_ap_rbuff.c index 6cc9590e..e6665362 100644 --- a/src/lib/shm_ap_rbuff.c +++ b/src/lib/shm_ap_rbuff.c @@ -348,15 +348,15 @@ int shm_ap_rbuff_peek_idx(struct shm_ap_rbuff * rb)                  return -1;          } -        ret = (rb->shm_base + *rb->ptr_tail)->index; +        ret = tail_el_ptr(rb)->index;          pthread_mutex_unlock(rb->lock);          return ret;  } -int shm_ap_rbuff_peek_b(struct shm_ap_rbuff * rb, -                        const struct timespec * timeout) +static int shm_ap_rbuff_peek_b_all(struct shm_ap_rbuff * rb, +                                   const struct timespec * timeout)  {          struct timespec abstime;          int ret = 0; @@ -397,7 +397,7 @@ int shm_ap_rbuff_peek_b(struct shm_ap_rbuff * rb,          }          if (ret != ETIMEDOUT) -                ret = (rb->shm_base + *rb->ptr_tail)->port_id; +                ret = tail_el_ptr(rb)->port_id;          else                  ret = -ETIMEDOUT; @@ -406,6 +406,82 @@ int shm_ap_rbuff_peek_b(struct shm_ap_rbuff * rb,          return ret;  } +int shm_ap_rbuff_peek_b(struct shm_ap_rbuff *   rb, +                        bool *                  set, +                        const struct timespec * timeout) +{ +        struct timespec abstime; +        int ret; + +        if (set == NULL) +                return shm_ap_rbuff_peek_b_all(rb, timeout); + +#ifdef __APPLE__ +        pthread_mutex_lock(rb->lock); +#else +        if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) { +                LOG_DBG("Recovering dead mutex."); +                pthread_mutex_consistent(rb->lock); +        } +#endif +        if (timeout != NULL) { +                clock_gettime(PTHREAD_COND_CLOCK, &abstime); +                ts_add(&abstime, timeout, &abstime); +        } + +        pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock, +                             (void *) rb->lock); + +        while ((shm_rbuff_empty(rb) || !set[tail_el_ptr(rb)->port_id]) +               && (ret != ETIMEDOUT)) { +                while (shm_rbuff_empty(rb)) { +                        if (timeout != NULL) +                                ret = pthread_cond_timedwait(rb->add, +                                                             rb->lock, +                                                             &abstime); +                        else +                                ret = pthread_cond_wait(rb->add, rb->lock); + +#ifndef __APPLE__ +                        if (ret == EOWNERDEAD) { +                                LOG_DBG("Recovering dead mutex."); +                                pthread_mutex_consistent(rb->lock); +                        } +#endif +                        if (ret == ETIMEDOUT) +                                break; +                } + +                while (!set[tail_el_ptr(rb)->port_id]) { +                        if (timeout != NULL) +                                ret = pthread_cond_timedwait(rb->del, +                                                             rb->lock, +                                                             &abstime); +                        else +                                ret = pthread_cond_wait(rb->del, rb->lock); + +#ifndef __APPLE__ +                        if (ret == EOWNERDEAD) { +                                LOG_DBG("Recovering dead mutex."); +                                pthread_mutex_consistent(rb->lock); +                        } +#endif +                        if (ret == ETIMEDOUT) +                                break; +                } +        } + +        if (ret != ETIMEDOUT) +                ret = tail_el_ptr(rb)->port_id; +        else +                ret = -ETIMEDOUT; + +        pthread_cleanup_pop(true); + +        return ret; +} + +  struct rb_entry * shm_ap_rbuff_read(struct shm_ap_rbuff * rb)  {          struct rb_entry * e = NULL; @@ -471,8 +547,8 @@ ssize_t shm_ap_rbuff_read_port(struct shm_ap_rbuff * rb, int port_id)          return idx;  } -ssize_t shm_ap_rbuff_read_port_b(struct shm_ap_rbuff * rb, -                                 int port_id, +ssize_t shm_ap_rbuff_read_port_b(struct shm_ap_rbuff *   rb, +                                 int                     port_id,                                   const struct timespec * timeout)  {          struct timespec abstime; | 
