diff options
| author | Sander Vrijders <sander.vrijders@intec.ugent.be> | 2016-09-06 09:57:24 +0200 | 
|---|---|---|
| committer | Sander Vrijders <sander.vrijders@intec.ugent.be> | 2016-09-06 09:57:24 +0200 | 
| commit | b85658366b18db60db684ed5ef218052177038d7 (patch) | |
| tree | d15cb04d68a063fc3418d0259c9e779514861fcf /src | |
| parent | d35685c537e7809d5c4a213fcfa553d8a522bc51 (diff) | |
| parent | 116cda0ae03bc4e7b8571cf1658775c13c03c68e (diff) | |
| download | ouroboros-b85658366b18db60db684ed5ef218052177038d7.tar.gz ouroboros-b85658366b18db60db684ed5ef218052177038d7.zip | |
Merged in dstaesse/ouroboros/be-select (pull request #242)
lib: dev: Provide a set of fds to flow_select
Diffstat (limited to 'src')
| -rw-r--r-- | src/lib/dev.c | 91 | ||||
| -rw-r--r-- | src/lib/shm_ap_rbuff.c | 88 | ||||
| -rw-r--r-- | src/tools/oping/oping.c | 9 | ||||
| -rw-r--r-- | src/tools/oping/oping_client.c | 2 | ||||
| -rw-r--r-- | src/tools/oping/oping_server.c | 26 | 
5 files changed, 188 insertions, 28 deletions
| diff --git a/src/lib/dev.c b/src/lib/dev.c index b7de921d..64327dd3 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -28,9 +28,16 @@  #include <ouroboros/shm_rdrbuff.h>  #include <ouroboros/shm_ap_rbuff.h>  #include <ouroboros/utils.h> +#include <ouroboros/select.h>  #include <stdlib.h>  #include <string.h> +#include <stdio.h> + +struct flow_set { +        bool b[IRMD_MAX_FLOWS]; +        pthread_rwlock_t lock; +};  struct flow {          struct shm_ap_rbuff * rb; @@ -577,14 +584,6 @@ ssize_t flow_write(int fd, void * buf, size_t count)          return 0;  } -int flow_select(const struct timespec * timeout) -{ -        int port_id = shm_ap_rbuff_peek_b(ai->rb, timeout); -        if (port_id < 0) -                return port_id; -        return ai->ports[port_id]; -} -  ssize_t flow_read(int fd, void * buf, size_t count)  {          int idx = -1; @@ -638,3 +637,79 @@ ssize_t flow_read(int fd, void * buf, size_t count)          return n;  } + +/* select functions */ + +struct flow_set * flow_set_create() +{ +        struct flow_set * set = malloc(sizeof(*set)); +        if (set == NULL) +                return NULL; + +        if (pthread_rwlock_init(&set->lock, NULL)) { +                free(set); +                return NULL; +        } + +        memset(&set->b, 0, sizeof(set->b)); + +        return set; +} + +void flow_set_zero(struct flow_set * set) +{ +        pthread_rwlock_wrlock(&set->lock); +        memset(&set->b, 0, sizeof(set->b)); +        pthread_rwlock_unlock(&set->lock); +} + +void flow_set_add(struct flow_set * set, int fd) +{ +        pthread_rwlock_wrlock(&set->lock); +        set->b[ai->flows[fd].port_id] = true; +        pthread_rwlock_unlock(&set->lock); +} + +void flow_set_del(struct flow_set * set, int fd) +{ +        pthread_rwlock_wrlock(&set->lock); +        set->b[ai->flows[fd].port_id] = false; +        pthread_rwlock_unlock(&set->lock); +} + +bool flow_set_has(struct flow_set * set, int fd) +{ +        bool ret; +        pthread_rwlock_rdlock(&set->lock); +        ret = set->b[ai->flows[fd].port_id]; +        pthread_rwlock_unlock(&set->lock); +        return ret; +} + +void flow_set_destroy(struct flow_set * set) +{ +        pthread_rwlock_destroy(&set->lock); +        free(set); +} + +static void flow_set_cpy(bool * dst, struct flow_set * src) +{ +        pthread_rwlock_rdlock(&src->lock); +        memcpy(dst, src->b, IRMD_MAX_FLOWS); +        pthread_rwlock_unlock(&src->lock); +} + +int flow_select(struct flow_set * set, const struct timespec * timeout) +{ +        int port_id; +        bool b[IRMD_MAX_FLOWS]; +        if (set == NULL) { +                port_id = shm_ap_rbuff_peek_b(ai->rb, NULL, timeout); +        } else { +                flow_set_cpy(b, set); +                port_id = shm_ap_rbuff_peek_b(ai->rb, (bool *) b, timeout); +        } +        if (port_id < 0) +                return port_id; +        return ai->ports[port_id]; +} diff --git a/src/lib/shm_ap_rbuff.c b/src/lib/shm_ap_rbuff.c index 6cc9590e..e6665362 100644 --- a/src/lib/shm_ap_rbuff.c +++ b/src/lib/shm_ap_rbuff.c @@ -348,15 +348,15 @@ int shm_ap_rbuff_peek_idx(struct shm_ap_rbuff * rb)                  return -1;          } -        ret = (rb->shm_base + *rb->ptr_tail)->index; +        ret = tail_el_ptr(rb)->index;          pthread_mutex_unlock(rb->lock);          return ret;  } -int shm_ap_rbuff_peek_b(struct shm_ap_rbuff * rb, -                        const struct timespec * timeout) +static int shm_ap_rbuff_peek_b_all(struct shm_ap_rbuff * rb, +                                   const struct timespec * timeout)  {          struct timespec abstime;          int ret = 0; @@ -397,7 +397,7 @@ int shm_ap_rbuff_peek_b(struct shm_ap_rbuff * rb,          }          if (ret != ETIMEDOUT) -                ret = (rb->shm_base + *rb->ptr_tail)->port_id; +                ret = tail_el_ptr(rb)->port_id;          else                  ret = -ETIMEDOUT; @@ -406,6 +406,82 @@ int shm_ap_rbuff_peek_b(struct shm_ap_rbuff * rb,          return ret;  } +int shm_ap_rbuff_peek_b(struct shm_ap_rbuff *   rb, +                        bool *                  set, +                        const struct timespec * timeout) +{ +        struct timespec abstime; +        int ret; + +        if (set == NULL) +                return shm_ap_rbuff_peek_b_all(rb, timeout); + +#ifdef __APPLE__ +        pthread_mutex_lock(rb->lock); +#else +        if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) { +                LOG_DBG("Recovering dead mutex."); +                pthread_mutex_consistent(rb->lock); +        } +#endif +        if (timeout != NULL) { +                clock_gettime(PTHREAD_COND_CLOCK, &abstime); +                ts_add(&abstime, timeout, &abstime); +        } + +        pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock, +                             (void *) rb->lock); + +        while ((shm_rbuff_empty(rb) || !set[tail_el_ptr(rb)->port_id]) +               && (ret != ETIMEDOUT)) { +                while (shm_rbuff_empty(rb)) { +                        if (timeout != NULL) +                                ret = pthread_cond_timedwait(rb->add, +                                                             rb->lock, +                                                             &abstime); +                        else +                                ret = pthread_cond_wait(rb->add, rb->lock); + +#ifndef __APPLE__ +                        if (ret == EOWNERDEAD) { +                                LOG_DBG("Recovering dead mutex."); +                                pthread_mutex_consistent(rb->lock); +                        } +#endif +                        if (ret == ETIMEDOUT) +                                break; +                } + +                while (!set[tail_el_ptr(rb)->port_id]) { +                        if (timeout != NULL) +                                ret = pthread_cond_timedwait(rb->del, +                                                             rb->lock, +                                                             &abstime); +                        else +                                ret = pthread_cond_wait(rb->del, rb->lock); + +#ifndef __APPLE__ +                        if (ret == EOWNERDEAD) { +                                LOG_DBG("Recovering dead mutex."); +                                pthread_mutex_consistent(rb->lock); +                        } +#endif +                        if (ret == ETIMEDOUT) +                                break; +                } +        } + +        if (ret != ETIMEDOUT) +                ret = tail_el_ptr(rb)->port_id; +        else +                ret = -ETIMEDOUT; + +        pthread_cleanup_pop(true); + +        return ret; +} + +  struct rb_entry * shm_ap_rbuff_read(struct shm_ap_rbuff * rb)  {          struct rb_entry * e = NULL; @@ -471,8 +547,8 @@ ssize_t shm_ap_rbuff_read_port(struct shm_ap_rbuff * rb, int port_id)          return idx;  } -ssize_t shm_ap_rbuff_read_port_b(struct shm_ap_rbuff * rb, -                                 int port_id, +ssize_t shm_ap_rbuff_read_port_b(struct shm_ap_rbuff *   rb, +                                 int                     port_id,                                   const struct timespec * timeout)  {          struct timespec abstime; diff --git a/src/tools/oping/oping.c b/src/tools/oping/oping.c index 2871e79e..7d2edf33 100644 --- a/src/tools/oping/oping.c +++ b/src/tools/oping/oping.c @@ -23,6 +23,9 @@  #define _POSIX_C_SOURCE 199506L +#include <ouroboros/select.h> +#include <ouroboros/dev.h> +  #include <stdio.h>  #include <string.h>  #include <pthread.h> @@ -59,9 +62,9 @@ struct c {  } client;  struct s { -        struct timespec times[OPING_MAX_FLOWS]; -        bool            flows[OPING_MAX_FLOWS]; -        pthread_mutex_t lock; +        struct timespec   times[OPING_MAX_FLOWS]; +        struct flow_set * flows; +        pthread_mutex_t   lock;          pthread_t cleaner_pt;          pthread_t accept_pt; diff --git a/src/tools/oping/oping_client.c b/src/tools/oping/oping_client.c index 6e1fbc54..3a254984 100644 --- a/src/tools/oping/oping_client.c +++ b/src/tools/oping/oping_client.c @@ -65,7 +65,7 @@ void * reader(void * o)          /* FIXME: use flow timeout option once we have it */          while(client.rcvd != client.count && -              (fd = flow_select(&timeout)) != -ETIMEDOUT) { +              (fd = flow_select(NULL, &timeout)) != -ETIMEDOUT) {                  flow_cntl(fd, FLOW_F_SETFL, FLOW_O_NONBLOCK);                  while (!((msg_len = flow_read(fd, buf, OPING_BUF_SIZE)) < 0)) {                          if (msg_len < 0) diff --git a/src/tools/oping/oping_server.c b/src/tools/oping/oping_server.c index 7761110d..9c7b1be7 100644 --- a/src/tools/oping/oping_server.c +++ b/src/tools/oping/oping_server.c @@ -21,8 +21,6 @@   * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.   */ -#include <ouroboros/dev.h> -  #ifdef __FreeBSD__  #define __XSI_VISIBLE 500  #endif @@ -53,9 +51,9 @@ void * cleaner_thread(void * o)                  clock_gettime(CLOCK_REALTIME, &now);                  pthread_mutex_lock(&server.lock);                  for (i = 0; i < OPING_MAX_FLOWS; ++i) -                        if (server.flows[i] && +                        if (flow_set_has(server.flows, i) &&                              ts_diff_ms(&server.times[i], &now) > deadline_ms) { -                                server.flows[i] = false; +                                flow_set_del(server.flows, i);                                  flow_dealloc(i);                          } @@ -70,10 +68,16 @@ void * server_thread(void *o)          int msg_len = 0;          struct oping_msg * msg = (struct oping_msg *) buf;          struct timespec now = {0, 0}; +        struct timespec timeout = {0, 100 * MILLION};          while (true) { - -                int fd = flow_select(NULL); +                int fd = flow_select(server.flows, &timeout); +                if (fd == -ETIMEDOUT) +                        continue; +                if (fd < 0) { +                        printf("Failed to get active fd.\n"); +                        continue; +                }                  while (!((msg_len = flow_read(fd, buf, OPING_BUF_SIZE)) < 0)) {                          if (msg_len < 0)                                  continue; @@ -126,7 +130,7 @@ void * accept_thread(void * o)                  clock_gettime(CLOCK_REALTIME, &now);                  pthread_mutex_lock(&server.lock); -                server.flows[fd] = true; +                flow_set_add(server.flows, fd);                  server.times[fd] = now;                  pthread_mutex_unlock(&server.lock); @@ -139,7 +143,6 @@ void * accept_thread(void * o)  int server_main()  {          struct sigaction sig_act; -        int i = 0;          memset(&sig_act, 0, sizeof sig_act);          sig_act.sa_sigaction = &shutdown_server; @@ -153,8 +156,11 @@ int server_main()                  return -1;          } -        for (i = 0; i < OPING_MAX_FLOWS; ++i) -                server.flows[i] = false; +        server.flows = flow_set_create(); +        if (server.flows == NULL) +                return 0; + +        flow_set_zero(server.flows);          pthread_create(&server.cleaner_pt, NULL, cleaner_thread, NULL);          pthread_create(&server.accept_pt, NULL, accept_thread, NULL); | 
