From f516b51169020ea1957010fbd1005d746f01b1d9 Mon Sep 17 00:00:00 2001 From: dimitri staessens Date: Wed, 19 Oct 2016 22:25:46 +0200 Subject: lib: Demultiplex the fast path The fast path will now use an incoming ring buffer per flow per process. This necessitated the development of a new method for the asynchronous io call, which is now based on an event queue system for scalability (fqueue). The ipcpd's and tools have been updated to this API. --- src/ipcpd/local/main.c | 68 +++++++++++++++++++++++++++++++++++++------------- 1 file changed, 51 insertions(+), 17 deletions(-) (limited to 'src/ipcpd/local/main.c') diff --git a/src/ipcpd/local/main.c b/src/ipcpd/local/main.c index 4e500a8a..68c9ae8c 100644 --- a/src/ipcpd/local/main.c +++ b/src/ipcpd/local/main.c @@ -25,7 +25,7 @@ #include #include #include -#include +#include #include #include #define OUROBOROS_PREFIX "ipcpd/local" @@ -39,6 +39,7 @@ #include #include +#define EVENT_WAIT_TIMEOUT 100 /* us */ #define THIS_TYPE IPCP_LOCAL /* global for trapping signal */ @@ -46,18 +47,25 @@ int irmd_api; struct { int in_out[IRMD_MAX_FLOWS]; + flow_set_t * flows; pthread_rwlock_t lock; pthread_t sduloop; } local_data; -void local_data_init() +int local_data_init() { int i; for (i = 0; i < IRMD_MAX_FLOWS; ++i) local_data.in_out[i] = -1; + local_data.flows = flow_set_create(); + if (local_data.flows == NULL) + return -ENFILE; + pthread_rwlock_init(&local_data.lock, NULL); + + return 0; } void local_data_fini() @@ -67,11 +75,24 @@ void local_data_fini() static void * ipcp_local_sdu_loop(void * o) { + struct timespec timeout = {0, EVENT_WAIT_TIMEOUT * 1000}; + fqueue_t * fq = fqueue_create(); + if (fq == NULL) + return (void *) 1; + while (true) { int fd; - struct rb_entry * e; + int ret; + ssize_t idx; + + ret = flow_event_wait(local_data.flows, fq, &timeout); + if (ret == -ETIMEDOUT) + continue; - fd = flow_select(NULL, NULL); + if (ret < 0) { + LOG_ERR("Event wait returned error code %d.", -ret); + continue; + } pthread_rwlock_rdlock(&ipcpi.state_lock); @@ -82,20 +103,20 @@ static void * ipcp_local_sdu_loop(void * o) pthread_rwlock_rdlock(&local_data.lock); - e = local_flow_read(fd); + while ((fd = fqueue_next(fq)) >= 0) { + idx = local_flow_read(fd); - fd = local_data.in_out[fd]; + fd = local_data.in_out[fd]; - if (fd != -1) - local_flow_write(fd, e); + if (fd != -1) + local_flow_write(fd, idx); + } pthread_rwlock_unlock(&local_data.lock); pthread_rwlock_unlock(&ipcpi.state_lock); - - free(e); } - return (void *) 1; + return (void *) 0; } void ipcp_sig_handler(int sig, siginfo_t * info, void * c) @@ -152,7 +173,7 @@ static int ipcp_local_name_reg(char * name) if (ipcp_data_add_reg_entry(ipcpi.data, name)) { pthread_rwlock_unlock(&ipcpi.state_lock); - LOG_DBGF("Failed to add %s to local registry.", name); + LOG_DBG("Failed to add %s to local registry.", name); return -1; } @@ -194,12 +215,14 @@ static int ipcp_local_flow_alloc(int fd, if (ipcp_get_state() != IPCP_ENROLLED) { pthread_rwlock_unlock(&ipcpi.state_lock); - LOG_DBGF("Won't register with non-enrolled IPCP."); + LOG_DBG("Won't register with non-enrolled IPCP."); return -1; /* -ENOTENROLLED */ } pthread_rwlock_wrlock(&local_data.lock); + flow_set_add(local_data.flows, fd); + out_fd = ipcp_flow_req_arr(getpid(), dst_name, src_ae_name); local_data.in_out[fd] = out_fd; @@ -222,6 +245,7 @@ static int ipcp_local_flow_alloc_resp(int fd, int response) return 0; pthread_rwlock_rdlock(&ipcpi.state_lock); + pthread_rwlock_rdlock(&local_data.lock); out_fd = local_data.in_out[fd]; if (out_fd < 0) { @@ -230,6 +254,9 @@ static int ipcp_local_flow_alloc_resp(int fd, int response) return -1; } + flow_set_add(local_data.flows, fd); + + pthread_rwlock_unlock(&local_data.lock); pthread_rwlock_unlock(&ipcpi.state_lock); if ((ret = ipcp_flow_alloc_reply(out_fd, response)) < 0) @@ -247,6 +274,8 @@ static int ipcp_local_flow_dealloc(int fd) if (fd < 0) return -EINVAL; + flow_set_del(local_data.flows, fd); + while (flow_dealloc(fd) == -EBUSY) nanosleep(&t, NULL); @@ -289,9 +318,14 @@ int main(int argc, char * argv[]) exit(EXIT_FAILURE); } - local_data_init(); - if (ap_init(NULL) < 0) { + LOG_ERR("Failed to init application."); + close_logfile(); + exit(EXIT_FAILURE); + } + + if (local_data_init() < 0) { + LOG_ERR("Failed to init local data."); close_logfile(); exit(EXIT_FAILURE); } @@ -331,10 +365,10 @@ int main(int argc, char * argv[]) pthread_cancel(local_data.sduloop); pthread_join(local_data.sduloop, NULL); - ap_fini(); - local_data_fini(); + ap_fini(); + close_logfile(); exit(EXIT_SUCCESS); -- cgit v1.2.3