diff options
| -rw-r--r-- | src/ipcpd/local/main.c | 25 | ||||
| -rw-r--r-- | src/ipcpd/shim-eth-llc/main.c | 33 | ||||
| -rw-r--r-- | src/ipcpd/shim-udp/main.c | 33 | ||||
| -rw-r--r-- | src/irmd/main.c | 11 | ||||
| -rw-r--r-- | src/tools/oping/oping.c | 2 | ||||
| -rw-r--r-- | src/tools/oping/oping_client.c | 18 | ||||
| -rw-r--r-- | src/tools/oping/oping_server.c | 23 | 
7 files changed, 84 insertions, 61 deletions
| diff --git a/src/ipcpd/local/main.c b/src/ipcpd/local/main.c index b8783d9d..a8d5c273 100644 --- a/src/ipcpd/local/main.c +++ b/src/ipcpd/local/main.c @@ -50,6 +50,7 @@ int irmd_api;  struct {          int                   in_out[IRMD_MAX_FLOWS];          flow_set_t *          flows; +        fqueue_t *            fq;          pthread_rwlock_t      lock;          pthread_t             sduloop; @@ -65,6 +66,12 @@ static int local_data_init(void)          if (local_data.flows == NULL)                  return -ENFILE; +        local_data.fq = fqueue_create(); +        if (local_data.fq == NULL) { +                flow_set_destroy(local_data.flows); +                return -ENOMEM; +        } +          pthread_rwlock_init(&local_data.lock, NULL);          return 0; @@ -72,40 +79,35 @@ static int local_data_init(void)  static void local_data_fini(void)  { +        flow_set_destroy(local_data.flows); +        fqueue_destroy(local_data.fq);          pthread_rwlock_destroy(&local_data.lock);  }  static void * ipcp_local_sdu_loop(void * o)  {          struct timespec timeout = {0, EVENT_WAIT_TIMEOUT * 1000}; -        fqueue_t * fq = fqueue_create(); -        if (fq == NULL) -                return (void *) 1;          (void) o;          while (true) {                  int fd; -                int ret;                  ssize_t idx; -                ret = flow_event_wait(local_data.flows, fq, &timeout); -                if (ret == -ETIMEDOUT) +                if (flow_event_wait(local_data.flows, local_data.fq, &timeout) +                    == -ETIMEDOUT)                          continue; -                assert(!ret); -                  pthread_rwlock_rdlock(&ipcpi.state_lock);                  if (ipcp_get_state() != IPCP_ENROLLED) {                          pthread_rwlock_unlock(&ipcpi.state_lock); -                        fqueue_destroy(fq);                          return (void *) 1; /* -ENOTENROLLED */                  }                  pthread_rwlock_rdlock(&local_data.lock); -                while ((fd = fqueue_next(fq)) >= 0) { +                while ((fd = fqueue_next(local_data.fq)) >= 0) {                          idx = local_flow_read(fd);                          assert((size_t) idx < (SHM_BUFFER_SIZE)); @@ -152,7 +154,8 @@ static int ipcp_local_bootstrap(struct dif_config * conf)          assert(conf);          assert(conf->type == THIS_TYPE); -        (void) conf; +        /* this IPCP doesn't need to maintain its dif_name */ +        free(conf->dif_name);          pthread_rwlock_wrlock(&ipcpi.state_lock); diff --git a/src/ipcpd/shim-eth-llc/main.c b/src/ipcpd/shim-eth-llc/main.c index d4ea8eba..f6cded2b 100644 --- a/src/ipcpd/shim-eth-llc/main.c +++ b/src/ipcpd/shim-eth-llc/main.c @@ -113,6 +113,7 @@ struct {          int                tx_offset;  #endif          flow_set_t *       np1_flows; +        fqueue_t *         fq;          int *              ef_to_fd;          struct ef *        fd_to_ef;          pthread_rwlock_t   flows_lock; @@ -150,6 +151,15 @@ static int eth_llc_data_init(void)                  return -ENOMEM;          } +        eth_llc_data.fq = fqueue_create(); +        if (eth_llc_data.fq == NULL) { +                flow_set_destroy(eth_llc_data.np1_flows); +                bmp_destroy(eth_llc_data.saps); +                free(eth_llc_data.ef_to_fd); +                free(eth_llc_data.fd_to_ef); +                return -ENOMEM; +        } +          for (i = 0; i < MAX_SAPS; ++i)                  eth_llc_data.ef_to_fd[i] = -1; @@ -168,6 +178,7 @@ void eth_llc_data_fini(void)  {          bmp_destroy(eth_llc_data.saps);          flow_set_destroy(eth_llc_data.np1_flows); +        fqueue_destroy(eth_llc_data.fq);          free(eth_llc_data.fd_to_ef);          free(eth_llc_data.ef_to_fd);          pthread_rwlock_destroy(ð_llc_data.flows_lock); @@ -648,20 +659,16 @@ static void * eth_llc_ipcp_sdu_writer(void * o)          uint8_t dsap;          uint8_t r_addr[MAC_SIZE];          struct timespec timeout = {0, EVENT_WAIT_TIMEOUT * 1000}; -        fqueue_t * fq = fqueue_create(); -        if (fq == NULL) -                return (void *) 1;          (void) o;          while (true) { -                int ret = flow_event_wait(eth_llc_data.np1_flows, fq, &timeout); -                if (ret == -ETIMEDOUT) +                if (flow_event_wait(eth_llc_data.np1_flows, +                                    eth_llc_data.fq, +                                    &timeout) == -ETIMEDOUT)                          continue; -                assert(!ret); - -                while ((fd = fqueue_next(fq)) >= 0) { +                while ((fd = fqueue_next(eth_llc_data.fq)) >= 0) {                          if (ipcp_flow_read(fd, &sdb)) {                                  LOG_ERR("Bad read from fd %d.", fd);                                  continue; @@ -729,13 +736,11 @@ static int eth_llc_ipcp_bootstrap(struct dif_config * conf)          struct tpacket_req req;  #endif -        if (conf == NULL) -                return -1; /* -EINVAL */ +        assert(conf); +        assert(conf->type == THIS_TYPE); -        if (conf->type != THIS_TYPE) { -                LOG_ERR("Config doesn't match IPCP type."); -                return -1; -        } +        /* this IPCP doesn't need to maintain its dif_name */ +        free(conf->dif_name);          if (conf->if_name == NULL) {                  LOG_ERR("Interface name is NULL."); diff --git a/src/ipcpd/shim-udp/main.c b/src/ipcpd/shim-udp/main.c index f779713c..e06787ce 100644 --- a/src/ipcpd/shim-udp/main.c +++ b/src/ipcpd/shim-udp/main.c @@ -78,6 +78,7 @@ struct {          int                s_fd;          flow_set_t *       np1_flows; +        fqueue_t *         fq;          fd_set             flow_fd_s;          /* bidir mappings of (n - 1) file descriptor to (n) flow descriptor */          int                uf_to_fd[FD_SETSIZE]; @@ -111,6 +112,12 @@ static int udp_data_init(void)          if (udp_data.np1_flows == NULL)                  return -ENOMEM; +        udp_data.fq = fqueue_create(); +        if (udp_data.fq == NULL) { +                flow_set_destroy(udp_data.np1_flows); +                return -ENOMEM; +        } +          pthread_rwlock_init(&udp_data.flows_lock, NULL);          pthread_cond_init(&udp_data.fd_set_cond, NULL);          pthread_mutex_init(&udp_data.fd_set_lock, NULL); @@ -121,6 +128,7 @@ static int udp_data_init(void)  static void udp_data_fini(void)  {          flow_set_destroy(udp_data.np1_flows); +        fqueue_destroy(udp_data.fq);          pthread_rwlock_destroy(&udp_data.flows_lock);          pthread_mutex_destroy(&udp_data.fd_set_lock); @@ -522,23 +530,16 @@ static void * ipcp_udp_sdu_loop(void * o)          int fd;          struct timespec timeout = {0, FD_UPDATE_TIMEOUT * 1000};          struct shm_du_buff * sdb; -        fqueue_t * fq = fqueue_create(); -        if (fq == NULL) -                return (void *) 1;          (void) o;          while (true) { -                int ret = flow_event_wait(udp_data.np1_flows, fq, &timeout); -                if (ret == -ETIMEDOUT) -                        continue; - -                if (ret < 0) { -                        LOG_ERR("Event wait returned error code %d.", -ret); +                if (flow_event_wait(udp_data.np1_flows, +                                    udp_data.fq, +                                    &timeout)  == -ETIMEDOUT)                          continue; -                } -                while ((fd = fqueue_next(fq)) >= 0) { +                while ((fd = fqueue_next(udp_data.fq)) >= 0) {                          if (ipcp_flow_read(fd, &sdb)) {                                  LOG_ERR("Bad read from fd %d.", fd);                                  continue; @@ -593,13 +594,11 @@ static int ipcp_udp_bootstrap(struct dif_config * conf)          int  enable = 1;          int  fd = -1; -        if (conf == NULL) -                return -1; /* -EINVAL */ +        assert(conf); +        assert(conf->type == THIS_TYPE); -        if (conf->type != THIS_TYPE) { -                LOG_ERR("Config doesn't match IPCP type."); -                return -1; -        } +        /* this IPCP doesn't need to maintain its dif_name */ +        free(conf->dif_name);          if (inet_ntop(AF_INET,                        &conf->ip_addr, diff --git a/src/irmd/main.c b/src/irmd/main.c index aac47adb..740472b9 100644 --- a/src/irmd/main.c +++ b/src/irmd/main.c @@ -1241,8 +1241,12 @@ static int flow_alloc_res(int port_id)          pthread_rwlock_unlock(&irmd->flows_lock);          pthread_rwlock_unlock(&irmd->state_lock); -        if (irm_flow_wait_state(f, FLOW_ALLOCATED) == FLOW_ALLOCATED) +        if (irm_flow_wait_state(f, FLOW_ALLOCATED) == FLOW_ALLOCATED) { +                LOG_INFO("Flow on port_id %d allocated.", port_id);                  return 0; +        } + +        LOG_INFO("Pending flow on port_id %d torn down.", port_id);          return -1;  } @@ -1344,6 +1348,7 @@ static struct irm_flow * flow_req_arr(pid_t  api,          struct pid_el * c_api;          pid_t h_api = -1; +        int port_id = -1;          LOG_DBGF("Flow req arrived from IPCP %d for %s on AE %s.",                   api, dst_name, ae_name); @@ -1462,7 +1467,7 @@ static struct irm_flow * flow_req_arr(pid_t  api,          pthread_rwlock_unlock(&irmd->reg_lock);          pthread_rwlock_wrlock(&irmd->flows_lock); -        f->port_id = bmp_allocate(irmd->port_ids); +        port_id = f->port_id = bmp_allocate(irmd->port_ids);          if (!bmp_is_id_valid(irmd->port_ids, f->port_id)) {                  pthread_rwlock_unlock(&irmd->flows_lock);                  pthread_rwlock_unlock(&irmd->state_lock); @@ -1527,6 +1532,8 @@ static struct irm_flow * flow_req_arr(pid_t  api,          pthread_mutex_unlock(&re->state_lock); +        LOG_INFO("Flow on port_id %d allocated.", port_id); +          return f;  } diff --git a/src/tools/oping/oping.c b/src/tools/oping/oping.c index b476b33a..801f79b5 100644 --- a/src/tools/oping/oping.c +++ b/src/tools/oping/oping.c @@ -54,6 +54,7 @@ struct c {          double rtt_m2;          flow_set_t * flows; +        fqueue_t *   fq;          /* needs locking */          struct timespec * times; @@ -66,6 +67,7 @@ struct c {  struct s {          struct timespec   times[OPING_MAX_FLOWS];          flow_set_t *      flows; +        fqueue_t *        fq;          pthread_mutex_t   lock;          pthread_t cleaner_pt; diff --git a/src/tools/oping/oping_client.c b/src/tools/oping/oping_client.c index 9f49a1df..85cb2880 100644 --- a/src/tools/oping/oping_client.c +++ b/src/tools/oping/oping_client.c @@ -64,16 +64,14 @@ void * reader(void * o)          int msg_len = 0;          double ms = 0;          double d = 0; -        fqueue_t * fq = fqueue_create(); -        if (fq == NULL) -                return (void *) 1;          (void) o;          /* FIXME: use flow timeout option once we have it */          while (client.rcvd != client.count -               && flow_event_wait(client.flows, fq, &timeout) != -ETIMEDOUT) { -                while ((fd = fqueue_next(fq)) >= 0) { +               && (flow_event_wait(client.flows, client.fq, &timeout) +                   != -ETIMEDOUT)) { +                while ((fd = fqueue_next(client.fq)) >= 0) {                          msg_len = flow_read(fd, buf, OPING_BUF_SIZE);                          if (msg_len < 0)                                  continue; @@ -175,7 +173,13 @@ int client_main(void)          client.flows = flow_set_create();          if (client.flows == NULL) -                return 0; +                return -1; + +        client.fq = fqueue_create(); +        if (client.fq == NULL) { +                flow_set_destroy(client.flows); +                return -1; +        }          fd = flow_alloc(client.s_apn, NULL, NULL);          if (fd < 0) { @@ -251,6 +255,8 @@ int client_main(void)          pthread_mutex_lock(&client.lock);          free(client.times); +        flow_set_destroy(client.flows); +        fqueue_destroy(client.fq);          pthread_mutex_unlock(&client.lock);          pthread_mutex_destroy(&client.lock); diff --git a/src/tools/oping/oping_server.c b/src/tools/oping/oping_server.c index bcd47f9a..720e71b6 100644 --- a/src/tools/oping/oping_server.c +++ b/src/tools/oping/oping_server.c @@ -75,23 +75,15 @@ void * server_thread(void *o)          struct timespec now = {0, 0};          struct timespec timeout = {0, 100 * MILLION};          int fd; -        fqueue_t * fq = fqueue_create(); -        if (fq == NULL) -                return (void *) 1;          (void) o;          while (true) { -                int ret = flow_event_wait(server.flows, fq, &timeout); -                if (ret == -ETIMEDOUT) +                if (flow_event_wait(server.flows, server.fq, &timeout) +                    == -ETIMEDOUT)                          continue; -                if (ret < 0) { -                        printf("Event error.\n"); -                        break; -                } - -                while ((fd = fqueue_next(fq)) >= 0) { +                while ((fd = fqueue_next(server.fq)) >= 0) {                          msg_len = flow_read(fd, buf, OPING_BUF_SIZE);                          if (msg_len < 0)                                  continue; @@ -176,6 +168,12 @@ int server_main(void)          if (server.flows == NULL)                  return 0; +        server.fq = fqueue_create(); +        if (server.fq == NULL) { +                flow_set_destroy(server.flows); +                return -1; +        } +          pthread_create(&server.cleaner_pt, NULL, cleaner_thread, NULL);          pthread_create(&server.accept_pt, NULL, accept_thread, NULL);          pthread_create(&server.server_pt, NULL, server_thread, NULL); @@ -185,6 +183,9 @@ int server_main(void)          pthread_cancel(server.server_pt);          pthread_cancel(server.cleaner_pt); +        flow_set_destroy(server.flows); +        fqueue_destroy(server.fq); +          pthread_join(server.server_pt, NULL);          pthread_join(server.cleaner_pt, NULL); | 
