From 116cda0ae03bc4e7b8571cf1658775c13c03c68e Mon Sep 17 00:00:00 2001 From: dimitri staessens Date: Sun, 4 Sep 2016 18:11:53 +0200 Subject: lib: dev: Provide a set of fds to flow_select The flow_select call now takes as a parameter a flow_set_t, which specifies a set of flow descriptors that will unblock the select call when an SDU for one of them arrives. The select call has been moved to its own header. --- include/ouroboros/CMakeLists.txt | 3 +- include/ouroboros/dev.h | 19 ++++++--- include/ouroboros/select.h | 50 ++++++++++++++++++++++ include/ouroboros/shm_ap_rbuff.h | 5 ++- src/lib/dev.c | 91 ++++++++++++++++++++++++++++++++++++---- src/lib/shm_ap_rbuff.c | 88 +++++++++++++++++++++++++++++++++++--- src/tools/oping/oping.c | 9 ++-- src/tools/oping/oping_client.c | 2 +- src/tools/oping/oping_server.c | 26 +++++++----- 9 files changed, 258 insertions(+), 35 deletions(-) create mode 100644 include/ouroboros/select.h diff --git a/include/ouroboros/CMakeLists.txt b/include/ouroboros/CMakeLists.txt index ae922b89..78a7bb9c 100644 --- a/include/ouroboros/CMakeLists.txt +++ b/include/ouroboros/CMakeLists.txt @@ -10,7 +10,8 @@ set(HEADER_FILES irm.h irm_config.h nsm.h - qos.h) + qos.h + select.h) install(FILES ${HEADER_FILES} DESTINATION usr/include/ouroboros) diff --git a/include/ouroboros/dev.h b/include/ouroboros/dev.h index d5fb744b..fe5ff4b5 100644 --- a/include/ouroboros/dev.h +++ b/include/ouroboros/dev.h @@ -22,7 +22,6 @@ #include #include -#include #include #include @@ -34,10 +33,12 @@ /* These calls should be removed once we write the ouroboros OS. */ int ap_init(char * ap_name); + void ap_fini(void); /* Returns file descriptor (> 0) and client AE name. */ int flow_accept(char ** ae_name); + int flow_alloc_resp(int fd, int result); /* @@ -47,13 +48,21 @@ int flow_alloc_resp(int fd, int result); int flow_alloc(char * dst_name, char * src_ae_name, struct qos_spec * qos); + int flow_alloc_res(int fd); int flow_dealloc(int fd); -int flow_cntl(int fd, int cmd, int oflags); -ssize_t flow_write(int fd, void * buf, size_t count); -ssize_t flow_read(int fd, void * buf, size_t count); -int flow_select(const struct timespec * timeout); +int flow_cntl(int fd, + int cmd, + int oflags); + +ssize_t flow_write(int fd, + void * buf, + size_t count); + +ssize_t flow_read(int fd, + void * buf, + size_t count); #endif diff --git a/include/ouroboros/select.h b/include/ouroboros/select.h new file mode 100644 index 00000000..9e0b8fec --- /dev/null +++ b/include/ouroboros/select.h @@ -0,0 +1,50 @@ +/* + * Ouroboros - Copyright (C) 2016 + * + * A select call for flows + * + * Dimitri Staessens + * Sander Vrijders + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + */ + +#ifndef OUROBOROS_SELECT_H +#define OUROBOROS_SELECT_H + +#include +#include + +struct flow_set; + +struct flow_set * flow_set_create(); + +void flow_set_destroy(struct flow_set * set); + +void flow_set_zero(struct flow_set * set); + +void flow_set_add(struct flow_set * set, + int fd); + +void flow_set_del(struct flow_set * set, + int fd); + +bool flow_set_has(struct flow_set * set, + int fd); + +int flow_select(struct flow_set * set, + const struct timespec * timeout); + +#endif /* OUROBOROS_SELECT_H */ diff --git a/include/ouroboros/shm_ap_rbuff.h b/include/ouroboros/shm_ap_rbuff.h index 594c9260..6b11fd2d 100644 --- a/include/ouroboros/shm_ap_rbuff.h +++ b/include/ouroboros/shm_ap_rbuff.h @@ -24,6 +24,7 @@ #ifndef OUROBOROS_SHM_AP_RBUFF_H #define OUROBOROS_SHM_AP_RBUFF_H +#include #include #include #include @@ -58,7 +59,8 @@ struct rb_entry * shm_ap_rbuff_read(struct shm_ap_rbuff * rb); int shm_ap_rbuff_peek_idx(struct shm_ap_rbuff * rb); -int shm_ap_rbuff_peek_b(struct shm_ap_rbuff * rb, +int shm_ap_rbuff_peek_b(struct shm_ap_rbuff * rb, + bool * set, const struct timespec * timeout); ssize_t shm_ap_rbuff_read_port(struct shm_ap_rbuff * rb, @@ -69,4 +71,5 @@ ssize_t shm_ap_rbuff_read_port_b(struct shm_ap_rbuff * rb, const struct timespec * timeout); void shm_ap_rbuff_reset(struct shm_ap_rbuff * rb); + #endif /* OUROBOROS_SHM_AP_RBUFF_H */ diff --git a/src/lib/dev.c b/src/lib/dev.c index b7de921d..64327dd3 100644 --- a/src/lib/dev.c +++ b/src/lib/dev.c @@ -28,9 +28,16 @@ #include #include #include +#include #include #include +#include + +struct flow_set { + bool b[IRMD_MAX_FLOWS]; + pthread_rwlock_t lock; +}; struct flow { struct shm_ap_rbuff * rb; @@ -577,14 +584,6 @@ ssize_t flow_write(int fd, void * buf, size_t count) return 0; } -int flow_select(const struct timespec * timeout) -{ - int port_id = shm_ap_rbuff_peek_b(ai->rb, timeout); - if (port_id < 0) - return port_id; - return ai->ports[port_id]; -} - ssize_t flow_read(int fd, void * buf, size_t count) { int idx = -1; @@ -638,3 +637,79 @@ ssize_t flow_read(int fd, void * buf, size_t count) return n; } + +/* select functions */ + +struct flow_set * flow_set_create() +{ + struct flow_set * set = malloc(sizeof(*set)); + if (set == NULL) + return NULL; + + if (pthread_rwlock_init(&set->lock, NULL)) { + free(set); + return NULL; + } + + memset(&set->b, 0, sizeof(set->b)); + + return set; +} + +void flow_set_zero(struct flow_set * set) +{ + pthread_rwlock_wrlock(&set->lock); + memset(&set->b, 0, sizeof(set->b)); + pthread_rwlock_unlock(&set->lock); +} + +void flow_set_add(struct flow_set * set, int fd) +{ + pthread_rwlock_wrlock(&set->lock); + set->b[ai->flows[fd].port_id] = true; + pthread_rwlock_unlock(&set->lock); +} + +void flow_set_del(struct flow_set * set, int fd) +{ + pthread_rwlock_wrlock(&set->lock); + set->b[ai->flows[fd].port_id] = false; + pthread_rwlock_unlock(&set->lock); +} + +bool flow_set_has(struct flow_set * set, int fd) +{ + bool ret; + pthread_rwlock_rdlock(&set->lock); + ret = set->b[ai->flows[fd].port_id]; + pthread_rwlock_unlock(&set->lock); + return ret; +} + +void flow_set_destroy(struct flow_set * set) +{ + pthread_rwlock_destroy(&set->lock); + free(set); +} + +static void flow_set_cpy(bool * dst, struct flow_set * src) +{ + pthread_rwlock_rdlock(&src->lock); + memcpy(dst, src->b, IRMD_MAX_FLOWS); + pthread_rwlock_unlock(&src->lock); +} + +int flow_select(struct flow_set * set, const struct timespec * timeout) +{ + int port_id; + bool b[IRMD_MAX_FLOWS]; + if (set == NULL) { + port_id = shm_ap_rbuff_peek_b(ai->rb, NULL, timeout); + } else { + flow_set_cpy(b, set); + port_id = shm_ap_rbuff_peek_b(ai->rb, (bool *) b, timeout); + } + if (port_id < 0) + return port_id; + return ai->ports[port_id]; +} diff --git a/src/lib/shm_ap_rbuff.c b/src/lib/shm_ap_rbuff.c index 6cc9590e..e6665362 100644 --- a/src/lib/shm_ap_rbuff.c +++ b/src/lib/shm_ap_rbuff.c @@ -348,15 +348,15 @@ int shm_ap_rbuff_peek_idx(struct shm_ap_rbuff * rb) return -1; } - ret = (rb->shm_base + *rb->ptr_tail)->index; + ret = tail_el_ptr(rb)->index; pthread_mutex_unlock(rb->lock); return ret; } -int shm_ap_rbuff_peek_b(struct shm_ap_rbuff * rb, - const struct timespec * timeout) +static int shm_ap_rbuff_peek_b_all(struct shm_ap_rbuff * rb, + const struct timespec * timeout) { struct timespec abstime; int ret = 0; @@ -397,7 +397,7 @@ int shm_ap_rbuff_peek_b(struct shm_ap_rbuff * rb, } if (ret != ETIMEDOUT) - ret = (rb->shm_base + *rb->ptr_tail)->port_id; + ret = tail_el_ptr(rb)->port_id; else ret = -ETIMEDOUT; @@ -406,6 +406,82 @@ int shm_ap_rbuff_peek_b(struct shm_ap_rbuff * rb, return ret; } +int shm_ap_rbuff_peek_b(struct shm_ap_rbuff * rb, + bool * set, + const struct timespec * timeout) +{ + struct timespec abstime; + int ret; + + if (set == NULL) + return shm_ap_rbuff_peek_b_all(rb, timeout); + +#ifdef __APPLE__ + pthread_mutex_lock(rb->lock); +#else + if (pthread_mutex_lock(rb->lock) == EOWNERDEAD) { + LOG_DBG("Recovering dead mutex."); + pthread_mutex_consistent(rb->lock); + } +#endif + if (timeout != NULL) { + clock_gettime(PTHREAD_COND_CLOCK, &abstime); + ts_add(&abstime, timeout, &abstime); + } + + pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock, + (void *) rb->lock); + + while ((shm_rbuff_empty(rb) || !set[tail_el_ptr(rb)->port_id]) + && (ret != ETIMEDOUT)) { + while (shm_rbuff_empty(rb)) { + if (timeout != NULL) + ret = pthread_cond_timedwait(rb->add, + rb->lock, + &abstime); + else + ret = pthread_cond_wait(rb->add, rb->lock); + +#ifndef __APPLE__ + if (ret == EOWNERDEAD) { + LOG_DBG("Recovering dead mutex."); + pthread_mutex_consistent(rb->lock); + } +#endif + if (ret == ETIMEDOUT) + break; + } + + while (!set[tail_el_ptr(rb)->port_id]) { + if (timeout != NULL) + ret = pthread_cond_timedwait(rb->del, + rb->lock, + &abstime); + else + ret = pthread_cond_wait(rb->del, rb->lock); + +#ifndef __APPLE__ + if (ret == EOWNERDEAD) { + LOG_DBG("Recovering dead mutex."); + pthread_mutex_consistent(rb->lock); + } +#endif + if (ret == ETIMEDOUT) + break; + } + } + + if (ret != ETIMEDOUT) + ret = tail_el_ptr(rb)->port_id; + else + ret = -ETIMEDOUT; + + pthread_cleanup_pop(true); + + return ret; +} + + struct rb_entry * shm_ap_rbuff_read(struct shm_ap_rbuff * rb) { struct rb_entry * e = NULL; @@ -471,8 +547,8 @@ ssize_t shm_ap_rbuff_read_port(struct shm_ap_rbuff * rb, int port_id) return idx; } -ssize_t shm_ap_rbuff_read_port_b(struct shm_ap_rbuff * rb, - int port_id, +ssize_t shm_ap_rbuff_read_port_b(struct shm_ap_rbuff * rb, + int port_id, const struct timespec * timeout) { struct timespec abstime; diff --git a/src/tools/oping/oping.c b/src/tools/oping/oping.c index 2871e79e..7d2edf33 100644 --- a/src/tools/oping/oping.c +++ b/src/tools/oping/oping.c @@ -23,6 +23,9 @@ #define _POSIX_C_SOURCE 199506L +#include +#include + #include #include #include @@ -59,9 +62,9 @@ struct c { } client; struct s { - struct timespec times[OPING_MAX_FLOWS]; - bool flows[OPING_MAX_FLOWS]; - pthread_mutex_t lock; + struct timespec times[OPING_MAX_FLOWS]; + struct flow_set * flows; + pthread_mutex_t lock; pthread_t cleaner_pt; pthread_t accept_pt; diff --git a/src/tools/oping/oping_client.c b/src/tools/oping/oping_client.c index 6e1fbc54..3a254984 100644 --- a/src/tools/oping/oping_client.c +++ b/src/tools/oping/oping_client.c @@ -65,7 +65,7 @@ void * reader(void * o) /* FIXME: use flow timeout option once we have it */ while(client.rcvd != client.count && - (fd = flow_select(&timeout)) != -ETIMEDOUT) { + (fd = flow_select(NULL, &timeout)) != -ETIMEDOUT) { flow_cntl(fd, FLOW_F_SETFL, FLOW_O_NONBLOCK); while (!((msg_len = flow_read(fd, buf, OPING_BUF_SIZE)) < 0)) { if (msg_len < 0) diff --git a/src/tools/oping/oping_server.c b/src/tools/oping/oping_server.c index 7761110d..9c7b1be7 100644 --- a/src/tools/oping/oping_server.c +++ b/src/tools/oping/oping_server.c @@ -21,8 +21,6 @@ * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. */ -#include - #ifdef __FreeBSD__ #define __XSI_VISIBLE 500 #endif @@ -53,9 +51,9 @@ void * cleaner_thread(void * o) clock_gettime(CLOCK_REALTIME, &now); pthread_mutex_lock(&server.lock); for (i = 0; i < OPING_MAX_FLOWS; ++i) - if (server.flows[i] && + if (flow_set_has(server.flows, i) && ts_diff_ms(&server.times[i], &now) > deadline_ms) { - server.flows[i] = false; + flow_set_del(server.flows, i); flow_dealloc(i); } @@ -70,10 +68,16 @@ void * server_thread(void *o) int msg_len = 0; struct oping_msg * msg = (struct oping_msg *) buf; struct timespec now = {0, 0}; + struct timespec timeout = {0, 100 * MILLION}; while (true) { - - int fd = flow_select(NULL); + int fd = flow_select(server.flows, &timeout); + if (fd == -ETIMEDOUT) + continue; + if (fd < 0) { + printf("Failed to get active fd.\n"); + continue; + } while (!((msg_len = flow_read(fd, buf, OPING_BUF_SIZE)) < 0)) { if (msg_len < 0) continue; @@ -126,7 +130,7 @@ void * accept_thread(void * o) clock_gettime(CLOCK_REALTIME, &now); pthread_mutex_lock(&server.lock); - server.flows[fd] = true; + flow_set_add(server.flows, fd); server.times[fd] = now; pthread_mutex_unlock(&server.lock); @@ -139,7 +143,6 @@ void * accept_thread(void * o) int server_main() { struct sigaction sig_act; - int i = 0; memset(&sig_act, 0, sizeof sig_act); sig_act.sa_sigaction = &shutdown_server; @@ -153,8 +156,11 @@ int server_main() return -1; } - for (i = 0; i < OPING_MAX_FLOWS; ++i) - server.flows[i] = false; + server.flows = flow_set_create(); + if (server.flows == NULL) + return 0; + + flow_set_zero(server.flows); pthread_create(&server.cleaner_pt, NULL, cleaner_thread, NULL); pthread_create(&server.accept_pt, NULL, accept_thread, NULL); -- cgit v1.2.3