diff options
Diffstat (limited to 'src/ipcpd')
| -rw-r--r-- | src/ipcpd/unicast/dht.c | 6 | ||||
| -rw-r--r-- | src/ipcpd/unicast/dt.c | 61 | ||||
| -rw-r--r-- | src/ipcpd/unicast/dt.h | 2 | ||||
| -rw-r--r-- | src/ipcpd/unicast/fa.c | 76 | ||||
| -rw-r--r-- | src/ipcpd/unicast/fa.h | 6 | 
5 files changed, 88 insertions, 63 deletions
diff --git a/src/ipcpd/unicast/dht.c b/src/ipcpd/unicast/dht.c index f9d43f7d..b964ca08 100644 --- a/src/ipcpd/unicast/dht.c +++ b/src/ipcpd/unicast/dht.c @@ -239,7 +239,7 @@ struct dht {          pthread_rwlock_t lock; -        int              fd; +        uint32_t         eid;          struct tpm *     tpm; @@ -1489,7 +1489,7 @@ static int send_msg(struct dht * dht,                  kad_msg__pack(msg, shm_du_buff_head(sdb)); -                if (dt_write_packet(addr, QOS_CUBE_BE, dht->fd, sdb) == 0) +                if (dt_write_packet(addr, QOS_CUBE_BE, dht->eid, sdb) == 0)                          break;                  ipcp_sdb_release(sdb); @@ -2814,7 +2814,7 @@ struct dht * dht_create(uint64_t addr)          if (tpm_start(dht->tpm))                  goto fail_tpm_start; -        dht->fd   = dt_reg_comp(dht, &dht_post_packet, DHT); +        dht->eid   = dt_reg_comp(dht, &dht_post_packet, DHT);          notifier_reg(handle_event, dht);  #else          (void) handle_event; diff --git a/src/ipcpd/unicast/dt.c b/src/ipcpd/unicast/dt.c index cb069544..c8aadabb 100644 --- a/src/ipcpd/unicast/dt.c +++ b/src/ipcpd/unicast/dt.c @@ -509,30 +509,7 @@ static void packet_handler(int                  fd,                  dt_pci_shrink(sdb);                  if (dt_pci.eid >= PROG_RES_FDS) {                          uint8_t ecn = *(head + dt_pci_info.ecn_o); -                        fa_ecn_update(dt_pci.eid, ecn, len); - -                        if (ipcp_flow_write(dt_pci.eid, sdb)) { -                                ipcp_sdb_release(sdb); -#ifdef IPCP_FLOW_STATS -                                pthread_mutex_lock(&dt.stat[dt_pci.eid].lock); - -                                ++dt.stat[dt_pci.eid].w_drp_pkt[qc]; -                                dt.stat[dt_pci.eid].w_drp_bytes[qc] += len; - -                                pthread_mutex_unlock(&dt.stat[dt_pci.eid].lock); -#endif - -                        } -#ifdef IPCP_FLOW_STATS -                        pthread_mutex_lock(&dt.stat[dt_pci.eid].lock); - -                        ++dt.stat[dt_pci.eid].rcv_pkt[qc]; -                        dt.stat[dt_pci.eid].rcv_bytes[qc] += len; -                        ++dt.stat[dt_pci.eid].lcl_r_pkt[qc]; -                        dt.stat[dt_pci.eid].lcl_r_bytes[qc] += len; - -                        pthread_mutex_unlock(&dt.stat[dt_pci.eid].lock); -#endif +                        fa_np1_rcv(dt_pci.eid, ecn, sdb);                          return;                  } @@ -540,14 +517,6 @@ static void packet_handler(int                  fd,                          log_err("No registered component on eid %d.",                                  dt_pci.eid);                          ipcp_sdb_release(sdb); -#ifdef IPCP_FLOW_STATS -                        pthread_mutex_lock(&dt.stat[dt_pci.eid].lock); - -                        ++dt.stat[dt_pci.eid].w_drp_pkt[qc]; -                        dt.stat[dt_pci.eid].w_drp_bytes[qc] += len; - -                        pthread_mutex_unlock(&dt.stat[dt_pci.eid].lock); -#endif                          return;                  }  #ifdef IPCP_FLOW_STATS @@ -788,7 +757,7 @@ int dt_reg_comp(void * comp,  int dt_write_packet(uint64_t             dst_addr,                      qoscube_t            qc, -                    int                  np1_fd, +                    uint32_t             eid,                      struct shm_du_buff * sdb)  {          struct dt_pci dt_pci; @@ -803,25 +772,27 @@ int dt_write_packet(uint64_t             dst_addr,          len = shm_du_buff_tail(sdb) - shm_du_buff_head(sdb);  #ifdef IPCP_FLOW_STATS +        if (eid < PROG_RES_FDS) { +                pthread_mutex_lock(&dt.stat[eid].lock); -        pthread_mutex_lock(&dt.stat[np1_fd].lock); +                ++dt.stat[eid].lcl_r_pkt[qc]; +                dt.stat[eid].lcl_r_bytes[qc] += len; -        ++dt.stat[np1_fd].lcl_r_pkt[qc]; -        dt.stat[np1_fd].lcl_r_bytes[qc] += len; - -        pthread_mutex_unlock(&dt.stat[np1_fd].lock); +                pthread_mutex_unlock(&dt.stat[eid].lock); +        }  #endif -          fd = pff_nhop(dt.pff[qc], dst_addr);          if (fd < 0) {                  log_dbg("Could not get nhop for addr %" PRIu64 ".", dst_addr);  #ifdef IPCP_FLOW_STATS -                pthread_mutex_lock(&dt.stat[np1_fd].lock); +                if (eid < PROG_RES_FDS) { +                        pthread_mutex_lock(&dt.stat[eid].lock); -                ++dt.stat[np1_fd].f_nhp_pkt[qc]; -                dt.stat[np1_fd].f_nhp_bytes[qc] += len; +                        ++dt.stat[eid].lcl_r_pkt[qc]; +                        dt.stat[eid].lcl_r_bytes[qc] += len; -                pthread_mutex_unlock(&dt.stat[np1_fd].lock); +                        pthread_mutex_unlock(&dt.stat[eid].lock); +                }  #endif                  return -EPERM;          } @@ -836,7 +807,7 @@ int dt_write_packet(uint64_t             dst_addr,          dt_pci.dst_addr = dst_addr;          dt_pci.qc       = qc; -        dt_pci.eid      = np1_fd; +        dt_pci.eid      = eid;          dt_pci.ecn      = ca_calc_ecn(fd, len);          dt_pci_ser(head, &dt_pci); @@ -866,7 +837,7 @@ int dt_write_packet(uint64_t             dst_addr,  #ifdef IPCP_FLOW_STATS          pthread_mutex_lock(&dt.stat[fd].lock); -        if (np1_fd < PROG_RES_FDS) { +        if (eid < PROG_RES_FDS) {                  ++dt.stat[fd].lcl_w_pkt[qc];                  dt.stat[fd].lcl_w_bytes[qc] += len;          } diff --git a/src/ipcpd/unicast/dt.h b/src/ipcpd/unicast/dt.h index 73b71a92..3e569dfe 100644 --- a/src/ipcpd/unicast/dt.h +++ b/src/ipcpd/unicast/dt.h @@ -49,7 +49,7 @@ int  dt_reg_comp(void * comp,  int  dt_write_packet(uint64_t             dst_addr,                       qoscube_t            qc, -                     int                  res_fd, +                     uint32_t             eid,                       struct shm_du_buff * sdb);  #endif /* OUROBOROS_IPCPD_UNICAST_DT_H */ diff --git a/src/ipcpd/unicast/fa.c b/src/ipcpd/unicast/fa.c index 2fc37d95..08c7a930 100644 --- a/src/ipcpd/unicast/fa.c +++ b/src/ipcpd/unicast/fa.c @@ -84,7 +84,15 @@ struct cmd {  struct fa_flow {  #ifdef IPCP_FLOW_STATS -        time_t   stamp; +        time_t   stamp;    /* Flow creation                  */ +        size_t   p_snd;    /* Packets sent                   */ +        size_t   p_snd_f;  /* Packets sent fail              */ +        size_t   b_snd;    /* Bytes sent                     */ +        size_t   b_snd_f;  /* Bytes sent fail                */ +        size_t   p_rcv;    /* Packets received               */ +        size_t   p_rcv_f;  /* Packets received fail          */ +        size_t   b_rcv;    /* Bytes received                 */ +        size_t   b_rcv_f;  /* Bytes received fail            */  #endif          int      r_eid;  /* remote endpoint id               */          uint64_t r_addr; /* remote address                   */ @@ -97,7 +105,7 @@ struct {  #ifdef IPCP_FLOW_STATS          size_t           n_flows;  #endif -        int              fd; +        uint32_t         eid;          struct list_head cmds;          pthread_cond_t   cond; @@ -150,8 +158,20 @@ static int fa_stat_read(const char * path,                  "Remote address:                  %20s\n"                  "Local endpoint ID:               %20d\n"                  "Remote endpoint ID:              %20d\n" +                "Sent (packets):                  %20zu\n" +                "Sent (bytes):                    %20zu\n" +                "Send failed (packets):           %20zu\n" +                "Send failed (bytes):             %20zu\n" +                "Received (packets):              %20zu\n" +                "Received (bytes):                %20zu\n" +                "Receive failed (packets):        %20zu\n" +                "Receive failed (bytes):          %20zu\n"                  "%s",                  tmstr, r_addrstr, fd, flow->r_eid, +                flow->p_snd, flow->b_snd, +                flow->p_snd_f, flow->b_snd_f, +                flow->p_rcv, flow->b_rcv, +                flow->b_rcv_f, flow->b_rcv_f,                  castr);          pthread_rwlock_unlock(&fa.flows_lock); @@ -273,6 +293,10 @@ static void packet_handler(int                  fd,          len = shm_du_buff_tail(sdb) - shm_du_buff_head(sdb); +#ifdef IPCP_FLOW_STATS +        ++flow->p_snd; +        flow->b_snd += len; +#endif          wnd = ca_ctx_update_snd(flow->ctx, len);          r_addr = flow->r_addr; @@ -285,6 +309,12 @@ static void packet_handler(int                  fd,          if (dt_write_packet(r_addr, qc, r_eid, sdb)) {                  ipcp_sdb_release(sdb);                  log_warn("Failed to forward packet."); +#ifdef IPCP_FLOW_STATS +                pthread_rwlock_wrlock(&fa.flows_lock); +                ++flow->p_snd_f; +                flow->b_snd_f += len; +                pthread_rwlock_unlock(&fa.flows_lock); +#endif                  return;          }  } @@ -530,7 +560,7 @@ int fa_init(void)          list_head_init(&fa.cmds); -        fa.fd = dt_reg_comp(&fa, &fa_post_packet, FA); +        fa.eid = dt_reg_comp(&fa, &fa_post_packet, FA);          sprintf(fastr, "%s", FA);          if (rib_reg(fastr, &r_ops)) @@ -653,7 +683,7 @@ int fa_alloc(int             fd,          memcpy(msg + 1, dst, ipcp_dir_hash_len());          memcpy(shm_du_buff_head(sdb) + len, data, dlen); -        if (dt_write_packet(addr, qc, fa.fd, sdb)) { +        if (dt_write_packet(addr, qc, fa.eid, sdb)) {                  ipcp_sdb_release(sdb);                  return -1;          } @@ -729,7 +759,7 @@ int fa_alloc_resp(int          fd,                  psched_add(fa.psched, fd);          } -        if (dt_write_packet(flow->r_addr, qc, fa.fd, sdb)) { +        if (dt_write_packet(flow->r_addr, qc, fa.eid, sdb)) {                  fa_flow_fini(flow);                  pthread_rwlock_unlock(&fa.flows_lock);                  ipcp_sdb_release(sdb); @@ -789,7 +819,7 @@ static int fa_update_remote(int      fd,          pthread_rwlock_unlock(&fa.flows_lock); -        if (dt_write_packet(r_addr, qc, fa.fd, sdb)) { +        if (dt_write_packet(r_addr, qc, fa.eid, sdb)) {                  ipcp_sdb_release(sdb);                  return -1;          } @@ -797,27 +827,51 @@ static int fa_update_remote(int      fd,          return 0;  } -void  fa_ecn_update(int     eid, -                    uint8_t ecn, -                    size_t  len) +void  fa_np1_rcv(uint32_t             eid, +                 uint8_t              ecn, +                 struct shm_du_buff * sdb)  {          struct fa_flow * flow;          bool             update;          uint16_t         ece; +        int              fd; +        size_t           len; + +        fd = (int) eid; +        if (fd < 0) { +                ipcp_sdb_release(sdb); +                return; +        } -        flow = &fa.flows[eid]; +        len = shm_du_buff_tail(sdb) - shm_du_buff_head(sdb); + +        flow = &fa.flows[fd];          pthread_rwlock_wrlock(&fa.flows_lock);          if (flow->r_eid == -1) {                  pthread_rwlock_unlock(&fa.flows_lock); +                ipcp_sdb_release(sdb);                  return;          } - +#ifdef IPCP_FLOW_STATS +        ++flow->p_rcv; +        flow->b_rcv += len; +#endif          update = ca_ctx_update_rcv(flow->ctx, len, ecn, &ece);          pthread_rwlock_unlock(&fa.flows_lock); +        if (ipcp_flow_write(fd, sdb) < 0) { +                ipcp_sdb_release(sdb); +#ifdef IPCP_FLOW_STATS +                pthread_rwlock_wrlock(&fa.flows_lock); +                ++flow->p_rcv_f; +                flow->b_rcv_f += len; +                pthread_rwlock_unlock(&fa.flows_lock); +#endif +        } +          if (update)                  fa_update_remote(eid, ece);  } diff --git a/src/ipcpd/unicast/fa.h b/src/ipcpd/unicast/fa.h index daba2a51..c5c1baec 100644 --- a/src/ipcpd/unicast/fa.h +++ b/src/ipcpd/unicast/fa.h @@ -47,8 +47,8 @@ int  fa_alloc_resp(int          fd,  int  fa_dealloc(int fd); -void fa_ecn_update(int     eid, -                   uint8_t ecn, -                   size_t  len); +void fa_np1_rcv(uint32_t             eid, +                uint8_t              ecn, +                struct shm_du_buff * sdb);  #endif /* OUROBOROS_IPCPD_UNICAST_FA_H */  | 
