From f516b51169020ea1957010fbd1005d746f01b1d9 Mon Sep 17 00:00:00 2001 From: dimitri staessens Date: Wed, 19 Oct 2016 22:25:46 +0200 Subject: lib: Demultiplex the fast path The fast path will now use an incoming ring buffer per flow per process. This necessitated the development of a new method for the asynchronous io call, which is now based on an event queue system for scalability (fqueue). The ipcpd's and tools have been updated to this API. --- src/ipcpd/shim-eth-llc/main.c | 106 ++++++++++++++++++++++++++++-------------- 1 file changed, 71 insertions(+), 35 deletions(-) (limited to 'src/ipcpd/shim-eth-llc') diff --git a/src/ipcpd/shim-eth-llc/main.c b/src/ipcpd/shim-eth-llc/main.c index 399d3dc8..db258c8b 100644 --- a/src/ipcpd/shim-eth-llc/main.c +++ b/src/ipcpd/shim-eth-llc/main.c @@ -30,6 +30,8 @@ #include #include #include +#include +#include #define OUROBOROS_PREFIX "ipcpd/shim-eth-llc" @@ -77,6 +79,8 @@ typedef ShimEthLlcMsg shim_eth_llc_msg_t; #define ETH_FRAME_SIZE (ETH_HEADER_SIZE + LLC_HEADER_SIZE \ + SHIM_ETH_LLC_MAX_SDU_SIZE) +#define EVENT_WAIT_TIMEOUT 100 /* us */ + /* global for trapping signal */ int irmd_api; @@ -110,6 +114,7 @@ struct { uint8_t * tx_ring; int tx_offset; #endif + flow_set_t * np1_flows; int * ef_to_fd; struct ef * fd_to_ef; pthread_rwlock_t flows_lock; @@ -139,6 +144,14 @@ static int eth_llc_data_init() return -ENOMEM; } + eth_llc_data.np1_flows = flow_set_create(); + if (eth_llc_data.np1_flows == NULL) { + bmp_destroy(eth_llc_data.saps); + free(eth_llc_data.ef_to_fd); + free(eth_llc_data.fd_to_ef); + return -ENOMEM; + } + for (i = 0; i < MAX_SAPS; ++i) eth_llc_data.ef_to_fd[i] = -1; @@ -156,6 +169,7 @@ static int eth_llc_data_init() void eth_llc_data_fini() { bmp_destroy(eth_llc_data.saps); + flow_set_destroy(eth_llc_data.np1_flows); free(eth_llc_data.fd_to_ef); free(eth_llc_data.ef_to_fd); pthread_rwlock_destroy(ð_llc_data.flows_lock); @@ -416,23 +430,17 @@ static int eth_llc_ipcp_flow_dealloc_req(uint8_t ssap, uint8_t * r_addr) return 0; } - bmp_release(eth_llc_data.saps, eth_llc_data.fd_to_ef[fd].sap); - pthread_rwlock_unlock(ð_llc_data.flows_lock); pthread_rwlock_unlock(&ipcpi.state_lock); - flow_dealloc(fd); - - LOG_DBG("Flow with fd %d deallocated.", fd); + flow_cntl(fd, FLOW_F_SETFL, FLOW_O_WRONLY); return 0; } static int eth_llc_ipcp_mgmt_frame(uint8_t * buf, size_t len, uint8_t * r_addr) { - shim_eth_llc_msg_t * msg = NULL; - - msg = shim_eth_llc_msg__unpack(NULL, len, buf); + shim_eth_llc_msg_t * msg = shim_eth_llc_msg__unpack(NULL, len, buf); if (msg == NULL) { LOG_ERR("Failed to unpack."); return -1; @@ -590,32 +598,49 @@ static void * eth_llc_ipcp_sdu_reader(void * o) static void * eth_llc_ipcp_sdu_writer(void * o) { + int fd; + struct shm_du_buff * sdb; + uint8_t ssap; + uint8_t dsap; + uint8_t r_addr[MAC_SIZE]; + struct timespec timeout = {0, EVENT_WAIT_TIMEOUT * 1000}; + fqueue_t * fq = fqueue_create(); + if (fq == NULL) + return (void *) 1; + while (true) { - int fd; - struct shm_du_buff * sdb; - uint8_t ssap; - uint8_t dsap; - uint8_t r_addr[MAC_SIZE]; - - fd = ipcp_read_shim(&sdb); - if (fd < 0) + int ret = flow_event_wait(eth_llc_data.np1_flows, fq, &timeout); + if (ret == -ETIMEDOUT) continue; - pthread_rwlock_rdlock(&ipcpi.state_lock); - pthread_rwlock_rdlock(ð_llc_data.flows_lock); + if (ret < 0) { + LOG_ERR("Event wait returned error code %d.", -ret); + continue; + } - ssap = reverse_bits(eth_llc_data.fd_to_ef[fd].sap); - dsap = reverse_bits(eth_llc_data.fd_to_ef[fd].r_sap); - memcpy(r_addr, eth_llc_data.fd_to_ef[fd].r_addr, MAC_SIZE); + while ((fd = fqueue_next(fq)) >= 0) { + if (ipcp_flow_read(fd, &sdb)) { + LOG_ERR("Bad read from fd %d.", fd); + continue; + } + pthread_rwlock_rdlock(&ipcpi.state_lock); + pthread_rwlock_rdlock(ð_llc_data.flows_lock); - pthread_rwlock_unlock(ð_llc_data.flows_lock); - pthread_rwlock_unlock(&ipcpi.state_lock); + ssap = reverse_bits(eth_llc_data.fd_to_ef[fd].sap); + dsap = reverse_bits(eth_llc_data.fd_to_ef[fd].r_sap); + memcpy(r_addr, + eth_llc_data.fd_to_ef[fd].r_addr, + MAC_SIZE); + + pthread_rwlock_unlock(ð_llc_data.flows_lock); + pthread_rwlock_unlock(&ipcpi.state_lock); - eth_llc_ipcp_send_frame(r_addr, dsap, ssap, - shm_du_buff_head(sdb), - shm_du_buff_tail(sdb) - - shm_du_buff_head(sdb)); - ipcp_flow_del(sdb); + eth_llc_ipcp_send_frame(r_addr, dsap, ssap, + shm_du_buff_head(sdb), + shm_du_buff_tail(sdb) + - shm_du_buff_head(sdb)); + ipcp_flow_del(sdb); + } } return (void *) 1; @@ -859,7 +884,7 @@ static int eth_llc_ipcp_flow_alloc(int fd, uint8_t ssap = 0; uint8_t r_addr[MAC_SIZE]; - LOG_INFO("Allocating flow to %s.", dst_name); + LOG_DBG("Allocating flow to %s.", dst_name); if (dst_name == NULL || src_ae_name == NULL) return -1; @@ -903,6 +928,8 @@ static int eth_llc_ipcp_flow_alloc(int fd, return -1; } + flow_set_add(eth_llc_data.np1_flows, fd); + LOG_DBG("Pending flow with fd %d on SAP %d.", fd, ssap); return 0; @@ -941,6 +968,8 @@ static int eth_llc_ipcp_flow_alloc_resp(int fd, int response) return -1; } + flow_set_add(eth_llc_data.np1_flows, fd); + LOG_DBG("Accepted flow, fd %d, SAP %d.", fd, ssap); return 0; @@ -948,11 +977,18 @@ static int eth_llc_ipcp_flow_alloc_resp(int fd, int response) static int eth_llc_ipcp_flow_dealloc(int fd) { + struct timespec t = {0, 10000}; + uint8_t sap; uint8_t r_sap; uint8_t addr[MAC_SIZE]; int ret; + flow_set_del(eth_llc_data.np1_flows, fd); + + while (flow_dealloc(fd) == -EBUSY) + nanosleep(&t, NULL); + pthread_rwlock_rdlock(&ipcpi.state_lock); pthread_rwlock_wrlock(ð_llc_data.flows_lock); @@ -975,8 +1011,6 @@ static int eth_llc_ipcp_flow_dealloc(int fd) if (ret < 0) LOG_DBG("Could not notify remote."); - flow_dealloc(fd); - LOG_DBG("Flow with fd %d deallocated.", fd); return 0; @@ -1008,10 +1042,12 @@ int main(int argc, char * argv[]) exit(EXIT_FAILURE); } - if (eth_llc_data_init() < 0) + if (ap_init(NULL) < 0) { + close_logfile(); exit(EXIT_FAILURE); + } - if (ap_init(NULL) < 0) { + if (eth_llc_data_init() < 0) { close_logfile(); exit(EXIT_FAILURE); } @@ -1054,10 +1090,10 @@ int main(int argc, char * argv[]) pthread_join(eth_llc_data.sdu_writer, NULL); pthread_join(eth_llc_data.sdu_reader, NULL); - ap_fini(); - eth_llc_data_fini(); + ap_fini(); + close_logfile(); exit(EXIT_SUCCESS); -- cgit v1.2.3