diff options
-rw-r--r-- | src/ipcpd/unicast/dht.c | 6 | ||||
-rw-r--r-- | src/ipcpd/unicast/dt.c | 61 | ||||
-rw-r--r-- | src/ipcpd/unicast/dt.h | 2 | ||||
-rw-r--r-- | src/ipcpd/unicast/fa.c | 76 | ||||
-rw-r--r-- | src/ipcpd/unicast/fa.h | 6 |
5 files changed, 88 insertions, 63 deletions
diff --git a/src/ipcpd/unicast/dht.c b/src/ipcpd/unicast/dht.c index f9d43f7d..b964ca08 100644 --- a/src/ipcpd/unicast/dht.c +++ b/src/ipcpd/unicast/dht.c @@ -239,7 +239,7 @@ struct dht { pthread_rwlock_t lock; - int fd; + uint32_t eid; struct tpm * tpm; @@ -1489,7 +1489,7 @@ static int send_msg(struct dht * dht, kad_msg__pack(msg, shm_du_buff_head(sdb)); - if (dt_write_packet(addr, QOS_CUBE_BE, dht->fd, sdb) == 0) + if (dt_write_packet(addr, QOS_CUBE_BE, dht->eid, sdb) == 0) break; ipcp_sdb_release(sdb); @@ -2814,7 +2814,7 @@ struct dht * dht_create(uint64_t addr) if (tpm_start(dht->tpm)) goto fail_tpm_start; - dht->fd = dt_reg_comp(dht, &dht_post_packet, DHT); + dht->eid = dt_reg_comp(dht, &dht_post_packet, DHT); notifier_reg(handle_event, dht); #else (void) handle_event; diff --git a/src/ipcpd/unicast/dt.c b/src/ipcpd/unicast/dt.c index cb069544..c8aadabb 100644 --- a/src/ipcpd/unicast/dt.c +++ b/src/ipcpd/unicast/dt.c @@ -509,30 +509,7 @@ static void packet_handler(int fd, dt_pci_shrink(sdb); if (dt_pci.eid >= PROG_RES_FDS) { uint8_t ecn = *(head + dt_pci_info.ecn_o); - fa_ecn_update(dt_pci.eid, ecn, len); - - if (ipcp_flow_write(dt_pci.eid, sdb)) { - ipcp_sdb_release(sdb); -#ifdef IPCP_FLOW_STATS - pthread_mutex_lock(&dt.stat[dt_pci.eid].lock); - - ++dt.stat[dt_pci.eid].w_drp_pkt[qc]; - dt.stat[dt_pci.eid].w_drp_bytes[qc] += len; - - pthread_mutex_unlock(&dt.stat[dt_pci.eid].lock); -#endif - - } -#ifdef IPCP_FLOW_STATS - pthread_mutex_lock(&dt.stat[dt_pci.eid].lock); - - ++dt.stat[dt_pci.eid].rcv_pkt[qc]; - dt.stat[dt_pci.eid].rcv_bytes[qc] += len; - ++dt.stat[dt_pci.eid].lcl_r_pkt[qc]; - dt.stat[dt_pci.eid].lcl_r_bytes[qc] += len; - - pthread_mutex_unlock(&dt.stat[dt_pci.eid].lock); -#endif + fa_np1_rcv(dt_pci.eid, ecn, sdb); return; } @@ -540,14 +517,6 @@ static void packet_handler(int fd, log_err("No registered component on eid %d.", dt_pci.eid); ipcp_sdb_release(sdb); -#ifdef IPCP_FLOW_STATS - pthread_mutex_lock(&dt.stat[dt_pci.eid].lock); - - ++dt.stat[dt_pci.eid].w_drp_pkt[qc]; - dt.stat[dt_pci.eid].w_drp_bytes[qc] += len; - - pthread_mutex_unlock(&dt.stat[dt_pci.eid].lock); -#endif return; } #ifdef IPCP_FLOW_STATS @@ -788,7 +757,7 @@ int dt_reg_comp(void * comp, int dt_write_packet(uint64_t dst_addr, qoscube_t qc, - int np1_fd, + uint32_t eid, struct shm_du_buff * sdb) { struct dt_pci dt_pci; @@ -803,25 +772,27 @@ int dt_write_packet(uint64_t dst_addr, len = shm_du_buff_tail(sdb) - shm_du_buff_head(sdb); #ifdef IPCP_FLOW_STATS + if (eid < PROG_RES_FDS) { + pthread_mutex_lock(&dt.stat[eid].lock); - pthread_mutex_lock(&dt.stat[np1_fd].lock); + ++dt.stat[eid].lcl_r_pkt[qc]; + dt.stat[eid].lcl_r_bytes[qc] += len; - ++dt.stat[np1_fd].lcl_r_pkt[qc]; - dt.stat[np1_fd].lcl_r_bytes[qc] += len; - - pthread_mutex_unlock(&dt.stat[np1_fd].lock); + pthread_mutex_unlock(&dt.stat[eid].lock); + } #endif - fd = pff_nhop(dt.pff[qc], dst_addr); if (fd < 0) { log_dbg("Could not get nhop for addr %" PRIu64 ".", dst_addr); #ifdef IPCP_FLOW_STATS - pthread_mutex_lock(&dt.stat[np1_fd].lock); + if (eid < PROG_RES_FDS) { + pthread_mutex_lock(&dt.stat[eid].lock); - ++dt.stat[np1_fd].f_nhp_pkt[qc]; - dt.stat[np1_fd].f_nhp_bytes[qc] += len; + ++dt.stat[eid].lcl_r_pkt[qc]; + dt.stat[eid].lcl_r_bytes[qc] += len; - pthread_mutex_unlock(&dt.stat[np1_fd].lock); + pthread_mutex_unlock(&dt.stat[eid].lock); + } #endif return -EPERM; } @@ -836,7 +807,7 @@ int dt_write_packet(uint64_t dst_addr, dt_pci.dst_addr = dst_addr; dt_pci.qc = qc; - dt_pci.eid = np1_fd; + dt_pci.eid = eid; dt_pci.ecn = ca_calc_ecn(fd, len); dt_pci_ser(head, &dt_pci); @@ -866,7 +837,7 @@ int dt_write_packet(uint64_t dst_addr, #ifdef IPCP_FLOW_STATS pthread_mutex_lock(&dt.stat[fd].lock); - if (np1_fd < PROG_RES_FDS) { + if (eid < PROG_RES_FDS) { ++dt.stat[fd].lcl_w_pkt[qc]; dt.stat[fd].lcl_w_bytes[qc] += len; } diff --git a/src/ipcpd/unicast/dt.h b/src/ipcpd/unicast/dt.h index 73b71a92..3e569dfe 100644 --- a/src/ipcpd/unicast/dt.h +++ b/src/ipcpd/unicast/dt.h @@ -49,7 +49,7 @@ int dt_reg_comp(void * comp, int dt_write_packet(uint64_t dst_addr, qoscube_t qc, - int res_fd, + uint32_t eid, struct shm_du_buff * sdb); #endif /* OUROBOROS_IPCPD_UNICAST_DT_H */ diff --git a/src/ipcpd/unicast/fa.c b/src/ipcpd/unicast/fa.c index 2fc37d95..08c7a930 100644 --- a/src/ipcpd/unicast/fa.c +++ b/src/ipcpd/unicast/fa.c @@ -84,7 +84,15 @@ struct cmd { struct fa_flow { #ifdef IPCP_FLOW_STATS - time_t stamp; + time_t stamp; /* Flow creation */ + size_t p_snd; /* Packets sent */ + size_t p_snd_f; /* Packets sent fail */ + size_t b_snd; /* Bytes sent */ + size_t b_snd_f; /* Bytes sent fail */ + size_t p_rcv; /* Packets received */ + size_t p_rcv_f; /* Packets received fail */ + size_t b_rcv; /* Bytes received */ + size_t b_rcv_f; /* Bytes received fail */ #endif int r_eid; /* remote endpoint id */ uint64_t r_addr; /* remote address */ @@ -97,7 +105,7 @@ struct { #ifdef IPCP_FLOW_STATS size_t n_flows; #endif - int fd; + uint32_t eid; struct list_head cmds; pthread_cond_t cond; @@ -150,8 +158,20 @@ static int fa_stat_read(const char * path, "Remote address: %20s\n" "Local endpoint ID: %20d\n" "Remote endpoint ID: %20d\n" + "Sent (packets): %20zu\n" + "Sent (bytes): %20zu\n" + "Send failed (packets): %20zu\n" + "Send failed (bytes): %20zu\n" + "Received (packets): %20zu\n" + "Received (bytes): %20zu\n" + "Receive failed (packets): %20zu\n" + "Receive failed (bytes): %20zu\n" "%s", tmstr, r_addrstr, fd, flow->r_eid, + flow->p_snd, flow->b_snd, + flow->p_snd_f, flow->b_snd_f, + flow->p_rcv, flow->b_rcv, + flow->b_rcv_f, flow->b_rcv_f, castr); pthread_rwlock_unlock(&fa.flows_lock); @@ -273,6 +293,10 @@ static void packet_handler(int fd, len = shm_du_buff_tail(sdb) - shm_du_buff_head(sdb); +#ifdef IPCP_FLOW_STATS + ++flow->p_snd; + flow->b_snd += len; +#endif wnd = ca_ctx_update_snd(flow->ctx, len); r_addr = flow->r_addr; @@ -285,6 +309,12 @@ static void packet_handler(int fd, if (dt_write_packet(r_addr, qc, r_eid, sdb)) { ipcp_sdb_release(sdb); log_warn("Failed to forward packet."); +#ifdef IPCP_FLOW_STATS + pthread_rwlock_wrlock(&fa.flows_lock); + ++flow->p_snd_f; + flow->b_snd_f += len; + pthread_rwlock_unlock(&fa.flows_lock); +#endif return; } } @@ -530,7 +560,7 @@ int fa_init(void) list_head_init(&fa.cmds); - fa.fd = dt_reg_comp(&fa, &fa_post_packet, FA); + fa.eid = dt_reg_comp(&fa, &fa_post_packet, FA); sprintf(fastr, "%s", FA); if (rib_reg(fastr, &r_ops)) @@ -653,7 +683,7 @@ int fa_alloc(int fd, memcpy(msg + 1, dst, ipcp_dir_hash_len()); memcpy(shm_du_buff_head(sdb) + len, data, dlen); - if (dt_write_packet(addr, qc, fa.fd, sdb)) { + if (dt_write_packet(addr, qc, fa.eid, sdb)) { ipcp_sdb_release(sdb); return -1; } @@ -729,7 +759,7 @@ int fa_alloc_resp(int fd, psched_add(fa.psched, fd); } - if (dt_write_packet(flow->r_addr, qc, fa.fd, sdb)) { + if (dt_write_packet(flow->r_addr, qc, fa.eid, sdb)) { fa_flow_fini(flow); pthread_rwlock_unlock(&fa.flows_lock); ipcp_sdb_release(sdb); @@ -789,7 +819,7 @@ static int fa_update_remote(int fd, pthread_rwlock_unlock(&fa.flows_lock); - if (dt_write_packet(r_addr, qc, fa.fd, sdb)) { + if (dt_write_packet(r_addr, qc, fa.eid, sdb)) { ipcp_sdb_release(sdb); return -1; } @@ -797,27 +827,51 @@ static int fa_update_remote(int fd, return 0; } -void fa_ecn_update(int eid, - uint8_t ecn, - size_t len) +void fa_np1_rcv(uint32_t eid, + uint8_t ecn, + struct shm_du_buff * sdb) { struct fa_flow * flow; bool update; uint16_t ece; + int fd; + size_t len; + + fd = (int) eid; + if (fd < 0) { + ipcp_sdb_release(sdb); + return; + } - flow = &fa.flows[eid]; + len = shm_du_buff_tail(sdb) - shm_du_buff_head(sdb); + + flow = &fa.flows[fd]; pthread_rwlock_wrlock(&fa.flows_lock); if (flow->r_eid == -1) { pthread_rwlock_unlock(&fa.flows_lock); + ipcp_sdb_release(sdb); return; } - +#ifdef IPCP_FLOW_STATS + ++flow->p_rcv; + flow->b_rcv += len; +#endif update = ca_ctx_update_rcv(flow->ctx, len, ecn, &ece); pthread_rwlock_unlock(&fa.flows_lock); + if (ipcp_flow_write(fd, sdb) < 0) { + ipcp_sdb_release(sdb); +#ifdef IPCP_FLOW_STATS + pthread_rwlock_wrlock(&fa.flows_lock); + ++flow->p_rcv_f; + flow->b_rcv_f += len; + pthread_rwlock_unlock(&fa.flows_lock); +#endif + } + if (update) fa_update_remote(eid, ece); } diff --git a/src/ipcpd/unicast/fa.h b/src/ipcpd/unicast/fa.h index daba2a51..c5c1baec 100644 --- a/src/ipcpd/unicast/fa.h +++ b/src/ipcpd/unicast/fa.h @@ -47,8 +47,8 @@ int fa_alloc_resp(int fd, int fa_dealloc(int fd); -void fa_ecn_update(int eid, - uint8_t ecn, - size_t len); +void fa_np1_rcv(uint32_t eid, + uint8_t ecn, + struct shm_du_buff * sdb); #endif /* OUROBOROS_IPCPD_UNICAST_FA_H */ |