From f516b51169020ea1957010fbd1005d746f01b1d9 Mon Sep 17 00:00:00 2001 From: dimitri staessens Date: Wed, 19 Oct 2016 22:25:46 +0200 Subject: lib: Demultiplex the fast path The fast path will now use an incoming ring buffer per flow per process. This necessitated the development of a new method for the asynchronous io call, which is now based on an event queue system for scalability (fqueue). The ipcpd's and tools have been updated to this API. --- src/ipcpd/shim-udp/main.c | 84 ++++++++++++++++++++++++++++++++++------------- 1 file changed, 61 insertions(+), 23 deletions(-) (limited to 'src/ipcpd/shim-udp') diff --git a/src/ipcpd/shim-udp/main.c b/src/ipcpd/shim-udp/main.c index 7c109a8a..050623e4 100644 --- a/src/ipcpd/shim-udp/main.c +++ b/src/ipcpd/shim-udp/main.c @@ -27,6 +27,9 @@ #include #include #include +#include +#include +#include #define OUROBOROS_PREFIX "ipcpd/shim-udp" @@ -75,6 +78,7 @@ struct { struct sockaddr_in s_saddr; int s_fd; + flow_set_t * np1_flows; fd_set flow_fd_s; /* bidir mappings of (n - 1) file descriptor to (n) flow descriptor */ int uf_to_fd[FD_SETSIZE]; @@ -90,7 +94,7 @@ struct { pthread_mutex_t fd_set_lock; } udp_data; -static void udp_data_init() +static int udp_data_init() { int i; @@ -104,13 +108,21 @@ static void udp_data_init() FD_ZERO(&udp_data.flow_fd_s); + udp_data.np1_flows = flow_set_create(); + if (udp_data.np1_flows == NULL) + return -ENOMEM; + pthread_rwlock_init(&udp_data.flows_lock, NULL); pthread_cond_init(&udp_data.fd_set_cond, NULL); pthread_mutex_init(&udp_data.fd_set_lock, NULL); + + return 0; } static void udp_data_fini() { + flow_set_destroy(udp_data.np1_flows); + pthread_rwlock_destroy(&udp_data.flows_lock); pthread_mutex_destroy(&udp_data.fd_set_lock); pthread_cond_destroy(&udp_data.fd_set_cond); @@ -387,7 +399,7 @@ static int ipcp_udp_flow_dealloc_req(int udp_port) pthread_rwlock_unlock(&udp_data.flows_lock); pthread_rwlock_unlock(&ipcpi.state_lock); - flow_dealloc(fd); + flow_cntl(fd, FLOW_F_SETFL, FLOW_O_WRONLY); close(skfd); @@ -505,30 +517,45 @@ static void * ipcp_udp_sdu_reader() static void * ipcp_udp_sdu_loop(void * o) { + int fd; + struct timespec timeout = {0, FD_UPDATE_TIMEOUT * 1000}; + struct shm_du_buff * sdb; + fqueue_t * fq = fqueue_create(); + if (fq == NULL) + return (void *) 1; while (true) { - int fd; - struct shm_du_buff * sdb; + int ret = flow_event_wait(udp_data.np1_flows, fq, &timeout); + if (ret == -ETIMEDOUT) + continue; - fd = ipcp_read_shim(&sdb); - if (fd < 0) + if (ret < 0) { + LOG_ERR("Event wait returned error code %d.", -ret); continue; + } - pthread_rwlock_rdlock(&ipcpi.state_lock); - pthread_rwlock_rdlock(&udp_data.flows_lock); + while ((fd = fqueue_next(fq)) >= 0) { + if (ipcp_flow_read(fd, &sdb)) { + LOG_ERR("Bad read from fd %d.", fd); + continue; + } - fd = udp_data.fd_to_uf[fd].skfd; + pthread_rwlock_rdlock(&ipcpi.state_lock); + pthread_rwlock_rdlock(&udp_data.flows_lock); - pthread_rwlock_unlock(&udp_data.flows_lock); - pthread_rwlock_unlock(&ipcpi.state_lock); + fd = udp_data.fd_to_uf[fd].skfd; + + pthread_rwlock_unlock(&udp_data.flows_lock); + pthread_rwlock_unlock(&ipcpi.state_lock); - if (send(fd, - shm_du_buff_head(sdb), - shm_du_buff_tail(sdb) - shm_du_buff_head(sdb), - 0) < 0) - LOG_ERR("Failed to send SDU."); + if (send(fd, + shm_du_buff_head(sdb), + shm_du_buff_tail(sdb) - shm_du_buff_head(sdb), + 0) < 0) + LOG_ERR("Failed to send SDU."); - ipcp_flow_del(sdb); + ipcp_flow_del(sdb); + } } return (void *) 1; @@ -993,6 +1020,8 @@ static int ipcp_udp_flow_alloc(int fd, udp_data.fd_to_uf[fd].skfd = skfd; udp_data.uf_to_fd[skfd] = fd; + flow_set_add(udp_data.np1_flows, fd); + pthread_rwlock_unlock(&udp_data.flows_lock); pthread_rwlock_unlock(&ipcpi.state_lock); @@ -1049,6 +1078,8 @@ static int ipcp_udp_flow_alloc_resp(int fd, int response) set_fd(skfd); + flow_set_add(udp_data.np1_flows, fd); + pthread_rwlock_unlock(&udp_data.flows_lock); pthread_rwlock_unlock(&ipcpi.state_lock); @@ -1075,9 +1106,15 @@ static int ipcp_udp_flow_dealloc(int fd) { int skfd = -1; int remote_udp = -1; + struct timespec t = {0, 10000}; struct sockaddr_in r_saddr; socklen_t r_saddr_len = sizeof(r_saddr); + flow_set_del(udp_data.np1_flows, fd); + + while (flow_dealloc(fd) == -EBUSY) + nanosleep(&t, NULL); + pthread_rwlock_rdlock(&ipcpi.state_lock); pthread_rwlock_wrlock(&udp_data.flows_lock); @@ -1117,8 +1154,6 @@ static int ipcp_udp_flow_dealloc(int fd) close(skfd); - flow_dealloc(fd); - LOG_DBG("Flow with fd %d deallocated.", fd); return 0; @@ -1149,13 +1184,16 @@ int main(int argc, char * argv[]) exit(EXIT_FAILURE); } - udp_data_init(); - if (ap_init(NULL) < 0) { close_logfile(); exit(EXIT_FAILURE); } + if (udp_data_init() < 0) { + close_logfile(); + exit(EXIT_FAILURE); + } + /* store the process id of the irmd */ irmd_api = atoi(argv[1]); @@ -1196,10 +1234,10 @@ int main(int argc, char * argv[]) pthread_join(udp_data.handler, NULL); pthread_join(udp_data.sdu_reader, NULL); - ap_fini(); - udp_data_fini(); + ap_fini(); + close_logfile(); exit(EXIT_SUCCESS); -- cgit v1.2.3