diff options
| author | Dimitri Staessens <dimitri.staessens@ugent.be> | 2018-02-20 08:15:13 +0100 | 
|---|---|---|
| committer | Sander Vrijders <sander.vrijders@ugent.be> | 2018-02-20 08:49:56 +0100 | 
| commit | cc4b333f5a6964120f2e40c33f67d0be7dd409fc (patch) | |
| tree | d91399337554d3877c1f912f44f2b0c5a45c671b /src/ipcpd/normal | |
| parent | e72fcd924b25b2b3b8a45c85d9c3d09388885249 (diff) | |
| download | ouroboros-cc4b333f5a6964120f2e40c33f67d0be7dd409fc.tar.gz ouroboros-cc4b333f5a6964120f2e40c33f67d0be7dd409fc.zip | |
ipcpd: Export flow statistics to RIB
This adds flow statistics for the data transfer (DT) component to the
RIB. The DT component will keep track of the traffic on each
flow. This feature can be enabled or disabled by setting the
IPCP_FLOW_STATS variable in the build system.
Signed-off-by: Dimitri Staessens <dimitri.staessens@ugent.be>
Signed-off-by: Sander Vrijders <sander.vrijders@ugent.be>
Diffstat (limited to 'src/ipcpd/normal')
| -rw-r--r-- | src/ipcpd/normal/CMakeLists.txt | 10 | ||||
| -rw-r--r-- | src/ipcpd/normal/dt.c | 407 | 
2 files changed, 397 insertions, 20 deletions
| diff --git a/src/ipcpd/normal/CMakeLists.txt b/src/ipcpd/normal/CMakeLists.txt index 0b444852..e7e230d2 100644 --- a/src/ipcpd/normal/CMakeLists.txt +++ b/src/ipcpd/normal/CMakeLists.txt @@ -21,6 +21,16 @@ protobuf_generate_c(KAD_PROTO_SRCS KAD_PROTO_HDRS kademlia.proto)  math(EXPR PFT_EXPR "1 << 12")  set(PFT_SIZE ${PFT_EXPR} CACHE STRING    "Size of the PDU forwarding table") +if (HAVE_FUSE) +  set(IPCP_FLOW_STATS FALSE CACHE BOOL +    "Enable flow statistics tracking in IPCP") +    if (IPCP_FLOW_STATS) +       message(STATUS "IPCP flow statistics enabled") +    else () +       message(STATUS "IPCP flow statistics disabled") +    endif () +endif () +  set(SOURCE_FILES    # Add source files here diff --git a/src/ipcpd/normal/dt.c b/src/ipcpd/normal/dt.c index 0df68dd4..2cfd7417 100644 --- a/src/ipcpd/normal/dt.c +++ b/src/ipcpd/normal/dt.c @@ -24,13 +24,17 @@  #include "config.h" -#define OUROBOROS_PREFIX "dt" +#define DT               "dt" +#define OUROBOROS_PREFIX DT +/* FIXME: fix #defines and remove endian.h include. */ +#include <ouroboros/endian.h>  #include <ouroboros/bitmap.h>  #include <ouroboros/errno.h>  #include <ouroboros/logs.h>  #include <ouroboros/dev.h>  #include <ouroboros/notifier.h> +#include <ouroboros/rib.h>  #include "connmgr.h"  #include "ipcp.h" @@ -49,6 +53,8 @@  #include <inttypes.h>  #include <assert.h> +#define STAT_FILE_LEN 1590 +  struct comp_info {          void   (* post_sdu)(void * comp, struct shm_du_buff * sdb);          void * comp; @@ -59,7 +65,28 @@ struct {          struct pff *       pff[QOS_CUBE_MAX];          struct routing_i * routing[QOS_CUBE_MAX]; - +#ifdef IPCP_FLOW_STATS +        struct { +                time_t          stamp; +                size_t          snd_pkt[QOS_CUBE_MAX]; +                size_t          rcv_pkt[QOS_CUBE_MAX]; +                size_t          snd_bytes[QOS_CUBE_MAX]; +                size_t          rcv_bytes[QOS_CUBE_MAX]; +                size_t          lcl_r_pkt[QOS_CUBE_MAX]; +                size_t          lcl_r_bytes[QOS_CUBE_MAX]; +                size_t          lcl_w_pkt[QOS_CUBE_MAX]; +                size_t          lcl_w_bytes[QOS_CUBE_MAX]; +                size_t          r_drp_pkt[QOS_CUBE_MAX]; +                size_t          r_drp_bytes[QOS_CUBE_MAX]; +                size_t          w_drp_pkt[QOS_CUBE_MAX]; +                size_t          w_drp_bytes[QOS_CUBE_MAX]; +                size_t          f_nhp_pkt[QOS_CUBE_MAX]; +                size_t          f_nhp_bytes[QOS_CUBE_MAX]; +                pthread_mutex_t lock; +        } stat[PROG_MAX_FLOWS]; + +        size_t             n_flows; +#endif          struct bmp *       res_fds;          struct comp_info   comps[PROG_RES_FDS];          pthread_rwlock_t   lock; @@ -67,6 +94,196 @@ struct {          pthread_t          listener;  } dt; +static int dt_stat_read(const char * path, +                        char *       buf, +                        size_t       len) +{ +#ifdef IPCP_FLOW_STATS +        int  fd; +        int  i; +        char str[587]; + +        /* NOTE: we may need stronger checks. */ +        fd = atoi(path); + +        if (len < STAT_FILE_LEN) +                return 0; + +        buf[0] = '\0'; + +        pthread_mutex_lock(&dt.stat[fd].lock); + +        if (dt.stat[fd].stamp == 0) { +                pthread_mutex_unlock(&dt.stat[fd].lock); +                return 0; +        } + +        for (i = 0; i < QOS_CUBE_MAX; ++i) { +                sprintf(str, +                        "Qos cube %d:\n" +                        " sent (packets):          %10zu\n" +                        " rcvd (packets):          %10zu\n" +                        " sent (bytes):            %10zu\n" +                        " rcvd (bytes):            %10zu\n" +                        " local sent (packets):    %10zu\n" +                        " local sent (bytes):      %10zu\n" +                        " local rcvd (packets):    %10zu\n" +                        " local rcvd (bytes):      %10zu\n" +                        " dropped ttl (packets):   %10zu\n" +                        " dropped ttl (bytes):     %10zu\n" +                        " failed writes (packets): %10zu\n" +                        " failed writes (bytes):   %10zu\n" +                        " failed nhop (packets):   %10zu\n" +                        " failed nhop (bytes):     %10zu\n", +                        i, +                        dt.stat[fd].snd_pkt[i], +                        dt.stat[fd].rcv_pkt[i], +                        dt.stat[fd].snd_bytes[i], +                        dt.stat[fd].rcv_bytes[i], +                        dt.stat[fd].lcl_w_pkt[i], +                        dt.stat[fd].lcl_w_bytes[i], +                        dt.stat[fd].lcl_r_pkt[i], +                        dt.stat[fd].lcl_r_bytes[i], +                        dt.stat[fd].r_drp_pkt[i], +                        dt.stat[fd].r_drp_bytes[i], +                        dt.stat[fd].w_drp_pkt[i], +                        dt.stat[fd].w_drp_bytes[i], +                        dt.stat[fd].f_nhp_pkt[i], +                        dt.stat[fd].f_nhp_bytes[i] +                        ); +                strcat(buf, str); +        } + +        pthread_mutex_unlock(&dt.stat[fd].lock); + +        return STAT_FILE_LEN; +#else +        (void) path; +        (void) buf; +        (void) len; +        return 0; +#endif +} + +static int dt_stat_readdir(char *** buf) +{ +#ifdef IPCP_FLOW_STATS +        char   entry[RIB_PATH_LEN + 1]; +        size_t i; +        int    idx = 0; + +        pthread_rwlock_rdlock(&dt.lock); + +        if (dt.n_flows < 1) { +                pthread_rwlock_unlock(&dt.lock); +                return 0; +        } + +        *buf = malloc(sizeof(**buf) * dt.n_flows); +        if (*buf == NULL) { +                pthread_rwlock_unlock(&dt.lock); +                return -ENOMEM; +        } + +        for (i = 0; i < PROG_MAX_FLOWS; ++i) { +                pthread_mutex_lock(&dt.stat[i].lock); + +                if (dt.stat[i].stamp == 0) { +                        pthread_mutex_unlock(&dt.stat[i].lock); +                        /* Optimization: skip unused res_fds. */ +                        if (i < PROG_RES_FDS) +                                i = PROG_RES_FDS; +                        continue; +                } + +                sprintf(entry, "%zu", i); + +                (*buf)[idx] = malloc(strlen(entry) + 1); +                if ((*buf)[idx] == NULL) { +                        while (idx-- > 0) +                                free((*buf)[idx]); +                        free(buf); +                        pthread_mutex_unlock(&dt.stat[i].lock); +                        pthread_rwlock_unlock(&dt.lock); +                        return -ENOMEM; +                } + +                strcpy((*buf)[idx++], entry); + +                pthread_mutex_unlock(&dt.stat[i].lock); +        } + +        pthread_rwlock_unlock(&dt.lock); + +        assert((size_t) idx == dt.n_flows); + +        return idx; +#else +        (void) buf; +        return 0; +#endif +} + +static int dt_stat_getattr(const char *  path, +                           struct stat * st) +{ +#ifdef IPCP_FLOW_STATS +        int fd; + +        fd = atoi(path); + +        st->st_mode  = S_IFREG | 0755; +        st->st_nlink = 1; +        st->st_uid   = getuid(); +        st->st_gid   = getgid(); + +        pthread_mutex_lock(&dt.stat[fd].lock); + +        if (dt.stat[fd].stamp != -1) { +                st->st_size  = STAT_FILE_LEN; +                st->st_mtime = dt.stat[fd].stamp; +        } else { +                st->st_size  = 0; +                st->st_mtime = 0; +        } + +        pthread_mutex_unlock(&dt.stat[fd].lock); +#else +        (void) path; +        (void) st; +#endif +        return 0; +} + +static struct rib_ops r_ops = { +        .read    = dt_stat_read, +        .readdir = dt_stat_readdir, +        .getattr = dt_stat_getattr +}; + +#ifdef IPCP_FLOW_STATS + +static void set_used(int  fd, +                     bool b) +{ +        struct timespec now; + +        clock_gettime(CLOCK_REALTIME_COARSE, &now); + +        pthread_mutex_lock(&dt.stat[fd].lock); + +        dt.stat[fd].stamp = b ? now.tv_sec : 0; + +        pthread_mutex_unlock(&dt.stat[fd].lock); + +        pthread_rwlock_wrlock(&dt.lock); + +        b ? ++dt.n_flows : --dt.n_flows; + +        pthread_rwlock_unlock(&dt.lock); +} +#endif +  static void handle_event(void *       self,                           int          event,                           const void * o) @@ -79,10 +296,16 @@ static void handle_event(void *       self,          switch (event) {          case NOTIFY_DT_CONN_ADD: +#ifdef IPCP_FLOW_STATS +                set_used(c->flow_info.fd, true); +#endif                  sdu_sched_add(dt.sdu_sched, c->flow_info.fd);                  log_dbg("Added fd %d to SDU scheduler.", c->flow_info.fd);                  break;          case NOTIFY_DT_CONN_DEL: +#ifdef IPCP_FLOW_STATS +                set_used(c->flow_info.fd, false); +#endif                  sdu_sched_del(dt.sdu_sched, c->flow_info.fd);                  log_dbg("Removed fd %d from SDU scheduler.", c->flow_info.fd);                  break; @@ -97,39 +320,112 @@ static void sdu_handler(int                  fd,  {          struct dt_pci dt_pci;          int           ret; - +        int           ofd; +#ifdef IPCP_FLOW_STATS +        size_t        len; +#else +        (void)        fd; +#endif          memset(&dt_pci, 0, sizeof(dt_pci));          dt_pci_des(sdb, &dt_pci); - +#ifdef IPCP_FLOW_STATS +        len = shm_du_buff_tail(sdb) - shm_du_buff_head(sdb); +#endif          if (dt_pci.dst_addr != ipcpi.dt_addr) {                  if (dt_pci.ttl == 0) {                          log_dbg("TTL was zero.");                          ipcp_sdb_release(sdb); +#ifdef IPCP_FLOW_STATS +                        pthread_mutex_lock(&dt.stat[fd].lock); + +                        ++dt.stat[fd].r_drp_pkt[qc]; +                        dt.stat[fd].r_drp_bytes[qc] += len; + +                        pthread_mutex_unlock(&dt.stat[fd].lock); +#endif                          return;                  } -                fd = pff_nhop(dt.pff[qc], dt_pci.dst_addr); -                if (fd < 0) { -                        log_err("No next hop for %" PRIu64, dt_pci.dst_addr); +                /* FIXME: Use qoscube from PCI instead of incoming flow. */ +                ofd = pff_nhop(dt.pff[qc], dt_pci.dst_addr); +                if (ofd < 0) { +                        log_dbg("No next hop for %" PRIu64, dt_pci.dst_addr);                          ipcp_sdb_release(sdb); +#ifdef IPCP_FLOW_STATS +                        pthread_mutex_lock(&dt.stat[fd].lock); + +                        ++dt.stat[fd].rcv_pkt[qc]; +                        dt.stat[fd].rcv_bytes[qc] += len; +                        ++dt.stat[fd].f_nhp_pkt[qc]; +                        dt.stat[fd].f_nhp_bytes[qc] += len; + +                        pthread_mutex_unlock(&dt.stat[fd].lock); +#endif                          return;                  } -                ret = ipcp_flow_write(fd, sdb); +                ret = ipcp_flow_write(ofd, sdb);                  if (ret < 0) { -                        log_err("Failed to write SDU to fd %d.", fd); +                        log_dbg("Failed to write SDU to fd %d.", ofd);                          if (ret == -EFLOWDOWN) -                                notifier_event(NOTIFY_DT_CONN_DOWN, &fd); +                                notifier_event(NOTIFY_DT_CONN_DOWN, &ofd);                          ipcp_sdb_release(sdb); +#ifdef IPCP_FLOW_STATS +                        pthread_mutex_lock(&dt.stat[fd].lock); + +                        ++dt.stat[fd].rcv_pkt[qc]; +                        dt.stat[fd].rcv_bytes[qc] += len; + +                        pthread_mutex_unlock(&dt.stat[fd].lock); +                        pthread_mutex_lock(&dt.stat[ofd].lock); + +                        ++dt.stat[ofd].w_drp_pkt[qc]; +                        dt.stat[ofd].w_drp_bytes[qc] += len; + +                        pthread_mutex_unlock(&dt.stat[ofd].lock); +#endif                          return;                  } +#ifdef IPCP_FLOW_STATS +                pthread_mutex_lock(&dt.stat[fd].lock); + +                ++dt.stat[fd].rcv_pkt[qc]; +                dt.stat[fd].rcv_bytes[qc] += len; + +                pthread_mutex_unlock(&dt.stat[fd].lock); +                pthread_mutex_lock(&dt.stat[ofd].lock); + +                ++dt.stat[ofd].snd_pkt[qc]; +                dt.stat[ofd].snd_bytes[qc] += len; + +                pthread_mutex_unlock(&dt.stat[ofd].lock); +#endif          } else {                  dt_pci_shrink(sdb); -                  if (dt_pci.eid > PROG_RES_FDS) { -                        if (ipcp_flow_write(dt_pci.eid, sdb)) +                        if (ipcp_flow_write(dt_pci.eid, sdb)) {                                  ipcp_sdb_release(sdb); +#ifdef IPCP_FLOW_STATS +                                pthread_mutex_lock(&dt.stat[dt_pci.eid].lock); + +                                ++dt.stat[dt_pci.eid].w_drp_pkt[qc]; +                                dt.stat[dt_pci.eid].w_drp_bytes[qc] += len; + +                                pthread_mutex_unlock(&dt.stat[dt_pci.eid].lock); +#endif + +                        } +#ifdef IPCP_FLOW_STATS +                        pthread_mutex_lock(&dt.stat[dt_pci.eid].lock); + +                        ++dt.stat[dt_pci.eid].rcv_pkt[qc]; +                        dt.stat[dt_pci.eid].rcv_bytes[qc] += len; +                        ++dt.stat[dt_pci.eid].lcl_r_pkt[qc]; +                        dt.stat[dt_pci.eid].lcl_r_bytes[qc] += len; + +                        pthread_mutex_unlock(&dt.stat[dt_pci.eid].lock); +#endif                          return;                  } @@ -137,9 +433,26 @@ static void sdu_handler(int                  fd,                          log_err("No registered component on eid %d.",                                  dt_pci.eid);                          ipcp_sdb_release(sdb); +#ifdef IPCP_FLOW_STATS +                        pthread_mutex_lock(&dt.stat[dt_pci.eid].lock); + +                        ++dt.stat[dt_pci.eid].w_drp_pkt[qc]; +                        dt.stat[dt_pci.eid].w_drp_bytes[qc] += len; + +                        pthread_mutex_unlock(&dt.stat[dt_pci.eid].lock); +#endif                          return;                  } +#ifdef IPCP_FLOW_STATS +                pthread_mutex_lock(&dt.stat[dt_pci.eid].lock); + +                ++dt.stat[fd].rcv_pkt[qc]; +                dt.stat[fd].rcv_bytes[qc] += len; +                ++dt.stat[fd].lcl_r_pkt[qc]; +                dt.stat[fd].lcl_r_bytes[qc] += len; +                pthread_mutex_unlock(&dt.stat[dt_pci.eid].lock); +#endif                  dt.comps[dt_pci.eid].post_sdu(dt.comps[dt_pci.eid].comp, sdb);          }  } @@ -229,9 +542,30 @@ int dt_init(enum pol_routing pr,          dt.res_fds = bmp_create(PROG_RES_FDS, 0);          if (dt.res_fds == NULL)                  goto fail_res_fds; +#ifdef IPCP_FLOW_STAT +        memset(dt.stat, 0, sizeof(dt.stat)); + +        for (i = 0; i < PROG_MAX_FLOWS, ++i) +                if (pthread_mutex_init(&dt.stat[fd].lock, NULL)) { +                        for (j = 0; j < i; ++j) +                                pthread_mutex_destroy(&dt.stat[j].lock); +                        goto fail_stat_lock; +                } + +        dt.n_flows = 0; +#endif +        if (rib_reg(DT, &r_ops)) +                goto fail_rib_reg;          return 0; + fail_rib_reg: +#ifdef IPCP_FLOW_STAT +        for (i = 0; i < PROG_MAX_FLOWS; ++i) +                pthread_mutex_destroy(&dt.stat[i].lock); + fail_stat_lock: +#endif +        bmp_destroy(dt.res_fds);   fail_res_fds:          pthread_rwlock_destroy(&dt.lock);   fail_rwlock_init: @@ -256,6 +590,11 @@ void dt_fini(void)  {          int i; +        rib_unreg(DT); +#ifdef IPCP_FLOW_STATS +        for (i = 0; i < PROG_MAX_FLOWS; ++i) +                pthread_mutex_destroy(&dt.stat[i].lock); +#endif          bmp_destroy(dt.res_fds);          pthread_rwlock_destroy(&dt.lock); @@ -322,7 +661,9 @@ int dt_reg_comp(void * comp,          dt.comps[res_fd].comp     = comp;          pthread_rwlock_unlock(&dt.lock); - +#ifdef IPCP_FLOW_STATS +        set_used(res_fd, true); +#endif          return res_fd;  } @@ -334,14 +675,18 @@ int dt_write_sdu(uint64_t             dst_addr,          int           fd;          struct dt_pci dt_pci;          int           ret; - +#ifdef IPCP_FLOW_STATS +        size_t        len; +#endif          assert(sdb);          assert(dst_addr != ipcpi.dt_addr); - +#ifdef IPCP_FLOW_STATS +        len = shm_du_buff_tail(sdb) - shm_du_buff_head(sdb); +#endif          fd = pff_nhop(dt.pff[qc], dst_addr);          if (fd < 0) {                  log_dbg("Could not get nhop for addr %" PRIu64 ".", dst_addr); -                return -1; +                goto fail_write;          }          dt_pci.dst_addr = dst_addr; @@ -349,17 +694,39 @@ int dt_write_sdu(uint64_t             dst_addr,          dt_pci.eid      = np1_fd;          if (dt_pci_ser(sdb, &dt_pci)) { -                log_err("Failed to serialize PDU."); -                return -1; +                log_dbg("Failed to serialize PDU."); +                goto fail_write;          }          ret = ipcp_flow_write(fd, sdb);          if (ret < 0) { -                log_err("Failed to write SDU to fd %d.", fd); +                log_dbg("Failed to write SDU to fd %d.", fd);                  if (ret == -EFLOWDOWN)                          notifier_event(NOTIFY_DT_CONN_DOWN, &fd); -                return -1; +                goto fail_write;          } +#ifdef IPCP_FLOW_STATS +        pthread_mutex_lock(&dt.stat[fd].lock); + +        ++dt.stat[dt_pci.eid].lcl_w_pkt[qc]; +        dt.stat[dt_pci.eid].lcl_w_bytes[qc] += len; +        ++dt.stat[fd].snd_pkt[qc]; +        dt.stat[fd].snd_bytes[qc] = len; +        pthread_mutex_unlock(&dt.stat[fd].lock); +#endif          return 0; + + fail_write: +#ifdef IPCP_FLOW_STATS +        pthread_mutex_lock(&dt.stat[fd].lock); + +        ++dt.stat[dt_pci.eid].lcl_w_pkt[qc]; +        dt.stat[dt_pci.eid].lcl_w_bytes[qc] += len; +        ++dt.stat[fd].w_drp_pkt[qc]; +        dt.stat[fd].w_drp_bytes[qc] = len; + +        pthread_mutex_unlock(&dt.stat[fd].lock); +#endif +        return -1;  } | 
