diff options
-rw-r--r-- | src/ipcpd/local/main.c | 25 | ||||
-rw-r--r-- | src/ipcpd/shim-eth-llc/main.c | 33 | ||||
-rw-r--r-- | src/ipcpd/shim-udp/main.c | 33 | ||||
-rw-r--r-- | src/irmd/main.c | 11 | ||||
-rw-r--r-- | src/tools/oping/oping.c | 2 | ||||
-rw-r--r-- | src/tools/oping/oping_client.c | 18 | ||||
-rw-r--r-- | src/tools/oping/oping_server.c | 23 |
7 files changed, 84 insertions, 61 deletions
diff --git a/src/ipcpd/local/main.c b/src/ipcpd/local/main.c index b8783d9d..a8d5c273 100644 --- a/src/ipcpd/local/main.c +++ b/src/ipcpd/local/main.c @@ -50,6 +50,7 @@ int irmd_api; struct { int in_out[IRMD_MAX_FLOWS]; flow_set_t * flows; + fqueue_t * fq; pthread_rwlock_t lock; pthread_t sduloop; @@ -65,6 +66,12 @@ static int local_data_init(void) if (local_data.flows == NULL) return -ENFILE; + local_data.fq = fqueue_create(); + if (local_data.fq == NULL) { + flow_set_destroy(local_data.flows); + return -ENOMEM; + } + pthread_rwlock_init(&local_data.lock, NULL); return 0; @@ -72,40 +79,35 @@ static int local_data_init(void) static void local_data_fini(void) { + flow_set_destroy(local_data.flows); + fqueue_destroy(local_data.fq); pthread_rwlock_destroy(&local_data.lock); } static void * ipcp_local_sdu_loop(void * o) { struct timespec timeout = {0, EVENT_WAIT_TIMEOUT * 1000}; - fqueue_t * fq = fqueue_create(); - if (fq == NULL) - return (void *) 1; (void) o; while (true) { int fd; - int ret; ssize_t idx; - ret = flow_event_wait(local_data.flows, fq, &timeout); - if (ret == -ETIMEDOUT) + if (flow_event_wait(local_data.flows, local_data.fq, &timeout) + == -ETIMEDOUT) continue; - assert(!ret); - pthread_rwlock_rdlock(&ipcpi.state_lock); if (ipcp_get_state() != IPCP_ENROLLED) { pthread_rwlock_unlock(&ipcpi.state_lock); - fqueue_destroy(fq); return (void *) 1; /* -ENOTENROLLED */ } pthread_rwlock_rdlock(&local_data.lock); - while ((fd = fqueue_next(fq)) >= 0) { + while ((fd = fqueue_next(local_data.fq)) >= 0) { idx = local_flow_read(fd); assert((size_t) idx < (SHM_BUFFER_SIZE)); @@ -152,7 +154,8 @@ static int ipcp_local_bootstrap(struct dif_config * conf) assert(conf); assert(conf->type == THIS_TYPE); - (void) conf; + /* this IPCP doesn't need to maintain its dif_name */ + free(conf->dif_name); pthread_rwlock_wrlock(&ipcpi.state_lock); diff --git a/src/ipcpd/shim-eth-llc/main.c b/src/ipcpd/shim-eth-llc/main.c index d4ea8eba..f6cded2b 100644 --- a/src/ipcpd/shim-eth-llc/main.c +++ b/src/ipcpd/shim-eth-llc/main.c @@ -113,6 +113,7 @@ struct { int tx_offset; #endif flow_set_t * np1_flows; + fqueue_t * fq; int * ef_to_fd; struct ef * fd_to_ef; pthread_rwlock_t flows_lock; @@ -150,6 +151,15 @@ static int eth_llc_data_init(void) return -ENOMEM; } + eth_llc_data.fq = fqueue_create(); + if (eth_llc_data.fq == NULL) { + flow_set_destroy(eth_llc_data.np1_flows); + bmp_destroy(eth_llc_data.saps); + free(eth_llc_data.ef_to_fd); + free(eth_llc_data.fd_to_ef); + return -ENOMEM; + } + for (i = 0; i < MAX_SAPS; ++i) eth_llc_data.ef_to_fd[i] = -1; @@ -168,6 +178,7 @@ void eth_llc_data_fini(void) { bmp_destroy(eth_llc_data.saps); flow_set_destroy(eth_llc_data.np1_flows); + fqueue_destroy(eth_llc_data.fq); free(eth_llc_data.fd_to_ef); free(eth_llc_data.ef_to_fd); pthread_rwlock_destroy(ð_llc_data.flows_lock); @@ -648,20 +659,16 @@ static void * eth_llc_ipcp_sdu_writer(void * o) uint8_t dsap; uint8_t r_addr[MAC_SIZE]; struct timespec timeout = {0, EVENT_WAIT_TIMEOUT * 1000}; - fqueue_t * fq = fqueue_create(); - if (fq == NULL) - return (void *) 1; (void) o; while (true) { - int ret = flow_event_wait(eth_llc_data.np1_flows, fq, &timeout); - if (ret == -ETIMEDOUT) + if (flow_event_wait(eth_llc_data.np1_flows, + eth_llc_data.fq, + &timeout) == -ETIMEDOUT) continue; - assert(!ret); - - while ((fd = fqueue_next(fq)) >= 0) { + while ((fd = fqueue_next(eth_llc_data.fq)) >= 0) { if (ipcp_flow_read(fd, &sdb)) { LOG_ERR("Bad read from fd %d.", fd); continue; @@ -729,13 +736,11 @@ static int eth_llc_ipcp_bootstrap(struct dif_config * conf) struct tpacket_req req; #endif - if (conf == NULL) - return -1; /* -EINVAL */ + assert(conf); + assert(conf->type == THIS_TYPE); - if (conf->type != THIS_TYPE) { - LOG_ERR("Config doesn't match IPCP type."); - return -1; - } + /* this IPCP doesn't need to maintain its dif_name */ + free(conf->dif_name); if (conf->if_name == NULL) { LOG_ERR("Interface name is NULL."); diff --git a/src/ipcpd/shim-udp/main.c b/src/ipcpd/shim-udp/main.c index f779713c..e06787ce 100644 --- a/src/ipcpd/shim-udp/main.c +++ b/src/ipcpd/shim-udp/main.c @@ -78,6 +78,7 @@ struct { int s_fd; flow_set_t * np1_flows; + fqueue_t * fq; fd_set flow_fd_s; /* bidir mappings of (n - 1) file descriptor to (n) flow descriptor */ int uf_to_fd[FD_SETSIZE]; @@ -111,6 +112,12 @@ static int udp_data_init(void) if (udp_data.np1_flows == NULL) return -ENOMEM; + udp_data.fq = fqueue_create(); + if (udp_data.fq == NULL) { + flow_set_destroy(udp_data.np1_flows); + return -ENOMEM; + } + pthread_rwlock_init(&udp_data.flows_lock, NULL); pthread_cond_init(&udp_data.fd_set_cond, NULL); pthread_mutex_init(&udp_data.fd_set_lock, NULL); @@ -121,6 +128,7 @@ static int udp_data_init(void) static void udp_data_fini(void) { flow_set_destroy(udp_data.np1_flows); + fqueue_destroy(udp_data.fq); pthread_rwlock_destroy(&udp_data.flows_lock); pthread_mutex_destroy(&udp_data.fd_set_lock); @@ -522,23 +530,16 @@ static void * ipcp_udp_sdu_loop(void * o) int fd; struct timespec timeout = {0, FD_UPDATE_TIMEOUT * 1000}; struct shm_du_buff * sdb; - fqueue_t * fq = fqueue_create(); - if (fq == NULL) - return (void *) 1; (void) o; while (true) { - int ret = flow_event_wait(udp_data.np1_flows, fq, &timeout); - if (ret == -ETIMEDOUT) - continue; - - if (ret < 0) { - LOG_ERR("Event wait returned error code %d.", -ret); + if (flow_event_wait(udp_data.np1_flows, + udp_data.fq, + &timeout) == -ETIMEDOUT) continue; - } - while ((fd = fqueue_next(fq)) >= 0) { + while ((fd = fqueue_next(udp_data.fq)) >= 0) { if (ipcp_flow_read(fd, &sdb)) { LOG_ERR("Bad read from fd %d.", fd); continue; @@ -593,13 +594,11 @@ static int ipcp_udp_bootstrap(struct dif_config * conf) int enable = 1; int fd = -1; - if (conf == NULL) - return -1; /* -EINVAL */ + assert(conf); + assert(conf->type == THIS_TYPE); - if (conf->type != THIS_TYPE) { - LOG_ERR("Config doesn't match IPCP type."); - return -1; - } + /* this IPCP doesn't need to maintain its dif_name */ + free(conf->dif_name); if (inet_ntop(AF_INET, &conf->ip_addr, diff --git a/src/irmd/main.c b/src/irmd/main.c index aac47adb..740472b9 100644 --- a/src/irmd/main.c +++ b/src/irmd/main.c @@ -1241,8 +1241,12 @@ static int flow_alloc_res(int port_id) pthread_rwlock_unlock(&irmd->flows_lock); pthread_rwlock_unlock(&irmd->state_lock); - if (irm_flow_wait_state(f, FLOW_ALLOCATED) == FLOW_ALLOCATED) + if (irm_flow_wait_state(f, FLOW_ALLOCATED) == FLOW_ALLOCATED) { + LOG_INFO("Flow on port_id %d allocated.", port_id); return 0; + } + + LOG_INFO("Pending flow on port_id %d torn down.", port_id); return -1; } @@ -1344,6 +1348,7 @@ static struct irm_flow * flow_req_arr(pid_t api, struct pid_el * c_api; pid_t h_api = -1; + int port_id = -1; LOG_DBGF("Flow req arrived from IPCP %d for %s on AE %s.", api, dst_name, ae_name); @@ -1462,7 +1467,7 @@ static struct irm_flow * flow_req_arr(pid_t api, pthread_rwlock_unlock(&irmd->reg_lock); pthread_rwlock_wrlock(&irmd->flows_lock); - f->port_id = bmp_allocate(irmd->port_ids); + port_id = f->port_id = bmp_allocate(irmd->port_ids); if (!bmp_is_id_valid(irmd->port_ids, f->port_id)) { pthread_rwlock_unlock(&irmd->flows_lock); pthread_rwlock_unlock(&irmd->state_lock); @@ -1527,6 +1532,8 @@ static struct irm_flow * flow_req_arr(pid_t api, pthread_mutex_unlock(&re->state_lock); + LOG_INFO("Flow on port_id %d allocated.", port_id); + return f; } diff --git a/src/tools/oping/oping.c b/src/tools/oping/oping.c index b476b33a..801f79b5 100644 --- a/src/tools/oping/oping.c +++ b/src/tools/oping/oping.c @@ -54,6 +54,7 @@ struct c { double rtt_m2; flow_set_t * flows; + fqueue_t * fq; /* needs locking */ struct timespec * times; @@ -66,6 +67,7 @@ struct c { struct s { struct timespec times[OPING_MAX_FLOWS]; flow_set_t * flows; + fqueue_t * fq; pthread_mutex_t lock; pthread_t cleaner_pt; diff --git a/src/tools/oping/oping_client.c b/src/tools/oping/oping_client.c index 9f49a1df..85cb2880 100644 --- a/src/tools/oping/oping_client.c +++ b/src/tools/oping/oping_client.c @@ -64,16 +64,14 @@ void * reader(void * o) int msg_len = 0; double ms = 0; double d = 0; - fqueue_t * fq = fqueue_create(); - if (fq == NULL) - return (void *) 1; (void) o; /* FIXME: use flow timeout option once we have it */ while (client.rcvd != client.count - && flow_event_wait(client.flows, fq, &timeout) != -ETIMEDOUT) { - while ((fd = fqueue_next(fq)) >= 0) { + && (flow_event_wait(client.flows, client.fq, &timeout) + != -ETIMEDOUT)) { + while ((fd = fqueue_next(client.fq)) >= 0) { msg_len = flow_read(fd, buf, OPING_BUF_SIZE); if (msg_len < 0) continue; @@ -175,7 +173,13 @@ int client_main(void) client.flows = flow_set_create(); if (client.flows == NULL) - return 0; + return -1; + + client.fq = fqueue_create(); + if (client.fq == NULL) { + flow_set_destroy(client.flows); + return -1; + } fd = flow_alloc(client.s_apn, NULL, NULL); if (fd < 0) { @@ -251,6 +255,8 @@ int client_main(void) pthread_mutex_lock(&client.lock); free(client.times); + flow_set_destroy(client.flows); + fqueue_destroy(client.fq); pthread_mutex_unlock(&client.lock); pthread_mutex_destroy(&client.lock); diff --git a/src/tools/oping/oping_server.c b/src/tools/oping/oping_server.c index bcd47f9a..720e71b6 100644 --- a/src/tools/oping/oping_server.c +++ b/src/tools/oping/oping_server.c @@ -75,23 +75,15 @@ void * server_thread(void *o) struct timespec now = {0, 0}; struct timespec timeout = {0, 100 * MILLION}; int fd; - fqueue_t * fq = fqueue_create(); - if (fq == NULL) - return (void *) 1; (void) o; while (true) { - int ret = flow_event_wait(server.flows, fq, &timeout); - if (ret == -ETIMEDOUT) + if (flow_event_wait(server.flows, server.fq, &timeout) + == -ETIMEDOUT) continue; - if (ret < 0) { - printf("Event error.\n"); - break; - } - - while ((fd = fqueue_next(fq)) >= 0) { + while ((fd = fqueue_next(server.fq)) >= 0) { msg_len = flow_read(fd, buf, OPING_BUF_SIZE); if (msg_len < 0) continue; @@ -176,6 +168,12 @@ int server_main(void) if (server.flows == NULL) return 0; + server.fq = fqueue_create(); + if (server.fq == NULL) { + flow_set_destroy(server.flows); + return -1; + } + pthread_create(&server.cleaner_pt, NULL, cleaner_thread, NULL); pthread_create(&server.accept_pt, NULL, accept_thread, NULL); pthread_create(&server.server_pt, NULL, server_thread, NULL); @@ -185,6 +183,9 @@ int server_main(void) pthread_cancel(server.server_pt); pthread_cancel(server.cleaner_pt); + flow_set_destroy(server.flows); + fqueue_destroy(server.fq); + pthread_join(server.server_pt, NULL); pthread_join(server.cleaner_pt, NULL); |