summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDimitri Staessens <dimitri@ouroboros.rocks>2020-12-06 13:41:14 +0100
committerSander Vrijders <sander@ouroboros.rocks>2020-12-07 18:39:41 +0100
commitf52e231e3e71fa97276260d87f192701e2232614 (patch)
tree8903f9d1c2b3166833b6960062df0c4f182975c6
parent37ebb798813514bc29b60ea7d28b99ad1036c68d (diff)
downloadouroboros-f52e231e3e71fa97276260d87f192701e2232614.zip
ouroboros-f52e231e3e71fa97276260d87f192701e2232614.tar.gz
ipcpd: Remove DT-FA bypass on receiver side
The DT will now post all packets for N+1 flows through the flow allocator component. This means that N+1 flows can be monitored through the flow allocator stats, and N-1 flows through the DT stats. The DT component still keeps stats for the local components (FA and DHT), but this can be removed once the DHT has its own RIB output. The flow allocator show statistics for Sent packets: total packets that were presented for sending on this specific flow Send failed: packets that were unable to be sent Received packets: total packets that were presented by the DT component on this specific flow Received failed: packets that were unable to be delivered These stats are presented as both packet counts and byte counts. To know how many were successful, the values for failed need to be subtracted from the values for total. Signed-off-by: Dimitri Staessens <dimitri@ouroboros.rocks> Signed-off-by: Sander Vrijders <sander@ouroboros.rocks>
-rw-r--r--src/ipcpd/unicast/dht.c6
-rw-r--r--src/ipcpd/unicast/dt.c61
-rw-r--r--src/ipcpd/unicast/dt.h2
-rw-r--r--src/ipcpd/unicast/fa.c76
-rw-r--r--src/ipcpd/unicast/fa.h6
5 files changed, 88 insertions, 63 deletions
diff --git a/src/ipcpd/unicast/dht.c b/src/ipcpd/unicast/dht.c
index f9d43f7..b964ca0 100644
--- a/src/ipcpd/unicast/dht.c
+++ b/src/ipcpd/unicast/dht.c
@@ -239,7 +239,7 @@ struct dht {
pthread_rwlock_t lock;
- int fd;
+ uint32_t eid;
struct tpm * tpm;
@@ -1489,7 +1489,7 @@ static int send_msg(struct dht * dht,
kad_msg__pack(msg, shm_du_buff_head(sdb));
- if (dt_write_packet(addr, QOS_CUBE_BE, dht->fd, sdb) == 0)
+ if (dt_write_packet(addr, QOS_CUBE_BE, dht->eid, sdb) == 0)
break;
ipcp_sdb_release(sdb);
@@ -2814,7 +2814,7 @@ struct dht * dht_create(uint64_t addr)
if (tpm_start(dht->tpm))
goto fail_tpm_start;
- dht->fd = dt_reg_comp(dht, &dht_post_packet, DHT);
+ dht->eid = dt_reg_comp(dht, &dht_post_packet, DHT);
notifier_reg(handle_event, dht);
#else
(void) handle_event;
diff --git a/src/ipcpd/unicast/dt.c b/src/ipcpd/unicast/dt.c
index cb06954..c8aadab 100644
--- a/src/ipcpd/unicast/dt.c
+++ b/src/ipcpd/unicast/dt.c
@@ -509,30 +509,7 @@ static void packet_handler(int fd,
dt_pci_shrink(sdb);
if (dt_pci.eid >= PROG_RES_FDS) {
uint8_t ecn = *(head + dt_pci_info.ecn_o);
- fa_ecn_update(dt_pci.eid, ecn, len);
-
- if (ipcp_flow_write(dt_pci.eid, sdb)) {
- ipcp_sdb_release(sdb);
-#ifdef IPCP_FLOW_STATS
- pthread_mutex_lock(&dt.stat[dt_pci.eid].lock);
-
- ++dt.stat[dt_pci.eid].w_drp_pkt[qc];
- dt.stat[dt_pci.eid].w_drp_bytes[qc] += len;
-
- pthread_mutex_unlock(&dt.stat[dt_pci.eid].lock);
-#endif
-
- }
-#ifdef IPCP_FLOW_STATS
- pthread_mutex_lock(&dt.stat[dt_pci.eid].lock);
-
- ++dt.stat[dt_pci.eid].rcv_pkt[qc];
- dt.stat[dt_pci.eid].rcv_bytes[qc] += len;
- ++dt.stat[dt_pci.eid].lcl_r_pkt[qc];
- dt.stat[dt_pci.eid].lcl_r_bytes[qc] += len;
-
- pthread_mutex_unlock(&dt.stat[dt_pci.eid].lock);
-#endif
+ fa_np1_rcv(dt_pci.eid, ecn, sdb);
return;
}
@@ -540,14 +517,6 @@ static void packet_handler(int fd,
log_err("No registered component on eid %d.",
dt_pci.eid);
ipcp_sdb_release(sdb);
-#ifdef IPCP_FLOW_STATS
- pthread_mutex_lock(&dt.stat[dt_pci.eid].lock);
-
- ++dt.stat[dt_pci.eid].w_drp_pkt[qc];
- dt.stat[dt_pci.eid].w_drp_bytes[qc] += len;
-
- pthread_mutex_unlock(&dt.stat[dt_pci.eid].lock);
-#endif
return;
}
#ifdef IPCP_FLOW_STATS
@@ -788,7 +757,7 @@ int dt_reg_comp(void * comp,
int dt_write_packet(uint64_t dst_addr,
qoscube_t qc,
- int np1_fd,
+ uint32_t eid,
struct shm_du_buff * sdb)
{
struct dt_pci dt_pci;
@@ -803,25 +772,27 @@ int dt_write_packet(uint64_t dst_addr,
len = shm_du_buff_tail(sdb) - shm_du_buff_head(sdb);
#ifdef IPCP_FLOW_STATS
+ if (eid < PROG_RES_FDS) {
+ pthread_mutex_lock(&dt.stat[eid].lock);
- pthread_mutex_lock(&dt.stat[np1_fd].lock);
+ ++dt.stat[eid].lcl_r_pkt[qc];
+ dt.stat[eid].lcl_r_bytes[qc] += len;
- ++dt.stat[np1_fd].lcl_r_pkt[qc];
- dt.stat[np1_fd].lcl_r_bytes[qc] += len;
-
- pthread_mutex_unlock(&dt.stat[np1_fd].lock);
+ pthread_mutex_unlock(&dt.stat[eid].lock);
+ }
#endif
-
fd = pff_nhop(dt.pff[qc], dst_addr);
if (fd < 0) {
log_dbg("Could not get nhop for addr %" PRIu64 ".", dst_addr);
#ifdef IPCP_FLOW_STATS
- pthread_mutex_lock(&dt.stat[np1_fd].lock);
+ if (eid < PROG_RES_FDS) {
+ pthread_mutex_lock(&dt.stat[eid].lock);
- ++dt.stat[np1_fd].f_nhp_pkt[qc];
- dt.stat[np1_fd].f_nhp_bytes[qc] += len;
+ ++dt.stat[eid].lcl_r_pkt[qc];
+ dt.stat[eid].lcl_r_bytes[qc] += len;
- pthread_mutex_unlock(&dt.stat[np1_fd].lock);
+ pthread_mutex_unlock(&dt.stat[eid].lock);
+ }
#endif
return -EPERM;
}
@@ -836,7 +807,7 @@ int dt_write_packet(uint64_t dst_addr,
dt_pci.dst_addr = dst_addr;
dt_pci.qc = qc;
- dt_pci.eid = np1_fd;
+ dt_pci.eid = eid;
dt_pci.ecn = ca_calc_ecn(fd, len);
dt_pci_ser(head, &dt_pci);
@@ -866,7 +837,7 @@ int dt_write_packet(uint64_t dst_addr,
#ifdef IPCP_FLOW_STATS
pthread_mutex_lock(&dt.stat[fd].lock);
- if (np1_fd < PROG_RES_FDS) {
+ if (eid < PROG_RES_FDS) {
++dt.stat[fd].lcl_w_pkt[qc];
dt.stat[fd].lcl_w_bytes[qc] += len;
}
diff --git a/src/ipcpd/unicast/dt.h b/src/ipcpd/unicast/dt.h
index 73b71a9..3e569df 100644
--- a/src/ipcpd/unicast/dt.h
+++ b/src/ipcpd/unicast/dt.h
@@ -49,7 +49,7 @@ int dt_reg_comp(void * comp,
int dt_write_packet(uint64_t dst_addr,
qoscube_t qc,
- int res_fd,
+ uint32_t eid,
struct shm_du_buff * sdb);
#endif /* OUROBOROS_IPCPD_UNICAST_DT_H */
diff --git a/src/ipcpd/unicast/fa.c b/src/ipcpd/unicast/fa.c
index 2fc37d9..08c7a93 100644
--- a/src/ipcpd/unicast/fa.c
+++ b/src/ipcpd/unicast/fa.c
@@ -84,7 +84,15 @@ struct cmd {
struct fa_flow {
#ifdef IPCP_FLOW_STATS
- time_t stamp;
+ time_t stamp; /* Flow creation */
+ size_t p_snd; /* Packets sent */
+ size_t p_snd_f; /* Packets sent fail */
+ size_t b_snd; /* Bytes sent */
+ size_t b_snd_f; /* Bytes sent fail */
+ size_t p_rcv; /* Packets received */
+ size_t p_rcv_f; /* Packets received fail */
+ size_t b_rcv; /* Bytes received */
+ size_t b_rcv_f; /* Bytes received fail */
#endif
int r_eid; /* remote endpoint id */
uint64_t r_addr; /* remote address */
@@ -97,7 +105,7 @@ struct {
#ifdef IPCP_FLOW_STATS
size_t n_flows;
#endif
- int fd;
+ uint32_t eid;
struct list_head cmds;
pthread_cond_t cond;
@@ -150,8 +158,20 @@ static int fa_stat_read(const char * path,
"Remote address: %20s\n"
"Local endpoint ID: %20d\n"
"Remote endpoint ID: %20d\n"
+ "Sent (packets): %20zu\n"
+ "Sent (bytes): %20zu\n"
+ "Send failed (packets): %20zu\n"
+ "Send failed (bytes): %20zu\n"
+ "Received (packets): %20zu\n"
+ "Received (bytes): %20zu\n"
+ "Receive failed (packets): %20zu\n"
+ "Receive failed (bytes): %20zu\n"
"%s",
tmstr, r_addrstr, fd, flow->r_eid,
+ flow->p_snd, flow->b_snd,
+ flow->p_snd_f, flow->b_snd_f,
+ flow->p_rcv, flow->b_rcv,
+ flow->b_rcv_f, flow->b_rcv_f,
castr);
pthread_rwlock_unlock(&fa.flows_lock);
@@ -273,6 +293,10 @@ static void packet_handler(int fd,
len = shm_du_buff_tail(sdb) - shm_du_buff_head(sdb);
+#ifdef IPCP_FLOW_STATS
+ ++flow->p_snd;
+ flow->b_snd += len;
+#endif
wnd = ca_ctx_update_snd(flow->ctx, len);
r_addr = flow->r_addr;
@@ -285,6 +309,12 @@ static void packet_handler(int fd,
if (dt_write_packet(r_addr, qc, r_eid, sdb)) {
ipcp_sdb_release(sdb);
log_warn("Failed to forward packet.");
+#ifdef IPCP_FLOW_STATS
+ pthread_rwlock_wrlock(&fa.flows_lock);
+ ++flow->p_snd_f;
+ flow->b_snd_f += len;
+ pthread_rwlock_unlock(&fa.flows_lock);
+#endif
return;
}
}
@@ -530,7 +560,7 @@ int fa_init(void)
list_head_init(&fa.cmds);
- fa.fd = dt_reg_comp(&fa, &fa_post_packet, FA);
+ fa.eid = dt_reg_comp(&fa, &fa_post_packet, FA);
sprintf(fastr, "%s", FA);
if (rib_reg(fastr, &r_ops))
@@ -653,7 +683,7 @@ int fa_alloc(int fd,
memcpy(msg + 1, dst, ipcp_dir_hash_len());
memcpy(shm_du_buff_head(sdb) + len, data, dlen);
- if (dt_write_packet(addr, qc, fa.fd, sdb)) {
+ if (dt_write_packet(addr, qc, fa.eid, sdb)) {
ipcp_sdb_release(sdb);
return -1;
}
@@ -729,7 +759,7 @@ int fa_alloc_resp(int fd,
psched_add(fa.psched, fd);
}
- if (dt_write_packet(flow->r_addr, qc, fa.fd, sdb)) {
+ if (dt_write_packet(flow->r_addr, qc, fa.eid, sdb)) {
fa_flow_fini(flow);
pthread_rwlock_unlock(&fa.flows_lock);
ipcp_sdb_release(sdb);
@@ -789,7 +819,7 @@ static int fa_update_remote(int fd,
pthread_rwlock_unlock(&fa.flows_lock);
- if (dt_write_packet(r_addr, qc, fa.fd, sdb)) {
+ if (dt_write_packet(r_addr, qc, fa.eid, sdb)) {
ipcp_sdb_release(sdb);
return -1;
}
@@ -797,27 +827,51 @@ static int fa_update_remote(int fd,
return 0;
}
-void fa_ecn_update(int eid,
- uint8_t ecn,
- size_t len)
+void fa_np1_rcv(uint32_t eid,
+ uint8_t ecn,
+ struct shm_du_buff * sdb)
{
struct fa_flow * flow;
bool update;
uint16_t ece;
+ int fd;
+ size_t len;
+
+ fd = (int) eid;
+ if (fd < 0) {
+ ipcp_sdb_release(sdb);
+ return;
+ }
- flow = &fa.flows[eid];
+ len = shm_du_buff_tail(sdb) - shm_du_buff_head(sdb);
+
+ flow = &fa.flows[fd];
pthread_rwlock_wrlock(&fa.flows_lock);
if (flow->r_eid == -1) {
pthread_rwlock_unlock(&fa.flows_lock);
+ ipcp_sdb_release(sdb);
return;
}
-
+#ifdef IPCP_FLOW_STATS
+ ++flow->p_rcv;
+ flow->b_rcv += len;
+#endif
update = ca_ctx_update_rcv(flow->ctx, len, ecn, &ece);
pthread_rwlock_unlock(&fa.flows_lock);
+ if (ipcp_flow_write(fd, sdb) < 0) {
+ ipcp_sdb_release(sdb);
+#ifdef IPCP_FLOW_STATS
+ pthread_rwlock_wrlock(&fa.flows_lock);
+ ++flow->p_rcv_f;
+ flow->b_rcv_f += len;
+ pthread_rwlock_unlock(&fa.flows_lock);
+#endif
+ }
+
if (update)
fa_update_remote(eid, ece);
}
diff --git a/src/ipcpd/unicast/fa.h b/src/ipcpd/unicast/fa.h
index daba2a5..c5c1bae 100644
--- a/src/ipcpd/unicast/fa.h
+++ b/src/ipcpd/unicast/fa.h
@@ -47,8 +47,8 @@ int fa_alloc_resp(int fd,
int fa_dealloc(int fd);
-void fa_ecn_update(int eid,
- uint8_t ecn,
- size_t len);
+void fa_np1_rcv(uint32_t eid,
+ uint8_t ecn,
+ struct shm_du_buff * sdb);
#endif /* OUROBOROS_IPCPD_UNICAST_FA_H */