From f52e231e3e71fa97276260d87f192701e2232614 Mon Sep 17 00:00:00 2001
From: Dimitri Staessens <dimitri@ouroboros.rocks>
Date: Sun, 6 Dec 2020 13:41:14 +0100
Subject: ipcpd: Remove DT-FA bypass on receiver side

The DT will now post all packets for N+1 flows through the flow
allocator component. This means that N+1 flows can be monitored
through the flow allocator stats, and N-1 flows through the DT stats.
The DT component still keeps stats for the local components (FA and
DHT), but this can be removed once the DHT has its own RIB
output.

The flow allocator show statistics for

Sent packets: total packets that were presented for sending
              on this specific flow
Send failed:  packets that were unable to be sent

Received packets: total packets that were presented by the DT component
                  on this specific flow
Received failed: packets that were unable to be delivered

These stats are presented as both packet counts and byte counts. To
know how many were successful, the values for failed need to be
subtracted from the values for total.

Signed-off-by: Dimitri Staessens <dimitri@ouroboros.rocks>
Signed-off-by: Sander Vrijders <sander@ouroboros.rocks>
---
 src/ipcpd/unicast/dht.c |  6 ++--
 src/ipcpd/unicast/dt.c  | 61 +++++++++++----------------------------
 src/ipcpd/unicast/dt.h  |  2 +-
 src/ipcpd/unicast/fa.c  | 76 ++++++++++++++++++++++++++++++++++++++++++-------
 src/ipcpd/unicast/fa.h  |  6 ++--
 5 files changed, 88 insertions(+), 63 deletions(-)

(limited to 'src')

diff --git a/src/ipcpd/unicast/dht.c b/src/ipcpd/unicast/dht.c
index f9d43f7d..b964ca08 100644
--- a/src/ipcpd/unicast/dht.c
+++ b/src/ipcpd/unicast/dht.c
@@ -239,7 +239,7 @@ struct dht {
 
         pthread_rwlock_t lock;
 
-        int              fd;
+        uint32_t         eid;
 
         struct tpm *     tpm;
 
@@ -1489,7 +1489,7 @@ static int send_msg(struct dht * dht,
 
                 kad_msg__pack(msg, shm_du_buff_head(sdb));
 
-                if (dt_write_packet(addr, QOS_CUBE_BE, dht->fd, sdb) == 0)
+                if (dt_write_packet(addr, QOS_CUBE_BE, dht->eid, sdb) == 0)
                         break;
 
                 ipcp_sdb_release(sdb);
@@ -2814,7 +2814,7 @@ struct dht * dht_create(uint64_t addr)
         if (tpm_start(dht->tpm))
                 goto fail_tpm_start;
 
-        dht->fd   = dt_reg_comp(dht, &dht_post_packet, DHT);
+        dht->eid   = dt_reg_comp(dht, &dht_post_packet, DHT);
         notifier_reg(handle_event, dht);
 #else
         (void) handle_event;
diff --git a/src/ipcpd/unicast/dt.c b/src/ipcpd/unicast/dt.c
index cb069544..c8aadabb 100644
--- a/src/ipcpd/unicast/dt.c
+++ b/src/ipcpd/unicast/dt.c
@@ -509,30 +509,7 @@ static void packet_handler(int                  fd,
                 dt_pci_shrink(sdb);
                 if (dt_pci.eid >= PROG_RES_FDS) {
                         uint8_t ecn = *(head + dt_pci_info.ecn_o);
-                        fa_ecn_update(dt_pci.eid, ecn, len);
-
-                        if (ipcp_flow_write(dt_pci.eid, sdb)) {
-                                ipcp_sdb_release(sdb);
-#ifdef IPCP_FLOW_STATS
-                                pthread_mutex_lock(&dt.stat[dt_pci.eid].lock);
-
-                                ++dt.stat[dt_pci.eid].w_drp_pkt[qc];
-                                dt.stat[dt_pci.eid].w_drp_bytes[qc] += len;
-
-                                pthread_mutex_unlock(&dt.stat[dt_pci.eid].lock);
-#endif
-
-                        }
-#ifdef IPCP_FLOW_STATS
-                        pthread_mutex_lock(&dt.stat[dt_pci.eid].lock);
-
-                        ++dt.stat[dt_pci.eid].rcv_pkt[qc];
-                        dt.stat[dt_pci.eid].rcv_bytes[qc] += len;
-                        ++dt.stat[dt_pci.eid].lcl_r_pkt[qc];
-                        dt.stat[dt_pci.eid].lcl_r_bytes[qc] += len;
-
-                        pthread_mutex_unlock(&dt.stat[dt_pci.eid].lock);
-#endif
+                        fa_np1_rcv(dt_pci.eid, ecn, sdb);
                         return;
                 }
 
@@ -540,14 +517,6 @@ static void packet_handler(int                  fd,
                         log_err("No registered component on eid %d.",
                                 dt_pci.eid);
                         ipcp_sdb_release(sdb);
-#ifdef IPCP_FLOW_STATS
-                        pthread_mutex_lock(&dt.stat[dt_pci.eid].lock);
-
-                        ++dt.stat[dt_pci.eid].w_drp_pkt[qc];
-                        dt.stat[dt_pci.eid].w_drp_bytes[qc] += len;
-
-                        pthread_mutex_unlock(&dt.stat[dt_pci.eid].lock);
-#endif
                         return;
                 }
 #ifdef IPCP_FLOW_STATS
@@ -788,7 +757,7 @@ int dt_reg_comp(void * comp,
 
 int dt_write_packet(uint64_t             dst_addr,
                     qoscube_t            qc,
-                    int                  np1_fd,
+                    uint32_t             eid,
                     struct shm_du_buff * sdb)
 {
         struct dt_pci dt_pci;
@@ -803,25 +772,27 @@ int dt_write_packet(uint64_t             dst_addr,
         len = shm_du_buff_tail(sdb) - shm_du_buff_head(sdb);
 
 #ifdef IPCP_FLOW_STATS
+        if (eid < PROG_RES_FDS) {
+                pthread_mutex_lock(&dt.stat[eid].lock);
 
-        pthread_mutex_lock(&dt.stat[np1_fd].lock);
+                ++dt.stat[eid].lcl_r_pkt[qc];
+                dt.stat[eid].lcl_r_bytes[qc] += len;
 
-        ++dt.stat[np1_fd].lcl_r_pkt[qc];
-        dt.stat[np1_fd].lcl_r_bytes[qc] += len;
-
-        pthread_mutex_unlock(&dt.stat[np1_fd].lock);
+                pthread_mutex_unlock(&dt.stat[eid].lock);
+        }
 #endif
-
         fd = pff_nhop(dt.pff[qc], dst_addr);
         if (fd < 0) {
                 log_dbg("Could not get nhop for addr %" PRIu64 ".", dst_addr);
 #ifdef IPCP_FLOW_STATS
-                pthread_mutex_lock(&dt.stat[np1_fd].lock);
+                if (eid < PROG_RES_FDS) {
+                        pthread_mutex_lock(&dt.stat[eid].lock);
 
-                ++dt.stat[np1_fd].f_nhp_pkt[qc];
-                dt.stat[np1_fd].f_nhp_bytes[qc] += len;
+                        ++dt.stat[eid].lcl_r_pkt[qc];
+                        dt.stat[eid].lcl_r_bytes[qc] += len;
 
-                pthread_mutex_unlock(&dt.stat[np1_fd].lock);
+                        pthread_mutex_unlock(&dt.stat[eid].lock);
+                }
 #endif
                 return -EPERM;
         }
@@ -836,7 +807,7 @@ int dt_write_packet(uint64_t             dst_addr,
 
         dt_pci.dst_addr = dst_addr;
         dt_pci.qc       = qc;
-        dt_pci.eid      = np1_fd;
+        dt_pci.eid      = eid;
         dt_pci.ecn      = ca_calc_ecn(fd, len);
 
         dt_pci_ser(head, &dt_pci);
@@ -866,7 +837,7 @@ int dt_write_packet(uint64_t             dst_addr,
 #ifdef IPCP_FLOW_STATS
         pthread_mutex_lock(&dt.stat[fd].lock);
 
-        if (np1_fd < PROG_RES_FDS) {
+        if (eid < PROG_RES_FDS) {
                 ++dt.stat[fd].lcl_w_pkt[qc];
                 dt.stat[fd].lcl_w_bytes[qc] += len;
         }
diff --git a/src/ipcpd/unicast/dt.h b/src/ipcpd/unicast/dt.h
index 73b71a92..3e569dfe 100644
--- a/src/ipcpd/unicast/dt.h
+++ b/src/ipcpd/unicast/dt.h
@@ -49,7 +49,7 @@ int  dt_reg_comp(void * comp,
 
 int  dt_write_packet(uint64_t             dst_addr,
                      qoscube_t            qc,
-                     int                  res_fd,
+                     uint32_t             eid,
                      struct shm_du_buff * sdb);
 
 #endif /* OUROBOROS_IPCPD_UNICAST_DT_H */
diff --git a/src/ipcpd/unicast/fa.c b/src/ipcpd/unicast/fa.c
index 2fc37d95..08c7a930 100644
--- a/src/ipcpd/unicast/fa.c
+++ b/src/ipcpd/unicast/fa.c
@@ -84,7 +84,15 @@ struct cmd {
 
 struct fa_flow {
 #ifdef IPCP_FLOW_STATS
-        time_t   stamp;
+        time_t   stamp;    /* Flow creation                  */
+        size_t   p_snd;    /* Packets sent                   */
+        size_t   p_snd_f;  /* Packets sent fail              */
+        size_t   b_snd;    /* Bytes sent                     */
+        size_t   b_snd_f;  /* Bytes sent fail                */
+        size_t   p_rcv;    /* Packets received               */
+        size_t   p_rcv_f;  /* Packets received fail          */
+        size_t   b_rcv;    /* Bytes received                 */
+        size_t   b_rcv_f;  /* Bytes received fail            */
 #endif
         int      r_eid;  /* remote endpoint id               */
         uint64_t r_addr; /* remote address                   */
@@ -97,7 +105,7 @@ struct {
 #ifdef IPCP_FLOW_STATS
         size_t           n_flows;
 #endif
-        int              fd;
+        uint32_t         eid;
 
         struct list_head cmds;
         pthread_cond_t   cond;
@@ -150,8 +158,20 @@ static int fa_stat_read(const char * path,
                 "Remote address:                  %20s\n"
                 "Local endpoint ID:               %20d\n"
                 "Remote endpoint ID:              %20d\n"
+                "Sent (packets):                  %20zu\n"
+                "Sent (bytes):                    %20zu\n"
+                "Send failed (packets):           %20zu\n"
+                "Send failed (bytes):             %20zu\n"
+                "Received (packets):              %20zu\n"
+                "Received (bytes):                %20zu\n"
+                "Receive failed (packets):        %20zu\n"
+                "Receive failed (bytes):          %20zu\n"
                 "%s",
                 tmstr, r_addrstr, fd, flow->r_eid,
+                flow->p_snd, flow->b_snd,
+                flow->p_snd_f, flow->b_snd_f,
+                flow->p_rcv, flow->b_rcv,
+                flow->b_rcv_f, flow->b_rcv_f,
                 castr);
 
         pthread_rwlock_unlock(&fa.flows_lock);
@@ -273,6 +293,10 @@ static void packet_handler(int                  fd,
 
         len = shm_du_buff_tail(sdb) - shm_du_buff_head(sdb);
 
+#ifdef IPCP_FLOW_STATS
+        ++flow->p_snd;
+        flow->b_snd += len;
+#endif
         wnd = ca_ctx_update_snd(flow->ctx, len);
 
         r_addr = flow->r_addr;
@@ -285,6 +309,12 @@ static void packet_handler(int                  fd,
         if (dt_write_packet(r_addr, qc, r_eid, sdb)) {
                 ipcp_sdb_release(sdb);
                 log_warn("Failed to forward packet.");
+#ifdef IPCP_FLOW_STATS
+                pthread_rwlock_wrlock(&fa.flows_lock);
+                ++flow->p_snd_f;
+                flow->b_snd_f += len;
+                pthread_rwlock_unlock(&fa.flows_lock);
+#endif
                 return;
         }
 }
@@ -530,7 +560,7 @@ int fa_init(void)
 
         list_head_init(&fa.cmds);
 
-        fa.fd = dt_reg_comp(&fa, &fa_post_packet, FA);
+        fa.eid = dt_reg_comp(&fa, &fa_post_packet, FA);
 
         sprintf(fastr, "%s", FA);
         if (rib_reg(fastr, &r_ops))
@@ -653,7 +683,7 @@ int fa_alloc(int             fd,
         memcpy(msg + 1, dst, ipcp_dir_hash_len());
         memcpy(shm_du_buff_head(sdb) + len, data, dlen);
 
-        if (dt_write_packet(addr, qc, fa.fd, sdb)) {
+        if (dt_write_packet(addr, qc, fa.eid, sdb)) {
                 ipcp_sdb_release(sdb);
                 return -1;
         }
@@ -729,7 +759,7 @@ int fa_alloc_resp(int          fd,
                 psched_add(fa.psched, fd);
         }
 
-        if (dt_write_packet(flow->r_addr, qc, fa.fd, sdb)) {
+        if (dt_write_packet(flow->r_addr, qc, fa.eid, sdb)) {
                 fa_flow_fini(flow);
                 pthread_rwlock_unlock(&fa.flows_lock);
                 ipcp_sdb_release(sdb);
@@ -789,7 +819,7 @@ static int fa_update_remote(int      fd,
         pthread_rwlock_unlock(&fa.flows_lock);
 
 
-        if (dt_write_packet(r_addr, qc, fa.fd, sdb)) {
+        if (dt_write_packet(r_addr, qc, fa.eid, sdb)) {
                 ipcp_sdb_release(sdb);
                 return -1;
         }
@@ -797,27 +827,51 @@ static int fa_update_remote(int      fd,
         return 0;
 }
 
-void  fa_ecn_update(int     eid,
-                    uint8_t ecn,
-                    size_t  len)
+void  fa_np1_rcv(uint32_t             eid,
+                 uint8_t              ecn,
+                 struct shm_du_buff * sdb)
 {
         struct fa_flow * flow;
         bool             update;
         uint16_t         ece;
+        int              fd;
+        size_t           len;
+
+        fd = (int) eid;
+        if (fd < 0) {
+                ipcp_sdb_release(sdb);
+                return;
+        }
 
-        flow = &fa.flows[eid];
+        len = shm_du_buff_tail(sdb) - shm_du_buff_head(sdb);
+
+        flow = &fa.flows[fd];
 
         pthread_rwlock_wrlock(&fa.flows_lock);
 
         if (flow->r_eid == -1) {
                 pthread_rwlock_unlock(&fa.flows_lock);
+                ipcp_sdb_release(sdb);
                 return;
         }
-
+#ifdef IPCP_FLOW_STATS
+        ++flow->p_rcv;
+        flow->b_rcv += len;
+#endif
         update = ca_ctx_update_rcv(flow->ctx, len, ecn, &ece);
 
         pthread_rwlock_unlock(&fa.flows_lock);
 
+        if (ipcp_flow_write(fd, sdb) < 0) {
+                ipcp_sdb_release(sdb);
+#ifdef IPCP_FLOW_STATS
+                pthread_rwlock_wrlock(&fa.flows_lock);
+                ++flow->p_rcv_f;
+                flow->b_rcv_f += len;
+                pthread_rwlock_unlock(&fa.flows_lock);
+#endif
+        }
+
         if (update)
                 fa_update_remote(eid, ece);
 }
diff --git a/src/ipcpd/unicast/fa.h b/src/ipcpd/unicast/fa.h
index daba2a51..c5c1baec 100644
--- a/src/ipcpd/unicast/fa.h
+++ b/src/ipcpd/unicast/fa.h
@@ -47,8 +47,8 @@ int  fa_alloc_resp(int          fd,
 
 int  fa_dealloc(int fd);
 
-void fa_ecn_update(int     eid,
-                   uint8_t ecn,
-                   size_t  len);
+void fa_np1_rcv(uint32_t             eid,
+                uint8_t              ecn,
+                struct shm_du_buff * sdb);
 
 #endif /* OUROBOROS_IPCPD_UNICAST_FA_H */
-- 
cgit v1.2.3