summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/ipcpd/unicast/dht.c6
-rw-r--r--src/ipcpd/unicast/dt.c61
-rw-r--r--src/ipcpd/unicast/dt.h2
-rw-r--r--src/ipcpd/unicast/fa.c76
-rw-r--r--src/ipcpd/unicast/fa.h6
5 files changed, 88 insertions, 63 deletions
diff --git a/src/ipcpd/unicast/dht.c b/src/ipcpd/unicast/dht.c
index f9d43f7d..b964ca08 100644
--- a/src/ipcpd/unicast/dht.c
+++ b/src/ipcpd/unicast/dht.c
@@ -239,7 +239,7 @@ struct dht {
pthread_rwlock_t lock;
- int fd;
+ uint32_t eid;
struct tpm * tpm;
@@ -1489,7 +1489,7 @@ static int send_msg(struct dht * dht,
kad_msg__pack(msg, shm_du_buff_head(sdb));
- if (dt_write_packet(addr, QOS_CUBE_BE, dht->fd, sdb) == 0)
+ if (dt_write_packet(addr, QOS_CUBE_BE, dht->eid, sdb) == 0)
break;
ipcp_sdb_release(sdb);
@@ -2814,7 +2814,7 @@ struct dht * dht_create(uint64_t addr)
if (tpm_start(dht->tpm))
goto fail_tpm_start;
- dht->fd = dt_reg_comp(dht, &dht_post_packet, DHT);
+ dht->eid = dt_reg_comp(dht, &dht_post_packet, DHT);
notifier_reg(handle_event, dht);
#else
(void) handle_event;
diff --git a/src/ipcpd/unicast/dt.c b/src/ipcpd/unicast/dt.c
index cb069544..c8aadabb 100644
--- a/src/ipcpd/unicast/dt.c
+++ b/src/ipcpd/unicast/dt.c
@@ -509,30 +509,7 @@ static void packet_handler(int fd,
dt_pci_shrink(sdb);
if (dt_pci.eid >= PROG_RES_FDS) {
uint8_t ecn = *(head + dt_pci_info.ecn_o);
- fa_ecn_update(dt_pci.eid, ecn, len);
-
- if (ipcp_flow_write(dt_pci.eid, sdb)) {
- ipcp_sdb_release(sdb);
-#ifdef IPCP_FLOW_STATS
- pthread_mutex_lock(&dt.stat[dt_pci.eid].lock);
-
- ++dt.stat[dt_pci.eid].w_drp_pkt[qc];
- dt.stat[dt_pci.eid].w_drp_bytes[qc] += len;
-
- pthread_mutex_unlock(&dt.stat[dt_pci.eid].lock);
-#endif
-
- }
-#ifdef IPCP_FLOW_STATS
- pthread_mutex_lock(&dt.stat[dt_pci.eid].lock);
-
- ++dt.stat[dt_pci.eid].rcv_pkt[qc];
- dt.stat[dt_pci.eid].rcv_bytes[qc] += len;
- ++dt.stat[dt_pci.eid].lcl_r_pkt[qc];
- dt.stat[dt_pci.eid].lcl_r_bytes[qc] += len;
-
- pthread_mutex_unlock(&dt.stat[dt_pci.eid].lock);
-#endif
+ fa_np1_rcv(dt_pci.eid, ecn, sdb);
return;
}
@@ -540,14 +517,6 @@ static void packet_handler(int fd,
log_err("No registered component on eid %d.",
dt_pci.eid);
ipcp_sdb_release(sdb);
-#ifdef IPCP_FLOW_STATS
- pthread_mutex_lock(&dt.stat[dt_pci.eid].lock);
-
- ++dt.stat[dt_pci.eid].w_drp_pkt[qc];
- dt.stat[dt_pci.eid].w_drp_bytes[qc] += len;
-
- pthread_mutex_unlock(&dt.stat[dt_pci.eid].lock);
-#endif
return;
}
#ifdef IPCP_FLOW_STATS
@@ -788,7 +757,7 @@ int dt_reg_comp(void * comp,
int dt_write_packet(uint64_t dst_addr,
qoscube_t qc,
- int np1_fd,
+ uint32_t eid,
struct shm_du_buff * sdb)
{
struct dt_pci dt_pci;
@@ -803,25 +772,27 @@ int dt_write_packet(uint64_t dst_addr,
len = shm_du_buff_tail(sdb) - shm_du_buff_head(sdb);
#ifdef IPCP_FLOW_STATS
+ if (eid < PROG_RES_FDS) {
+ pthread_mutex_lock(&dt.stat[eid].lock);
- pthread_mutex_lock(&dt.stat[np1_fd].lock);
+ ++dt.stat[eid].lcl_r_pkt[qc];
+ dt.stat[eid].lcl_r_bytes[qc] += len;
- ++dt.stat[np1_fd].lcl_r_pkt[qc];
- dt.stat[np1_fd].lcl_r_bytes[qc] += len;
-
- pthread_mutex_unlock(&dt.stat[np1_fd].lock);
+ pthread_mutex_unlock(&dt.stat[eid].lock);
+ }
#endif
-
fd = pff_nhop(dt.pff[qc], dst_addr);
if (fd < 0) {
log_dbg("Could not get nhop for addr %" PRIu64 ".", dst_addr);
#ifdef IPCP_FLOW_STATS
- pthread_mutex_lock(&dt.stat[np1_fd].lock);
+ if (eid < PROG_RES_FDS) {
+ pthread_mutex_lock(&dt.stat[eid].lock);
- ++dt.stat[np1_fd].f_nhp_pkt[qc];
- dt.stat[np1_fd].f_nhp_bytes[qc] += len;
+ ++dt.stat[eid].lcl_r_pkt[qc];
+ dt.stat[eid].lcl_r_bytes[qc] += len;
- pthread_mutex_unlock(&dt.stat[np1_fd].lock);
+ pthread_mutex_unlock(&dt.stat[eid].lock);
+ }
#endif
return -EPERM;
}
@@ -836,7 +807,7 @@ int dt_write_packet(uint64_t dst_addr,
dt_pci.dst_addr = dst_addr;
dt_pci.qc = qc;
- dt_pci.eid = np1_fd;
+ dt_pci.eid = eid;
dt_pci.ecn = ca_calc_ecn(fd, len);
dt_pci_ser(head, &dt_pci);
@@ -866,7 +837,7 @@ int dt_write_packet(uint64_t dst_addr,
#ifdef IPCP_FLOW_STATS
pthread_mutex_lock(&dt.stat[fd].lock);
- if (np1_fd < PROG_RES_FDS) {
+ if (eid < PROG_RES_FDS) {
++dt.stat[fd].lcl_w_pkt[qc];
dt.stat[fd].lcl_w_bytes[qc] += len;
}
diff --git a/src/ipcpd/unicast/dt.h b/src/ipcpd/unicast/dt.h
index 73b71a92..3e569dfe 100644
--- a/src/ipcpd/unicast/dt.h
+++ b/src/ipcpd/unicast/dt.h
@@ -49,7 +49,7 @@ int dt_reg_comp(void * comp,
int dt_write_packet(uint64_t dst_addr,
qoscube_t qc,
- int res_fd,
+ uint32_t eid,
struct shm_du_buff * sdb);
#endif /* OUROBOROS_IPCPD_UNICAST_DT_H */
diff --git a/src/ipcpd/unicast/fa.c b/src/ipcpd/unicast/fa.c
index 2fc37d95..08c7a930 100644
--- a/src/ipcpd/unicast/fa.c
+++ b/src/ipcpd/unicast/fa.c
@@ -84,7 +84,15 @@ struct cmd {
struct fa_flow {
#ifdef IPCP_FLOW_STATS
- time_t stamp;
+ time_t stamp; /* Flow creation */
+ size_t p_snd; /* Packets sent */
+ size_t p_snd_f; /* Packets sent fail */
+ size_t b_snd; /* Bytes sent */
+ size_t b_snd_f; /* Bytes sent fail */
+ size_t p_rcv; /* Packets received */
+ size_t p_rcv_f; /* Packets received fail */
+ size_t b_rcv; /* Bytes received */
+ size_t b_rcv_f; /* Bytes received fail */
#endif
int r_eid; /* remote endpoint id */
uint64_t r_addr; /* remote address */
@@ -97,7 +105,7 @@ struct {
#ifdef IPCP_FLOW_STATS
size_t n_flows;
#endif
- int fd;
+ uint32_t eid;
struct list_head cmds;
pthread_cond_t cond;
@@ -150,8 +158,20 @@ static int fa_stat_read(const char * path,
"Remote address: %20s\n"
"Local endpoint ID: %20d\n"
"Remote endpoint ID: %20d\n"
+ "Sent (packets): %20zu\n"
+ "Sent (bytes): %20zu\n"
+ "Send failed (packets): %20zu\n"
+ "Send failed (bytes): %20zu\n"
+ "Received (packets): %20zu\n"
+ "Received (bytes): %20zu\n"
+ "Receive failed (packets): %20zu\n"
+ "Receive failed (bytes): %20zu\n"
"%s",
tmstr, r_addrstr, fd, flow->r_eid,
+ flow->p_snd, flow->b_snd,
+ flow->p_snd_f, flow->b_snd_f,
+ flow->p_rcv, flow->b_rcv,
+ flow->b_rcv_f, flow->b_rcv_f,
castr);
pthread_rwlock_unlock(&fa.flows_lock);
@@ -273,6 +293,10 @@ static void packet_handler(int fd,
len = shm_du_buff_tail(sdb) - shm_du_buff_head(sdb);
+#ifdef IPCP_FLOW_STATS
+ ++flow->p_snd;
+ flow->b_snd += len;
+#endif
wnd = ca_ctx_update_snd(flow->ctx, len);
r_addr = flow->r_addr;
@@ -285,6 +309,12 @@ static void packet_handler(int fd,
if (dt_write_packet(r_addr, qc, r_eid, sdb)) {
ipcp_sdb_release(sdb);
log_warn("Failed to forward packet.");
+#ifdef IPCP_FLOW_STATS
+ pthread_rwlock_wrlock(&fa.flows_lock);
+ ++flow->p_snd_f;
+ flow->b_snd_f += len;
+ pthread_rwlock_unlock(&fa.flows_lock);
+#endif
return;
}
}
@@ -530,7 +560,7 @@ int fa_init(void)
list_head_init(&fa.cmds);
- fa.fd = dt_reg_comp(&fa, &fa_post_packet, FA);
+ fa.eid = dt_reg_comp(&fa, &fa_post_packet, FA);
sprintf(fastr, "%s", FA);
if (rib_reg(fastr, &r_ops))
@@ -653,7 +683,7 @@ int fa_alloc(int fd,
memcpy(msg + 1, dst, ipcp_dir_hash_len());
memcpy(shm_du_buff_head(sdb) + len, data, dlen);
- if (dt_write_packet(addr, qc, fa.fd, sdb)) {
+ if (dt_write_packet(addr, qc, fa.eid, sdb)) {
ipcp_sdb_release(sdb);
return -1;
}
@@ -729,7 +759,7 @@ int fa_alloc_resp(int fd,
psched_add(fa.psched, fd);
}
- if (dt_write_packet(flow->r_addr, qc, fa.fd, sdb)) {
+ if (dt_write_packet(flow->r_addr, qc, fa.eid, sdb)) {
fa_flow_fini(flow);
pthread_rwlock_unlock(&fa.flows_lock);
ipcp_sdb_release(sdb);
@@ -789,7 +819,7 @@ static int fa_update_remote(int fd,
pthread_rwlock_unlock(&fa.flows_lock);
- if (dt_write_packet(r_addr, qc, fa.fd, sdb)) {
+ if (dt_write_packet(r_addr, qc, fa.eid, sdb)) {
ipcp_sdb_release(sdb);
return -1;
}
@@ -797,27 +827,51 @@ static int fa_update_remote(int fd,
return 0;
}
-void fa_ecn_update(int eid,
- uint8_t ecn,
- size_t len)
+void fa_np1_rcv(uint32_t eid,
+ uint8_t ecn,
+ struct shm_du_buff * sdb)
{
struct fa_flow * flow;
bool update;
uint16_t ece;
+ int fd;
+ size_t len;
+
+ fd = (int) eid;
+ if (fd < 0) {
+ ipcp_sdb_release(sdb);
+ return;
+ }
- flow = &fa.flows[eid];
+ len = shm_du_buff_tail(sdb) - shm_du_buff_head(sdb);
+
+ flow = &fa.flows[fd];
pthread_rwlock_wrlock(&fa.flows_lock);
if (flow->r_eid == -1) {
pthread_rwlock_unlock(&fa.flows_lock);
+ ipcp_sdb_release(sdb);
return;
}
-
+#ifdef IPCP_FLOW_STATS
+ ++flow->p_rcv;
+ flow->b_rcv += len;
+#endif
update = ca_ctx_update_rcv(flow->ctx, len, ecn, &ece);
pthread_rwlock_unlock(&fa.flows_lock);
+ if (ipcp_flow_write(fd, sdb) < 0) {
+ ipcp_sdb_release(sdb);
+#ifdef IPCP_FLOW_STATS
+ pthread_rwlock_wrlock(&fa.flows_lock);
+ ++flow->p_rcv_f;
+ flow->b_rcv_f += len;
+ pthread_rwlock_unlock(&fa.flows_lock);
+#endif
+ }
+
if (update)
fa_update_remote(eid, ece);
}
diff --git a/src/ipcpd/unicast/fa.h b/src/ipcpd/unicast/fa.h
index daba2a51..c5c1baec 100644
--- a/src/ipcpd/unicast/fa.h
+++ b/src/ipcpd/unicast/fa.h
@@ -47,8 +47,8 @@ int fa_alloc_resp(int fd,
int fa_dealloc(int fd);
-void fa_ecn_update(int eid,
- uint8_t ecn,
- size_t len);
+void fa_np1_rcv(uint32_t eid,
+ uint8_t ecn,
+ struct shm_du_buff * sdb);
#endif /* OUROBOROS_IPCPD_UNICAST_FA_H */