From f516b51169020ea1957010fbd1005d746f01b1d9 Mon Sep 17 00:00:00 2001 From: dimitri staessens Date: Wed, 19 Oct 2016 22:25:46 +0200 Subject: lib: Demultiplex the fast path The fast path will now use an incoming ring buffer per flow per process. This necessitated the development of a new method for the asynchronous io call, which is now based on an event queue system for scalability (fqueue). The ipcpd's and tools have been updated to this API. --- src/ipcpd/ipcp.c | 6 +- src/ipcpd/local/main.c | 68 +++++++++++++++----- src/ipcpd/normal/fmgr.c | 143 +++++++++++++++++++++++------------------- src/ipcpd/shim-eth-llc/main.c | 106 ++++++++++++++++++++----------- src/ipcpd/shim-udp/main.c | 84 ++++++++++++++++++------- 5 files changed, 264 insertions(+), 143 deletions(-) (limited to 'src/ipcpd') diff --git a/src/ipcpd/ipcp.c b/src/ipcpd/ipcp.c index a9f80ee7..f9246c7a 100644 --- a/src/ipcpd/ipcp.c +++ b/src/ipcpd/ipcp.c @@ -323,9 +323,9 @@ void * ipcp_main_loop(void * o) ret_msg.has_result = true; ret_msg.result = ipcpi.ops->ipcp_flow_alloc(fd, - msg->dst_name, - msg->src_ae_name, - msg->qos_cube); + msg->dst_name, + msg->src_ae_name, + msg->qos_cube); if (ret_msg.result < 0) { LOG_DBG("Deallocate failed on port_id %d.", msg->port_id); diff --git a/src/ipcpd/local/main.c b/src/ipcpd/local/main.c index 4e500a8a..68c9ae8c 100644 --- a/src/ipcpd/local/main.c +++ b/src/ipcpd/local/main.c @@ -25,7 +25,7 @@ #include #include #include -#include +#include #include #include #define OUROBOROS_PREFIX "ipcpd/local" @@ -39,6 +39,7 @@ #include #include +#define EVENT_WAIT_TIMEOUT 100 /* us */ #define THIS_TYPE IPCP_LOCAL /* global for trapping signal */ @@ -46,18 +47,25 @@ int irmd_api; struct { int in_out[IRMD_MAX_FLOWS]; + flow_set_t * flows; pthread_rwlock_t lock; pthread_t sduloop; } local_data; -void local_data_init() +int local_data_init() { int i; for (i = 0; i < IRMD_MAX_FLOWS; ++i) local_data.in_out[i] = -1; + local_data.flows = flow_set_create(); + if (local_data.flows == NULL) + return -ENFILE; + pthread_rwlock_init(&local_data.lock, NULL); + + return 0; } void local_data_fini() @@ -67,11 +75,24 @@ void local_data_fini() static void * ipcp_local_sdu_loop(void * o) { + struct timespec timeout = {0, EVENT_WAIT_TIMEOUT * 1000}; + fqueue_t * fq = fqueue_create(); + if (fq == NULL) + return (void *) 1; + while (true) { int fd; - struct rb_entry * e; + int ret; + ssize_t idx; + + ret = flow_event_wait(local_data.flows, fq, &timeout); + if (ret == -ETIMEDOUT) + continue; - fd = flow_select(NULL, NULL); + if (ret < 0) { + LOG_ERR("Event wait returned error code %d.", -ret); + continue; + } pthread_rwlock_rdlock(&ipcpi.state_lock); @@ -82,20 +103,20 @@ static void * ipcp_local_sdu_loop(void * o) pthread_rwlock_rdlock(&local_data.lock); - e = local_flow_read(fd); + while ((fd = fqueue_next(fq)) >= 0) { + idx = local_flow_read(fd); - fd = local_data.in_out[fd]; + fd = local_data.in_out[fd]; - if (fd != -1) - local_flow_write(fd, e); + if (fd != -1) + local_flow_write(fd, idx); + } pthread_rwlock_unlock(&local_data.lock); pthread_rwlock_unlock(&ipcpi.state_lock); - - free(e); } - return (void *) 1; + return (void *) 0; } void ipcp_sig_handler(int sig, siginfo_t * info, void * c) @@ -152,7 +173,7 @@ static int ipcp_local_name_reg(char * name) if (ipcp_data_add_reg_entry(ipcpi.data, name)) { pthread_rwlock_unlock(&ipcpi.state_lock); - LOG_DBGF("Failed to add %s to local registry.", name); + LOG_DBG("Failed to add %s to local registry.", name); return -1; } @@ -194,12 +215,14 @@ static int ipcp_local_flow_alloc(int fd, if (ipcp_get_state() != IPCP_ENROLLED) { pthread_rwlock_unlock(&ipcpi.state_lock); - LOG_DBGF("Won't register with non-enrolled IPCP."); + LOG_DBG("Won't register with non-enrolled IPCP."); return -1; /* -ENOTENROLLED */ } pthread_rwlock_wrlock(&local_data.lock); + flow_set_add(local_data.flows, fd); + out_fd = ipcp_flow_req_arr(getpid(), dst_name, src_ae_name); local_data.in_out[fd] = out_fd; @@ -222,6 +245,7 @@ static int ipcp_local_flow_alloc_resp(int fd, int response) return 0; pthread_rwlock_rdlock(&ipcpi.state_lock); + pthread_rwlock_rdlock(&local_data.lock); out_fd = local_data.in_out[fd]; if (out_fd < 0) { @@ -230,6 +254,9 @@ static int ipcp_local_flow_alloc_resp(int fd, int response) return -1; } + flow_set_add(local_data.flows, fd); + + pthread_rwlock_unlock(&local_data.lock); pthread_rwlock_unlock(&ipcpi.state_lock); if ((ret = ipcp_flow_alloc_reply(out_fd, response)) < 0) @@ -247,6 +274,8 @@ static int ipcp_local_flow_dealloc(int fd) if (fd < 0) return -EINVAL; + flow_set_del(local_data.flows, fd); + while (flow_dealloc(fd) == -EBUSY) nanosleep(&t, NULL); @@ -289,9 +318,14 @@ int main(int argc, char * argv[]) exit(EXIT_FAILURE); } - local_data_init(); - if (ap_init(NULL) < 0) { + LOG_ERR("Failed to init application."); + close_logfile(); + exit(EXIT_FAILURE); + } + + if (local_data_init() < 0) { + LOG_ERR("Failed to init local data."); close_logfile(); exit(EXIT_FAILURE); } @@ -331,10 +365,10 @@ int main(int argc, char * argv[]) pthread_cancel(local_data.sduloop); pthread_join(local_data.sduloop, NULL); - ap_fini(); - local_data_fini(); + ap_fini(); + close_logfile(); exit(EXIT_SUCCESS); diff --git a/src/ipcpd/normal/fmgr.c b/src/ipcpd/normal/fmgr.c index 8c627641..2800dcb2 100644 --- a/src/ipcpd/normal/fmgr.c +++ b/src/ipcpd/normal/fmgr.c @@ -27,7 +27,7 @@ #include #include #include -#include +#include #include #include @@ -185,39 +185,47 @@ static void * fmgr_np1_sdu_reader(void * o) struct shm_du_buff * sdb; struct timespec timeout = {0, FD_UPDATE_TIMEOUT}; struct np1_flow * flow; + int fd; + fqueue_t * fq = fqueue_create(); + if (fq == NULL) + return (void *) 1; while (true) { - int fd = flow_select(fmgr.np1_set, &timeout); - if (fd == -ETIMEDOUT) + int ret = flow_event_wait(fmgr.np1_set, fq, &timeout); + if (ret == -ETIMEDOUT) continue; - if (fd < 0) { - LOG_ERR("Failed to get active fd."); + if (ret < 0) { + LOG_ERR("Event error: %d.", ret); continue; } - if (ipcp_flow_read(fd, &sdb)) { - LOG_ERR("Failed to read SDU from fd %d.", fd); - continue; - } + while ((fd = fqueue_next(fq)) >= 0) { + if (ipcp_flow_read(fd, &sdb)) { + LOG_ERR("Failed to read SDU from fd %d.", fd); + continue; + } - pthread_rwlock_rdlock(&fmgr.np1_flows_lock); - flow = fmgr.np1_flows[fd]; - if (flow == NULL) { - pthread_rwlock_unlock(&fmgr.np1_flows_lock); - ipcp_flow_del(sdb); - LOG_ERR("Failed to retrieve flow."); - continue; - } + pthread_rwlock_rdlock(&fmgr.np1_flows_lock); + + flow = fmgr.np1_flows[fd]; + if (flow == NULL) { + pthread_rwlock_unlock(&fmgr.np1_flows_lock); + ipcp_flow_del(sdb); + LOG_ERR("Failed to retrieve flow."); + continue; + } + + if (frct_i_write_sdu(flow->cep_id, sdb)) { + pthread_rwlock_unlock(&fmgr.np1_flows_lock); + ipcp_flow_del(sdb); + LOG_ERR("Failed to hand SDU to FRCT."); + continue; + } - if (frct_i_write_sdu(flow->cep_id, sdb)) { pthread_rwlock_unlock(&fmgr.np1_flows_lock); - ipcp_flow_del(sdb); - LOG_ERR("Failed to hand SDU to FRCT."); - continue; - } - pthread_rwlock_unlock(&fmgr.np1_flows_lock); + } } return (void *) 0; @@ -228,66 +236,71 @@ void * fmgr_nm1_sdu_reader(void * o) struct timespec timeout = {0, FD_UPDATE_TIMEOUT}; struct shm_du_buff * sdb; struct pci * pci; - + int fd; + fqueue_t * fq = fqueue_create(); + if (fq == NULL) + return (void *) 1; while (true) { - int fd = flow_select(fmgr.nm1_set, &timeout); - if (fd == -ETIMEDOUT) - continue; - - if (fd < 0) { - LOG_ERR("Failed to get active fd."); + int ret = flow_event_wait(fmgr.nm1_set, fq, &timeout); + if (ret == -ETIMEDOUT) continue; - } - if (ipcp_flow_read(fd, &sdb)) { - LOG_ERR("Failed to read SDU from fd %d.", fd); + if (ret < 0) { + LOG_ERR("Event error: %d.", ret); continue; } - pci = shm_pci_des(sdb); - if (pci == NULL) { - LOG_ERR("Failed to get PCI."); - ipcp_flow_del(sdb); - continue; - } + while ((fd = fqueue_next(fq)) >= 0) { + if (ipcp_flow_read(fd, &sdb)) { + LOG_ERR("Failed to read SDU from fd %d.", fd); + continue; + } - if (pci->dst_addr != ribmgr_address()) { - LOG_DBG("PDU needs to be forwarded."); + pci = shm_pci_des(sdb); + if (pci == NULL) { + LOG_ERR("Failed to get PCI."); + ipcp_flow_del(sdb); + continue; + } - if (pci->ttl == 0) { - LOG_DBG("TTL was zero."); + if (pci->dst_addr != ribmgr_address()) { + LOG_DBG("PDU needs to be forwarded."); + + if (pci->ttl == 0) { + LOG_DBG("TTL was zero."); + ipcp_flow_del(sdb); + free(pci); + continue; + } + + if (shm_pci_dec_ttl(sdb)) { + LOG_ERR("Failed to decrease TTL."); + ipcp_flow_del(sdb); + free(pci); + continue; + } + /* + * FIXME: Dropping for now, since + * we don't have a PFF yet + */ ipcp_flow_del(sdb); free(pci); continue; } - if (shm_pci_dec_ttl(sdb)) { - LOG_ERR("Failed to decrease TTL."); + if (shm_pci_shrink(sdb)) { + LOG_ERR("Failed to shrink PDU."); ipcp_flow_del(sdb); free(pci); continue; } - /* - * FIXME: Dropping for now, since - * we don't have a PFF yet - */ - ipcp_flow_del(sdb); - free(pci); - continue; - } - - if (shm_pci_shrink(sdb)) { - LOG_ERR("Failed to shrink PDU."); - ipcp_flow_del(sdb); - free(pci); - continue; - } - if (frct_nm1_post_sdu(pci, sdb)) { - LOG_ERR("Failed to hand PDU to FRCT."); - ipcp_flow_del(sdb); - free(pci); - continue; + if (frct_nm1_post_sdu(pci, sdb)) { + LOG_ERR("Failed to hand PDU to FRCT."); + ipcp_flow_del(sdb); + free(pci); + continue; + } } } diff --git a/src/ipcpd/shim-eth-llc/main.c b/src/ipcpd/shim-eth-llc/main.c index 399d3dc8..db258c8b 100644 --- a/src/ipcpd/shim-eth-llc/main.c +++ b/src/ipcpd/shim-eth-llc/main.c @@ -30,6 +30,8 @@ #include #include #include +#include +#include #define OUROBOROS_PREFIX "ipcpd/shim-eth-llc" @@ -77,6 +79,8 @@ typedef ShimEthLlcMsg shim_eth_llc_msg_t; #define ETH_FRAME_SIZE (ETH_HEADER_SIZE + LLC_HEADER_SIZE \ + SHIM_ETH_LLC_MAX_SDU_SIZE) +#define EVENT_WAIT_TIMEOUT 100 /* us */ + /* global for trapping signal */ int irmd_api; @@ -110,6 +114,7 @@ struct { uint8_t * tx_ring; int tx_offset; #endif + flow_set_t * np1_flows; int * ef_to_fd; struct ef * fd_to_ef; pthread_rwlock_t flows_lock; @@ -139,6 +144,14 @@ static int eth_llc_data_init() return -ENOMEM; } + eth_llc_data.np1_flows = flow_set_create(); + if (eth_llc_data.np1_flows == NULL) { + bmp_destroy(eth_llc_data.saps); + free(eth_llc_data.ef_to_fd); + free(eth_llc_data.fd_to_ef); + return -ENOMEM; + } + for (i = 0; i < MAX_SAPS; ++i) eth_llc_data.ef_to_fd[i] = -1; @@ -156,6 +169,7 @@ static int eth_llc_data_init() void eth_llc_data_fini() { bmp_destroy(eth_llc_data.saps); + flow_set_destroy(eth_llc_data.np1_flows); free(eth_llc_data.fd_to_ef); free(eth_llc_data.ef_to_fd); pthread_rwlock_destroy(ð_llc_data.flows_lock); @@ -416,23 +430,17 @@ static int eth_llc_ipcp_flow_dealloc_req(uint8_t ssap, uint8_t * r_addr) return 0; } - bmp_release(eth_llc_data.saps, eth_llc_data.fd_to_ef[fd].sap); - pthread_rwlock_unlock(ð_llc_data.flows_lock); pthread_rwlock_unlock(&ipcpi.state_lock); - flow_dealloc(fd); - - LOG_DBG("Flow with fd %d deallocated.", fd); + flow_cntl(fd, FLOW_F_SETFL, FLOW_O_WRONLY); return 0; } static int eth_llc_ipcp_mgmt_frame(uint8_t * buf, size_t len, uint8_t * r_addr) { - shim_eth_llc_msg_t * msg = NULL; - - msg = shim_eth_llc_msg__unpack(NULL, len, buf); + shim_eth_llc_msg_t * msg = shim_eth_llc_msg__unpack(NULL, len, buf); if (msg == NULL) { LOG_ERR("Failed to unpack."); return -1; @@ -590,32 +598,49 @@ static void * eth_llc_ipcp_sdu_reader(void * o) static void * eth_llc_ipcp_sdu_writer(void * o) { + int fd; + struct shm_du_buff * sdb; + uint8_t ssap; + uint8_t dsap; + uint8_t r_addr[MAC_SIZE]; + struct timespec timeout = {0, EVENT_WAIT_TIMEOUT * 1000}; + fqueue_t * fq = fqueue_create(); + if (fq == NULL) + return (void *) 1; + while (true) { - int fd; - struct shm_du_buff * sdb; - uint8_t ssap; - uint8_t dsap; - uint8_t r_addr[MAC_SIZE]; - - fd = ipcp_read_shim(&sdb); - if (fd < 0) + int ret = flow_event_wait(eth_llc_data.np1_flows, fq, &timeout); + if (ret == -ETIMEDOUT) continue; - pthread_rwlock_rdlock(&ipcpi.state_lock); - pthread_rwlock_rdlock(ð_llc_data.flows_lock); + if (ret < 0) { + LOG_ERR("Event wait returned error code %d.", -ret); + continue; + } - ssap = reverse_bits(eth_llc_data.fd_to_ef[fd].sap); - dsap = reverse_bits(eth_llc_data.fd_to_ef[fd].r_sap); - memcpy(r_addr, eth_llc_data.fd_to_ef[fd].r_addr, MAC_SIZE); + while ((fd = fqueue_next(fq)) >= 0) { + if (ipcp_flow_read(fd, &sdb)) { + LOG_ERR("Bad read from fd %d.", fd); + continue; + } + pthread_rwlock_rdlock(&ipcpi.state_lock); + pthread_rwlock_rdlock(ð_llc_data.flows_lock); - pthread_rwlock_unlock(ð_llc_data.flows_lock); - pthread_rwlock_unlock(&ipcpi.state_lock); + ssap = reverse_bits(eth_llc_data.fd_to_ef[fd].sap); + dsap = reverse_bits(eth_llc_data.fd_to_ef[fd].r_sap); + memcpy(r_addr, + eth_llc_data.fd_to_ef[fd].r_addr, + MAC_SIZE); + + pthread_rwlock_unlock(ð_llc_data.flows_lock); + pthread_rwlock_unlock(&ipcpi.state_lock); - eth_llc_ipcp_send_frame(r_addr, dsap, ssap, - shm_du_buff_head(sdb), - shm_du_buff_tail(sdb) - - shm_du_buff_head(sdb)); - ipcp_flow_del(sdb); + eth_llc_ipcp_send_frame(r_addr, dsap, ssap, + shm_du_buff_head(sdb), + shm_du_buff_tail(sdb) + - shm_du_buff_head(sdb)); + ipcp_flow_del(sdb); + } } return (void *) 1; @@ -859,7 +884,7 @@ static int eth_llc_ipcp_flow_alloc(int fd, uint8_t ssap = 0; uint8_t r_addr[MAC_SIZE]; - LOG_INFO("Allocating flow to %s.", dst_name); + LOG_DBG("Allocating flow to %s.", dst_name); if (dst_name == NULL || src_ae_name == NULL) return -1; @@ -903,6 +928,8 @@ static int eth_llc_ipcp_flow_alloc(int fd, return -1; } + flow_set_add(eth_llc_data.np1_flows, fd); + LOG_DBG("Pending flow with fd %d on SAP %d.", fd, ssap); return 0; @@ -941,6 +968,8 @@ static int eth_llc_ipcp_flow_alloc_resp(int fd, int response) return -1; } + flow_set_add(eth_llc_data.np1_flows, fd); + LOG_DBG("Accepted flow, fd %d, SAP %d.", fd, ssap); return 0; @@ -948,11 +977,18 @@ static int eth_llc_ipcp_flow_alloc_resp(int fd, int response) static int eth_llc_ipcp_flow_dealloc(int fd) { + struct timespec t = {0, 10000}; + uint8_t sap; uint8_t r_sap; uint8_t addr[MAC_SIZE]; int ret; + flow_set_del(eth_llc_data.np1_flows, fd); + + while (flow_dealloc(fd) == -EBUSY) + nanosleep(&t, NULL); + pthread_rwlock_rdlock(&ipcpi.state_lock); pthread_rwlock_wrlock(ð_llc_data.flows_lock); @@ -975,8 +1011,6 @@ static int eth_llc_ipcp_flow_dealloc(int fd) if (ret < 0) LOG_DBG("Could not notify remote."); - flow_dealloc(fd); - LOG_DBG("Flow with fd %d deallocated.", fd); return 0; @@ -1008,10 +1042,12 @@ int main(int argc, char * argv[]) exit(EXIT_FAILURE); } - if (eth_llc_data_init() < 0) + if (ap_init(NULL) < 0) { + close_logfile(); exit(EXIT_FAILURE); + } - if (ap_init(NULL) < 0) { + if (eth_llc_data_init() < 0) { close_logfile(); exit(EXIT_FAILURE); } @@ -1054,10 +1090,10 @@ int main(int argc, char * argv[]) pthread_join(eth_llc_data.sdu_writer, NULL); pthread_join(eth_llc_data.sdu_reader, NULL); - ap_fini(); - eth_llc_data_fini(); + ap_fini(); + close_logfile(); exit(EXIT_SUCCESS); diff --git a/src/ipcpd/shim-udp/main.c b/src/ipcpd/shim-udp/main.c index 7c109a8a..050623e4 100644 --- a/src/ipcpd/shim-udp/main.c +++ b/src/ipcpd/shim-udp/main.c @@ -27,6 +27,9 @@ #include #include #include +#include +#include +#include #define OUROBOROS_PREFIX "ipcpd/shim-udp" @@ -75,6 +78,7 @@ struct { struct sockaddr_in s_saddr; int s_fd; + flow_set_t * np1_flows; fd_set flow_fd_s; /* bidir mappings of (n - 1) file descriptor to (n) flow descriptor */ int uf_to_fd[FD_SETSIZE]; @@ -90,7 +94,7 @@ struct { pthread_mutex_t fd_set_lock; } udp_data; -static void udp_data_init() +static int udp_data_init() { int i; @@ -104,13 +108,21 @@ static void udp_data_init() FD_ZERO(&udp_data.flow_fd_s); + udp_data.np1_flows = flow_set_create(); + if (udp_data.np1_flows == NULL) + return -ENOMEM; + pthread_rwlock_init(&udp_data.flows_lock, NULL); pthread_cond_init(&udp_data.fd_set_cond, NULL); pthread_mutex_init(&udp_data.fd_set_lock, NULL); + + return 0; } static void udp_data_fini() { + flow_set_destroy(udp_data.np1_flows); + pthread_rwlock_destroy(&udp_data.flows_lock); pthread_mutex_destroy(&udp_data.fd_set_lock); pthread_cond_destroy(&udp_data.fd_set_cond); @@ -387,7 +399,7 @@ static int ipcp_udp_flow_dealloc_req(int udp_port) pthread_rwlock_unlock(&udp_data.flows_lock); pthread_rwlock_unlock(&ipcpi.state_lock); - flow_dealloc(fd); + flow_cntl(fd, FLOW_F_SETFL, FLOW_O_WRONLY); close(skfd); @@ -505,30 +517,45 @@ static void * ipcp_udp_sdu_reader() static void * ipcp_udp_sdu_loop(void * o) { + int fd; + struct timespec timeout = {0, FD_UPDATE_TIMEOUT * 1000}; + struct shm_du_buff * sdb; + fqueue_t * fq = fqueue_create(); + if (fq == NULL) + return (void *) 1; while (true) { - int fd; - struct shm_du_buff * sdb; + int ret = flow_event_wait(udp_data.np1_flows, fq, &timeout); + if (ret == -ETIMEDOUT) + continue; - fd = ipcp_read_shim(&sdb); - if (fd < 0) + if (ret < 0) { + LOG_ERR("Event wait returned error code %d.", -ret); continue; + } - pthread_rwlock_rdlock(&ipcpi.state_lock); - pthread_rwlock_rdlock(&udp_data.flows_lock); + while ((fd = fqueue_next(fq)) >= 0) { + if (ipcp_flow_read(fd, &sdb)) { + LOG_ERR("Bad read from fd %d.", fd); + continue; + } - fd = udp_data.fd_to_uf[fd].skfd; + pthread_rwlock_rdlock(&ipcpi.state_lock); + pthread_rwlock_rdlock(&udp_data.flows_lock); - pthread_rwlock_unlock(&udp_data.flows_lock); - pthread_rwlock_unlock(&ipcpi.state_lock); + fd = udp_data.fd_to_uf[fd].skfd; + + pthread_rwlock_unlock(&udp_data.flows_lock); + pthread_rwlock_unlock(&ipcpi.state_lock); - if (send(fd, - shm_du_buff_head(sdb), - shm_du_buff_tail(sdb) - shm_du_buff_head(sdb), - 0) < 0) - LOG_ERR("Failed to send SDU."); + if (send(fd, + shm_du_buff_head(sdb), + shm_du_buff_tail(sdb) - shm_du_buff_head(sdb), + 0) < 0) + LOG_ERR("Failed to send SDU."); - ipcp_flow_del(sdb); + ipcp_flow_del(sdb); + } } return (void *) 1; @@ -993,6 +1020,8 @@ static int ipcp_udp_flow_alloc(int fd, udp_data.fd_to_uf[fd].skfd = skfd; udp_data.uf_to_fd[skfd] = fd; + flow_set_add(udp_data.np1_flows, fd); + pthread_rwlock_unlock(&udp_data.flows_lock); pthread_rwlock_unlock(&ipcpi.state_lock); @@ -1049,6 +1078,8 @@ static int ipcp_udp_flow_alloc_resp(int fd, int response) set_fd(skfd); + flow_set_add(udp_data.np1_flows, fd); + pthread_rwlock_unlock(&udp_data.flows_lock); pthread_rwlock_unlock(&ipcpi.state_lock); @@ -1075,9 +1106,15 @@ static int ipcp_udp_flow_dealloc(int fd) { int skfd = -1; int remote_udp = -1; + struct timespec t = {0, 10000}; struct sockaddr_in r_saddr; socklen_t r_saddr_len = sizeof(r_saddr); + flow_set_del(udp_data.np1_flows, fd); + + while (flow_dealloc(fd) == -EBUSY) + nanosleep(&t, NULL); + pthread_rwlock_rdlock(&ipcpi.state_lock); pthread_rwlock_wrlock(&udp_data.flows_lock); @@ -1117,8 +1154,6 @@ static int ipcp_udp_flow_dealloc(int fd) close(skfd); - flow_dealloc(fd); - LOG_DBG("Flow with fd %d deallocated.", fd); return 0; @@ -1149,13 +1184,16 @@ int main(int argc, char * argv[]) exit(EXIT_FAILURE); } - udp_data_init(); - if (ap_init(NULL) < 0) { close_logfile(); exit(EXIT_FAILURE); } + if (udp_data_init() < 0) { + close_logfile(); + exit(EXIT_FAILURE); + } + /* store the process id of the irmd */ irmd_api = atoi(argv[1]); @@ -1196,10 +1234,10 @@ int main(int argc, char * argv[]) pthread_join(udp_data.handler, NULL); pthread_join(udp_data.sdu_reader, NULL); - ap_fini(); - udp_data_fini(); + ap_fini(); + close_logfile(); exit(EXIT_SUCCESS); -- cgit v1.2.3