From f516b51169020ea1957010fbd1005d746f01b1d9 Mon Sep 17 00:00:00 2001 From: dimitri staessens Date: Wed, 19 Oct 2016 22:25:46 +0200 Subject: lib: Demultiplex the fast path The fast path will now use an incoming ring buffer per flow per process. This necessitated the development of a new method for the asynchronous io call, which is now based on an event queue system for scalability (fqueue). The ipcpd's and tools have been updated to this API. --- src/ipcpd/normal/fmgr.c | 143 ++++++++++++++++++++++++++---------------------- 1 file changed, 78 insertions(+), 65 deletions(-) (limited to 'src/ipcpd/normal/fmgr.c') diff --git a/src/ipcpd/normal/fmgr.c b/src/ipcpd/normal/fmgr.c index 8c627641..2800dcb2 100644 --- a/src/ipcpd/normal/fmgr.c +++ b/src/ipcpd/normal/fmgr.c @@ -27,7 +27,7 @@ #include #include #include -#include +#include #include #include @@ -185,39 +185,47 @@ static void * fmgr_np1_sdu_reader(void * o) struct shm_du_buff * sdb; struct timespec timeout = {0, FD_UPDATE_TIMEOUT}; struct np1_flow * flow; + int fd; + fqueue_t * fq = fqueue_create(); + if (fq == NULL) + return (void *) 1; while (true) { - int fd = flow_select(fmgr.np1_set, &timeout); - if (fd == -ETIMEDOUT) + int ret = flow_event_wait(fmgr.np1_set, fq, &timeout); + if (ret == -ETIMEDOUT) continue; - if (fd < 0) { - LOG_ERR("Failed to get active fd."); + if (ret < 0) { + LOG_ERR("Event error: %d.", ret); continue; } - if (ipcp_flow_read(fd, &sdb)) { - LOG_ERR("Failed to read SDU from fd %d.", fd); - continue; - } + while ((fd = fqueue_next(fq)) >= 0) { + if (ipcp_flow_read(fd, &sdb)) { + LOG_ERR("Failed to read SDU from fd %d.", fd); + continue; + } - pthread_rwlock_rdlock(&fmgr.np1_flows_lock); - flow = fmgr.np1_flows[fd]; - if (flow == NULL) { - pthread_rwlock_unlock(&fmgr.np1_flows_lock); - ipcp_flow_del(sdb); - LOG_ERR("Failed to retrieve flow."); - continue; - } + pthread_rwlock_rdlock(&fmgr.np1_flows_lock); + + flow = fmgr.np1_flows[fd]; + if (flow == NULL) { + pthread_rwlock_unlock(&fmgr.np1_flows_lock); + ipcp_flow_del(sdb); + LOG_ERR("Failed to retrieve flow."); + continue; + } + + if (frct_i_write_sdu(flow->cep_id, sdb)) { + pthread_rwlock_unlock(&fmgr.np1_flows_lock); + ipcp_flow_del(sdb); + LOG_ERR("Failed to hand SDU to FRCT."); + continue; + } - if (frct_i_write_sdu(flow->cep_id, sdb)) { pthread_rwlock_unlock(&fmgr.np1_flows_lock); - ipcp_flow_del(sdb); - LOG_ERR("Failed to hand SDU to FRCT."); - continue; - } - pthread_rwlock_unlock(&fmgr.np1_flows_lock); + } } return (void *) 0; @@ -228,66 +236,71 @@ void * fmgr_nm1_sdu_reader(void * o) struct timespec timeout = {0, FD_UPDATE_TIMEOUT}; struct shm_du_buff * sdb; struct pci * pci; - + int fd; + fqueue_t * fq = fqueue_create(); + if (fq == NULL) + return (void *) 1; while (true) { - int fd = flow_select(fmgr.nm1_set, &timeout); - if (fd == -ETIMEDOUT) - continue; - - if (fd < 0) { - LOG_ERR("Failed to get active fd."); + int ret = flow_event_wait(fmgr.nm1_set, fq, &timeout); + if (ret == -ETIMEDOUT) continue; - } - if (ipcp_flow_read(fd, &sdb)) { - LOG_ERR("Failed to read SDU from fd %d.", fd); + if (ret < 0) { + LOG_ERR("Event error: %d.", ret); continue; } - pci = shm_pci_des(sdb); - if (pci == NULL) { - LOG_ERR("Failed to get PCI."); - ipcp_flow_del(sdb); - continue; - } + while ((fd = fqueue_next(fq)) >= 0) { + if (ipcp_flow_read(fd, &sdb)) { + LOG_ERR("Failed to read SDU from fd %d.", fd); + continue; + } - if (pci->dst_addr != ribmgr_address()) { - LOG_DBG("PDU needs to be forwarded."); + pci = shm_pci_des(sdb); + if (pci == NULL) { + LOG_ERR("Failed to get PCI."); + ipcp_flow_del(sdb); + continue; + } - if (pci->ttl == 0) { - LOG_DBG("TTL was zero."); + if (pci->dst_addr != ribmgr_address()) { + LOG_DBG("PDU needs to be forwarded."); + + if (pci->ttl == 0) { + LOG_DBG("TTL was zero."); + ipcp_flow_del(sdb); + free(pci); + continue; + } + + if (shm_pci_dec_ttl(sdb)) { + LOG_ERR("Failed to decrease TTL."); + ipcp_flow_del(sdb); + free(pci); + continue; + } + /* + * FIXME: Dropping for now, since + * we don't have a PFF yet + */ ipcp_flow_del(sdb); free(pci); continue; } - if (shm_pci_dec_ttl(sdb)) { - LOG_ERR("Failed to decrease TTL."); + if (shm_pci_shrink(sdb)) { + LOG_ERR("Failed to shrink PDU."); ipcp_flow_del(sdb); free(pci); continue; } - /* - * FIXME: Dropping for now, since - * we don't have a PFF yet - */ - ipcp_flow_del(sdb); - free(pci); - continue; - } - - if (shm_pci_shrink(sdb)) { - LOG_ERR("Failed to shrink PDU."); - ipcp_flow_del(sdb); - free(pci); - continue; - } - if (frct_nm1_post_sdu(pci, sdb)) { - LOG_ERR("Failed to hand PDU to FRCT."); - ipcp_flow_del(sdb); - free(pci); - continue; + if (frct_nm1_post_sdu(pci, sdb)) { + LOG_ERR("Failed to hand PDU to FRCT."); + ipcp_flow_del(sdb); + free(pci); + continue; + } } } -- cgit v1.2.3