diff options
Diffstat (limited to 'src/ipcpd')
| -rw-r--r-- | src/ipcpd/ipcp.c | 6 | ||||
| -rw-r--r-- | src/ipcpd/local/main.c | 68 | ||||
| -rw-r--r-- | src/ipcpd/normal/fmgr.c | 143 | ||||
| -rw-r--r-- | src/ipcpd/shim-eth-llc/main.c | 106 | ||||
| -rw-r--r-- | src/ipcpd/shim-udp/main.c | 84 | 
5 files changed, 264 insertions, 143 deletions
| diff --git a/src/ipcpd/ipcp.c b/src/ipcpd/ipcp.c index a9f80ee7..f9246c7a 100644 --- a/src/ipcpd/ipcp.c +++ b/src/ipcpd/ipcp.c @@ -323,9 +323,9 @@ void * ipcp_main_loop(void * o)                          ret_msg.has_result = true;                          ret_msg.result =                                  ipcpi.ops->ipcp_flow_alloc(fd, -                                                            msg->dst_name, -                                                            msg->src_ae_name, -                                                            msg->qos_cube); +                                                           msg->dst_name, +                                                           msg->src_ae_name, +                                                           msg->qos_cube);                          if (ret_msg.result < 0) {                                  LOG_DBG("Deallocate failed on port_id %d.",                                          msg->port_id); diff --git a/src/ipcpd/local/main.c b/src/ipcpd/local/main.c index 4e500a8a..68c9ae8c 100644 --- a/src/ipcpd/local/main.c +++ b/src/ipcpd/local/main.c @@ -25,7 +25,7 @@  #include <ouroboros/errno.h>  #include <ouroboros/dev.h>  #include <ouroboros/fcntl.h> -#include <ouroboros/select.h> +#include <ouroboros/fqueue.h>  #include <ouroboros/ipcp-dev.h>  #include <ouroboros/local-dev.h>  #define OUROBOROS_PREFIX "ipcpd/local" @@ -39,6 +39,7 @@  #include <sys/wait.h>  #include <fcntl.h> +#define EVENT_WAIT_TIMEOUT 100 /* us */  #define THIS_TYPE IPCP_LOCAL  /* global for trapping signal */ @@ -46,18 +47,25 @@ int irmd_api;  struct {          int                   in_out[IRMD_MAX_FLOWS]; +        flow_set_t *          flows;          pthread_rwlock_t      lock;          pthread_t             sduloop;  } local_data; -void local_data_init() +int local_data_init()  {          int i;          for (i = 0; i < IRMD_MAX_FLOWS; ++i)                  local_data.in_out[i] = -1; +        local_data.flows = flow_set_create(); +        if (local_data.flows == NULL) +                return -ENFILE; +          pthread_rwlock_init(&local_data.lock, NULL); + +        return 0;  }  void local_data_fini() @@ -67,11 +75,24 @@ void local_data_fini()  static void * ipcp_local_sdu_loop(void * o)  { +        struct timespec timeout = {0, EVENT_WAIT_TIMEOUT * 1000}; +        fqueue_t * fq = fqueue_create(); +        if (fq == NULL) +                return (void *) 1; +          while (true) {                  int fd; -                struct rb_entry * e; +                int ret; +                ssize_t idx; + +                ret = flow_event_wait(local_data.flows, fq, &timeout); +                if (ret == -ETIMEDOUT) +                        continue; -                fd = flow_select(NULL, NULL); +                if (ret < 0) { +                        LOG_ERR("Event wait returned error code %d.", -ret); +                        continue; +                }                  pthread_rwlock_rdlock(&ipcpi.state_lock); @@ -82,20 +103,20 @@ static void * ipcp_local_sdu_loop(void * o)                  pthread_rwlock_rdlock(&local_data.lock); -                e = local_flow_read(fd); +                while ((fd = fqueue_next(fq)) >= 0) { +                        idx = local_flow_read(fd); -                fd = local_data.in_out[fd]; +                        fd = local_data.in_out[fd]; -                if (fd != -1) -                        local_flow_write(fd, e); +                        if (fd != -1) +                                local_flow_write(fd, idx); +                }                  pthread_rwlock_unlock(&local_data.lock);                  pthread_rwlock_unlock(&ipcpi.state_lock); - -                free(e);          } -        return (void *) 1; +        return (void *) 0;  }  void ipcp_sig_handler(int sig, siginfo_t * info, void * c) @@ -152,7 +173,7 @@ static int ipcp_local_name_reg(char * name)          if (ipcp_data_add_reg_entry(ipcpi.data, name)) {                  pthread_rwlock_unlock(&ipcpi.state_lock); -                LOG_DBGF("Failed to add %s to local registry.", name); +                LOG_DBG("Failed to add %s to local registry.", name);                  return -1;          } @@ -194,12 +215,14 @@ static int ipcp_local_flow_alloc(int           fd,          if (ipcp_get_state() != IPCP_ENROLLED) {                  pthread_rwlock_unlock(&ipcpi.state_lock); -                LOG_DBGF("Won't register with non-enrolled IPCP."); +                LOG_DBG("Won't register with non-enrolled IPCP.");                  return -1; /* -ENOTENROLLED */          }          pthread_rwlock_wrlock(&local_data.lock); +        flow_set_add(local_data.flows, fd); +          out_fd = ipcp_flow_req_arr(getpid(), dst_name, src_ae_name);          local_data.in_out[fd]  = out_fd; @@ -222,6 +245,7 @@ static int ipcp_local_flow_alloc_resp(int fd, int response)                  return 0;          pthread_rwlock_rdlock(&ipcpi.state_lock); +        pthread_rwlock_rdlock(&local_data.lock);          out_fd = local_data.in_out[fd];          if (out_fd < 0) { @@ -230,6 +254,9 @@ static int ipcp_local_flow_alloc_resp(int fd, int response)                  return -1;          } +        flow_set_add(local_data.flows, fd); + +        pthread_rwlock_unlock(&local_data.lock);          pthread_rwlock_unlock(&ipcpi.state_lock);          if ((ret = ipcp_flow_alloc_reply(out_fd, response)) < 0) @@ -247,6 +274,8 @@ static int ipcp_local_flow_dealloc(int fd)          if (fd < 0)                  return -EINVAL; +        flow_set_del(local_data.flows, fd); +          while (flow_dealloc(fd) == -EBUSY)                  nanosleep(&t, NULL); @@ -289,9 +318,14 @@ int main(int argc, char * argv[])                  exit(EXIT_FAILURE);          } -        local_data_init(); -          if (ap_init(NULL) < 0) { +                LOG_ERR("Failed to init application."); +                close_logfile(); +                exit(EXIT_FAILURE); +        } + +        if (local_data_init() < 0) { +                LOG_ERR("Failed to init local data.");                  close_logfile();                  exit(EXIT_FAILURE);          } @@ -331,10 +365,10 @@ int main(int argc, char * argv[])          pthread_cancel(local_data.sduloop);          pthread_join(local_data.sduloop, NULL); -        ap_fini(); -          local_data_fini(); +        ap_fini(); +          close_logfile();          exit(EXIT_SUCCESS); diff --git a/src/ipcpd/normal/fmgr.c b/src/ipcpd/normal/fmgr.c index cb25072e..3da392c5 100644 --- a/src/ipcpd/normal/fmgr.c +++ b/src/ipcpd/normal/fmgr.c @@ -27,7 +27,7 @@  #include <ouroboros/dev.h>  #include <ouroboros/list.h>  #include <ouroboros/ipcp-dev.h> -#include <ouroboros/select.h> +#include <ouroboros/fqueue.h>  #include <ouroboros/errno.h>  #include <stdlib.h> @@ -185,39 +185,47 @@ static void * fmgr_np1_sdu_reader(void * o)          struct shm_du_buff * sdb;          struct timespec timeout = {0, FD_UPDATE_TIMEOUT};          struct np1_flow * flow; +        int fd; +        fqueue_t * fq = fqueue_create(); +        if (fq == NULL) +                return (void *) 1;          while (true) { -                int fd = flow_select(fmgr.np1_set, &timeout); -                if (fd == -ETIMEDOUT) +                int ret = flow_event_wait(fmgr.np1_set, fq, &timeout); +                if (ret == -ETIMEDOUT)                          continue; -                if (fd < 0) { -                        LOG_ERR("Failed to get active fd."); +                if (ret < 0) { +                        LOG_ERR("Event error: %d.", ret);                          continue;                  } -                if (ipcp_flow_read(fd, &sdb)) { -                        LOG_ERR("Failed to read SDU from fd %d.", fd); -                        continue; -                } +                while ((fd = fqueue_next(fq)) >= 0) { +                        if (ipcp_flow_read(fd, &sdb)) { +                                LOG_ERR("Failed to read SDU from fd %d.", fd); +                                continue; +                        } -                pthread_rwlock_rdlock(&fmgr.np1_flows_lock); -                flow = fmgr.np1_flows[fd]; -                if (flow == NULL) { -                        pthread_rwlock_unlock(&fmgr.np1_flows_lock); -                        ipcp_flow_del(sdb); -                        LOG_ERR("Failed to retrieve flow."); -                        continue; -                } +                        pthread_rwlock_rdlock(&fmgr.np1_flows_lock); + +                        flow = fmgr.np1_flows[fd]; +                        if (flow == NULL) { +                                pthread_rwlock_unlock(&fmgr.np1_flows_lock); +                                ipcp_flow_del(sdb); +                                LOG_ERR("Failed to retrieve flow."); +                                continue; +                        } + +                        if (frct_i_write_sdu(flow->cep_id, sdb)) { +                                pthread_rwlock_unlock(&fmgr.np1_flows_lock); +                                ipcp_flow_del(sdb); +                                LOG_ERR("Failed to hand SDU to FRCT."); +                                continue; +                        } -                if (frct_i_write_sdu(flow->cep_id, sdb)) {                          pthread_rwlock_unlock(&fmgr.np1_flows_lock); -                        ipcp_flow_del(sdb); -                        LOG_ERR("Failed to hand SDU to FRCT."); -                        continue; -                } -                pthread_rwlock_unlock(&fmgr.np1_flows_lock); +                }          }          return (void *) 0; @@ -228,66 +236,71 @@ void * fmgr_nm1_sdu_reader(void * o)          struct timespec timeout = {0, FD_UPDATE_TIMEOUT};          struct shm_du_buff * sdb;          struct pci * pci; - +        int fd; +        fqueue_t * fq = fqueue_create(); +        if (fq == NULL) +                return (void *) 1;          while (true) { -                int fd = flow_select(fmgr.nm1_set, &timeout); -                if (fd == -ETIMEDOUT) -                        continue; - -                if (fd < 0) { -                        LOG_ERR("Failed to get active fd."); +                int ret = flow_event_wait(fmgr.nm1_set, fq, &timeout); +                if (ret == -ETIMEDOUT)                          continue; -                } -                if (ipcp_flow_read(fd, &sdb)) { -                        LOG_ERR("Failed to read SDU from fd %d.", fd); +                if (ret < 0) { +                        LOG_ERR("Event error: %d.", ret);                          continue;                  } -                pci = shm_pci_des(sdb); -                if (pci == NULL) { -                        LOG_ERR("Failed to get PCI."); -                        ipcp_flow_del(sdb); -                        continue; -                } +                while ((fd = fqueue_next(fq)) >= 0) { +                        if (ipcp_flow_read(fd, &sdb)) { +                                LOG_ERR("Failed to read SDU from fd %d.", fd); +                                continue; +                        } -                if (pci->dst_addr != ribmgr_address()) { -                        LOG_DBG("PDU needs to be forwarded."); +                        pci = shm_pci_des(sdb); +                        if (pci == NULL) { +                                LOG_ERR("Failed to get PCI."); +                                ipcp_flow_del(sdb); +                                continue; +                        } -                        if (pci->ttl == 0) { -                                LOG_DBG("TTL was zero."); +                        if (pci->dst_addr != ribmgr_address()) { +                                LOG_DBG("PDU needs to be forwarded."); + +                                if (pci->ttl == 0) { +                                        LOG_DBG("TTL was zero."); +                                        ipcp_flow_del(sdb); +                                        free(pci); +                                        continue; +                                } + +                                if (shm_pci_dec_ttl(sdb)) { +                                        LOG_ERR("Failed to decrease TTL."); +                                        ipcp_flow_del(sdb); +                                        free(pci); +                                        continue; +                                } +                                /* +                                 * FIXME: Dropping for now, since +                                 * we don't have a PFF yet +                                 */                                  ipcp_flow_del(sdb);                                  free(pci);                                  continue;                          } -                        if (shm_pci_dec_ttl(sdb)) { -                                LOG_ERR("Failed to decrease TTL."); +                        if (shm_pci_shrink(sdb)) { +                                LOG_ERR("Failed to shrink PDU.");                                  ipcp_flow_del(sdb);                                  free(pci);                                  continue;                          } -                        /* -                         * FIXME: Dropping for now, since -                         * we don't have a PFF yet -                         */ -                        ipcp_flow_del(sdb); -                        free(pci); -                        continue; -                } - -                if (shm_pci_shrink(sdb)) { -                        LOG_ERR("Failed to shrink PDU."); -                        ipcp_flow_del(sdb); -                        free(pci); -                        continue; -                } -                if (frct_nm1_post_sdu(pci, sdb)) { -                        LOG_ERR("Failed to hand PDU to FRCT."); -                        ipcp_flow_del(sdb); -                        free(pci); -                        continue; +                        if (frct_nm1_post_sdu(pci, sdb)) { +                                LOG_ERR("Failed to hand PDU to FRCT."); +                                ipcp_flow_del(sdb); +                                free(pci); +                                continue; +                        }                  }          } diff --git a/src/ipcpd/shim-eth-llc/main.c b/src/ipcpd/shim-eth-llc/main.c index 399d3dc8..db258c8b 100644 --- a/src/ipcpd/shim-eth-llc/main.c +++ b/src/ipcpd/shim-eth-llc/main.c @@ -30,6 +30,8 @@  #include <ouroboros/bitmap.h>  #include <ouroboros/dev.h>  #include <ouroboros/ipcp-dev.h> +#include <ouroboros/fcntl.h> +#include <ouroboros/fqueue.h>  #define OUROBOROS_PREFIX "ipcpd/shim-eth-llc" @@ -77,6 +79,8 @@ typedef ShimEthLlcMsg shim_eth_llc_msg_t;  #define ETH_FRAME_SIZE (ETH_HEADER_SIZE + LLC_HEADER_SIZE \                          + SHIM_ETH_LLC_MAX_SDU_SIZE) +#define EVENT_WAIT_TIMEOUT 100 /* us */ +  /* global for trapping signal */  int irmd_api; @@ -110,6 +114,7 @@ struct {          uint8_t *          tx_ring;          int                tx_offset;  #endif +        flow_set_t *       np1_flows;          int *              ef_to_fd;          struct ef *        fd_to_ef;          pthread_rwlock_t   flows_lock; @@ -139,6 +144,14 @@ static int eth_llc_data_init()                  return -ENOMEM;          } +        eth_llc_data.np1_flows = flow_set_create(); +        if (eth_llc_data.np1_flows == NULL) { +                bmp_destroy(eth_llc_data.saps); +                free(eth_llc_data.ef_to_fd); +                free(eth_llc_data.fd_to_ef); +                return -ENOMEM; +        } +          for (i = 0; i < MAX_SAPS; ++i)                  eth_llc_data.ef_to_fd[i] = -1; @@ -156,6 +169,7 @@ static int eth_llc_data_init()  void eth_llc_data_fini()  {          bmp_destroy(eth_llc_data.saps); +        flow_set_destroy(eth_llc_data.np1_flows);          free(eth_llc_data.fd_to_ef);          free(eth_llc_data.ef_to_fd);          pthread_rwlock_destroy(ð_llc_data.flows_lock); @@ -416,23 +430,17 @@ static int eth_llc_ipcp_flow_dealloc_req(uint8_t ssap, uint8_t * r_addr)                  return 0;          } -        bmp_release(eth_llc_data.saps, eth_llc_data.fd_to_ef[fd].sap); -          pthread_rwlock_unlock(ð_llc_data.flows_lock);          pthread_rwlock_unlock(&ipcpi.state_lock); -        flow_dealloc(fd); - -        LOG_DBG("Flow with fd %d deallocated.", fd); +        flow_cntl(fd, FLOW_F_SETFL, FLOW_O_WRONLY);          return 0;  }  static int eth_llc_ipcp_mgmt_frame(uint8_t * buf, size_t len, uint8_t * r_addr)  { -        shim_eth_llc_msg_t * msg = NULL; - -        msg = shim_eth_llc_msg__unpack(NULL, len, buf); +        shim_eth_llc_msg_t * msg = shim_eth_llc_msg__unpack(NULL, len, buf);          if (msg == NULL) {                  LOG_ERR("Failed to unpack.");                  return -1; @@ -590,32 +598,49 @@ static void * eth_llc_ipcp_sdu_reader(void * o)  static void * eth_llc_ipcp_sdu_writer(void * o)  { +        int fd; +        struct shm_du_buff * sdb; +        uint8_t ssap; +        uint8_t dsap; +        uint8_t r_addr[MAC_SIZE]; +        struct timespec timeout = {0, EVENT_WAIT_TIMEOUT * 1000}; +        fqueue_t * fq = fqueue_create(); +        if (fq == NULL) +                return (void *) 1; +          while (true) { -                int fd; -                struct shm_du_buff * sdb; -                uint8_t ssap; -                uint8_t dsap; -                uint8_t r_addr[MAC_SIZE]; - -                fd = ipcp_read_shim(&sdb); -                if (fd < 0) +                int ret = flow_event_wait(eth_llc_data.np1_flows, fq, &timeout); +                if (ret == -ETIMEDOUT)                          continue; -                pthread_rwlock_rdlock(&ipcpi.state_lock); -                pthread_rwlock_rdlock(ð_llc_data.flows_lock); +                if (ret < 0) { +                        LOG_ERR("Event wait returned error code %d.", -ret); +                        continue; +                } -                ssap = reverse_bits(eth_llc_data.fd_to_ef[fd].sap); -                dsap = reverse_bits(eth_llc_data.fd_to_ef[fd].r_sap); -                memcpy(r_addr, eth_llc_data.fd_to_ef[fd].r_addr, MAC_SIZE); +                while ((fd = fqueue_next(fq)) >= 0) { +                        if (ipcp_flow_read(fd, &sdb)) { +                                LOG_ERR("Bad read from fd %d.", fd); +                                continue; +                        } +                        pthread_rwlock_rdlock(&ipcpi.state_lock); +                        pthread_rwlock_rdlock(ð_llc_data.flows_lock); -                pthread_rwlock_unlock(ð_llc_data.flows_lock); -                pthread_rwlock_unlock(&ipcpi.state_lock); +                        ssap = reverse_bits(eth_llc_data.fd_to_ef[fd].sap); +                        dsap = reverse_bits(eth_llc_data.fd_to_ef[fd].r_sap); +                        memcpy(r_addr, +                               eth_llc_data.fd_to_ef[fd].r_addr, +                               MAC_SIZE); + +                        pthread_rwlock_unlock(ð_llc_data.flows_lock); +                        pthread_rwlock_unlock(&ipcpi.state_lock); -                eth_llc_ipcp_send_frame(r_addr, dsap, ssap, -                                        shm_du_buff_head(sdb), -                                        shm_du_buff_tail(sdb) -                                        - shm_du_buff_head(sdb)); -                ipcp_flow_del(sdb); +                        eth_llc_ipcp_send_frame(r_addr, dsap, ssap, +                                                shm_du_buff_head(sdb), +                                                shm_du_buff_tail(sdb) +                                                - shm_du_buff_head(sdb)); +                        ipcp_flow_del(sdb); +                }          }          return (void *) 1; @@ -859,7 +884,7 @@ static int eth_llc_ipcp_flow_alloc(int           fd,          uint8_t ssap = 0;          uint8_t r_addr[MAC_SIZE]; -        LOG_INFO("Allocating flow to %s.", dst_name); +        LOG_DBG("Allocating flow to %s.", dst_name);          if (dst_name == NULL || src_ae_name == NULL)                  return -1; @@ -903,6 +928,8 @@ static int eth_llc_ipcp_flow_alloc(int           fd,                  return -1;          } +        flow_set_add(eth_llc_data.np1_flows, fd); +          LOG_DBG("Pending flow with fd %d on SAP %d.", fd, ssap);          return 0; @@ -941,6 +968,8 @@ static int eth_llc_ipcp_flow_alloc_resp(int fd, int response)                  return -1;          } +        flow_set_add(eth_llc_data.np1_flows, fd); +          LOG_DBG("Accepted flow, fd %d, SAP %d.", fd, ssap);          return 0; @@ -948,11 +977,18 @@ static int eth_llc_ipcp_flow_alloc_resp(int fd, int response)  static int eth_llc_ipcp_flow_dealloc(int fd)  { +        struct timespec t = {0, 10000}; +          uint8_t sap;          uint8_t r_sap;          uint8_t addr[MAC_SIZE];          int ret; +        flow_set_del(eth_llc_data.np1_flows, fd); + +        while (flow_dealloc(fd) == -EBUSY) +                nanosleep(&t, NULL); +          pthread_rwlock_rdlock(&ipcpi.state_lock);          pthread_rwlock_wrlock(ð_llc_data.flows_lock); @@ -975,8 +1011,6 @@ static int eth_llc_ipcp_flow_dealloc(int fd)          if (ret < 0)                  LOG_DBG("Could not notify remote."); -        flow_dealloc(fd); -          LOG_DBG("Flow with fd %d deallocated.", fd);          return 0; @@ -1008,10 +1042,12 @@ int main(int argc, char * argv[])                  exit(EXIT_FAILURE);          } -        if (eth_llc_data_init() < 0) +        if (ap_init(NULL) < 0) { +                close_logfile();                  exit(EXIT_FAILURE); +        } -        if (ap_init(NULL) < 0) { +        if (eth_llc_data_init() < 0) {                  close_logfile();                  exit(EXIT_FAILURE);          } @@ -1054,10 +1090,10 @@ int main(int argc, char * argv[])          pthread_join(eth_llc_data.sdu_writer, NULL);          pthread_join(eth_llc_data.sdu_reader, NULL); -        ap_fini(); -          eth_llc_data_fini(); +        ap_fini(); +          close_logfile();          exit(EXIT_SUCCESS); diff --git a/src/ipcpd/shim-udp/main.c b/src/ipcpd/shim-udp/main.c index 7c109a8a..050623e4 100644 --- a/src/ipcpd/shim-udp/main.c +++ b/src/ipcpd/shim-udp/main.c @@ -27,6 +27,9 @@  #include <ouroboros/utils.h>  #include <ouroboros/dev.h>  #include <ouroboros/ipcp-dev.h> +#include <ouroboros/fqueue.h> +#include <ouroboros/fcntl.h> +#include <ouroboros/errno.h>  #define OUROBOROS_PREFIX "ipcpd/shim-udp" @@ -75,6 +78,7 @@ struct {          struct sockaddr_in s_saddr;          int                s_fd; +        flow_set_t *       np1_flows;          fd_set             flow_fd_s;          /* bidir mappings of (n - 1) file descriptor to (n) flow descriptor */          int                uf_to_fd[FD_SETSIZE]; @@ -90,7 +94,7 @@ struct {          pthread_mutex_t    fd_set_lock;  } udp_data; -static void udp_data_init() +static int udp_data_init()  {          int i; @@ -104,13 +108,21 @@ static void udp_data_init()          FD_ZERO(&udp_data.flow_fd_s); +        udp_data.np1_flows = flow_set_create(); +        if (udp_data.np1_flows == NULL) +                return -ENOMEM; +          pthread_rwlock_init(&udp_data.flows_lock, NULL);          pthread_cond_init(&udp_data.fd_set_cond, NULL);          pthread_mutex_init(&udp_data.fd_set_lock, NULL); + +        return 0;  }  static void udp_data_fini()  { +        flow_set_destroy(udp_data.np1_flows); +          pthread_rwlock_destroy(&udp_data.flows_lock);          pthread_mutex_destroy(&udp_data.fd_set_lock);          pthread_cond_destroy(&udp_data.fd_set_cond); @@ -387,7 +399,7 @@ static int ipcp_udp_flow_dealloc_req(int udp_port)          pthread_rwlock_unlock(&udp_data.flows_lock);          pthread_rwlock_unlock(&ipcpi.state_lock); -        flow_dealloc(fd); +        flow_cntl(fd, FLOW_F_SETFL, FLOW_O_WRONLY);          close(skfd); @@ -505,30 +517,45 @@ static void * ipcp_udp_sdu_reader()  static void * ipcp_udp_sdu_loop(void * o)  { +        int fd; +        struct timespec timeout = {0, FD_UPDATE_TIMEOUT * 1000}; +        struct shm_du_buff * sdb; +        fqueue_t * fq = fqueue_create(); +        if (fq == NULL) +                return (void *) 1;          while (true) { -                int fd; -                struct shm_du_buff * sdb; +                int ret = flow_event_wait(udp_data.np1_flows, fq, &timeout); +                if (ret == -ETIMEDOUT) +                        continue; -                fd = ipcp_read_shim(&sdb); -                if (fd < 0) +                if (ret < 0) { +                        LOG_ERR("Event wait returned error code %d.", -ret);                          continue; +                } -                pthread_rwlock_rdlock(&ipcpi.state_lock); -                pthread_rwlock_rdlock(&udp_data.flows_lock); +                while ((fd = fqueue_next(fq)) >= 0) { +                        if (ipcp_flow_read(fd, &sdb)) { +                                LOG_ERR("Bad read from fd %d.", fd); +                                continue; +                        } -                fd = udp_data.fd_to_uf[fd].skfd; +                        pthread_rwlock_rdlock(&ipcpi.state_lock); +                        pthread_rwlock_rdlock(&udp_data.flows_lock); -                pthread_rwlock_unlock(&udp_data.flows_lock); -                pthread_rwlock_unlock(&ipcpi.state_lock); +                        fd = udp_data.fd_to_uf[fd].skfd; + +                        pthread_rwlock_unlock(&udp_data.flows_lock); +                        pthread_rwlock_unlock(&ipcpi.state_lock); -                if (send(fd, -                         shm_du_buff_head(sdb), -                         shm_du_buff_tail(sdb) - shm_du_buff_head(sdb), -                         0) < 0) -                        LOG_ERR("Failed to send SDU."); +                        if (send(fd, +                                 shm_du_buff_head(sdb), +                                 shm_du_buff_tail(sdb) - shm_du_buff_head(sdb), +                                 0) < 0) +                                LOG_ERR("Failed to send SDU."); -                ipcp_flow_del(sdb); +                        ipcp_flow_del(sdb); +                }          }          return (void *) 1; @@ -993,6 +1020,8 @@ static int ipcp_udp_flow_alloc(int           fd,          udp_data.fd_to_uf[fd].skfd = skfd;          udp_data.uf_to_fd[skfd]    = fd; +        flow_set_add(udp_data.np1_flows, fd); +          pthread_rwlock_unlock(&udp_data.flows_lock);          pthread_rwlock_unlock(&ipcpi.state_lock); @@ -1049,6 +1078,8 @@ static int ipcp_udp_flow_alloc_resp(int fd, int response)          set_fd(skfd); +        flow_set_add(udp_data.np1_flows, fd); +          pthread_rwlock_unlock(&udp_data.flows_lock);          pthread_rwlock_unlock(&ipcpi.state_lock); @@ -1075,9 +1106,15 @@ static int ipcp_udp_flow_dealloc(int fd)  {          int skfd = -1;          int remote_udp = -1; +        struct timespec t = {0, 10000};          struct sockaddr_in    r_saddr;          socklen_t             r_saddr_len = sizeof(r_saddr); +        flow_set_del(udp_data.np1_flows, fd); + +        while (flow_dealloc(fd) == -EBUSY) +                nanosleep(&t, NULL); +          pthread_rwlock_rdlock(&ipcpi.state_lock);          pthread_rwlock_wrlock(&udp_data.flows_lock); @@ -1117,8 +1154,6 @@ static int ipcp_udp_flow_dealloc(int fd)          close(skfd); -        flow_dealloc(fd); -          LOG_DBG("Flow with fd %d deallocated.", fd);          return 0; @@ -1149,13 +1184,16 @@ int main(int argc, char * argv[])                  exit(EXIT_FAILURE);          } -        udp_data_init(); -          if (ap_init(NULL) < 0) {                  close_logfile();                  exit(EXIT_FAILURE);          } +        if (udp_data_init() < 0) { +                close_logfile(); +                exit(EXIT_FAILURE); +        } +          /* store the process id of the irmd */          irmd_api = atoi(argv[1]); @@ -1196,10 +1234,10 @@ int main(int argc, char * argv[])          pthread_join(udp_data.handler, NULL);          pthread_join(udp_data.sdu_reader, NULL); -        ap_fini(); -          udp_data_fini(); +        ap_fini(); +          close_logfile();          exit(EXIT_SUCCESS); | 
