diff options
Diffstat (limited to 'src/ipcpd/normal')
| -rw-r--r-- | src/ipcpd/normal/CMakeLists.txt | 10 | ||||
| -rw-r--r-- | src/ipcpd/normal/dt.c | 407 | 
2 files changed, 397 insertions, 20 deletions
| diff --git a/src/ipcpd/normal/CMakeLists.txt b/src/ipcpd/normal/CMakeLists.txt index 0b444852..e7e230d2 100644 --- a/src/ipcpd/normal/CMakeLists.txt +++ b/src/ipcpd/normal/CMakeLists.txt @@ -21,6 +21,16 @@ protobuf_generate_c(KAD_PROTO_SRCS KAD_PROTO_HDRS kademlia.proto)  math(EXPR PFT_EXPR "1 << 12")  set(PFT_SIZE ${PFT_EXPR} CACHE STRING    "Size of the PDU forwarding table") +if (HAVE_FUSE) +  set(IPCP_FLOW_STATS FALSE CACHE BOOL +    "Enable flow statistics tracking in IPCP") +    if (IPCP_FLOW_STATS) +       message(STATUS "IPCP flow statistics enabled") +    else () +       message(STATUS "IPCP flow statistics disabled") +    endif () +endif () +  set(SOURCE_FILES    # Add source files here diff --git a/src/ipcpd/normal/dt.c b/src/ipcpd/normal/dt.c index 0df68dd4..2cfd7417 100644 --- a/src/ipcpd/normal/dt.c +++ b/src/ipcpd/normal/dt.c @@ -24,13 +24,17 @@  #include "config.h" -#define OUROBOROS_PREFIX "dt" +#define DT               "dt" +#define OUROBOROS_PREFIX DT +/* FIXME: fix #defines and remove endian.h include. */ +#include <ouroboros/endian.h>  #include <ouroboros/bitmap.h>  #include <ouroboros/errno.h>  #include <ouroboros/logs.h>  #include <ouroboros/dev.h>  #include <ouroboros/notifier.h> +#include <ouroboros/rib.h>  #include "connmgr.h"  #include "ipcp.h" @@ -49,6 +53,8 @@  #include <inttypes.h>  #include <assert.h> +#define STAT_FILE_LEN 1590 +  struct comp_info {          void   (* post_sdu)(void * comp, struct shm_du_buff * sdb);          void * comp; @@ -59,7 +65,28 @@ struct {          struct pff *       pff[QOS_CUBE_MAX];          struct routing_i * routing[QOS_CUBE_MAX]; - +#ifdef IPCP_FLOW_STATS +        struct { +                time_t          stamp; +                size_t          snd_pkt[QOS_CUBE_MAX]; +                size_t          rcv_pkt[QOS_CUBE_MAX]; +                size_t          snd_bytes[QOS_CUBE_MAX]; +                size_t          rcv_bytes[QOS_CUBE_MAX]; +                size_t          lcl_r_pkt[QOS_CUBE_MAX]; +                size_t          lcl_r_bytes[QOS_CUBE_MAX]; +                size_t          lcl_w_pkt[QOS_CUBE_MAX]; +                size_t          lcl_w_bytes[QOS_CUBE_MAX]; +                size_t          r_drp_pkt[QOS_CUBE_MAX]; +                size_t          r_drp_bytes[QOS_CUBE_MAX]; +                size_t          w_drp_pkt[QOS_CUBE_MAX]; +                size_t          w_drp_bytes[QOS_CUBE_MAX]; +                size_t          f_nhp_pkt[QOS_CUBE_MAX]; +                size_t          f_nhp_bytes[QOS_CUBE_MAX]; +                pthread_mutex_t lock; +        } stat[PROG_MAX_FLOWS]; + +        size_t             n_flows; +#endif          struct bmp *       res_fds;          struct comp_info   comps[PROG_RES_FDS];          pthread_rwlock_t   lock; @@ -67,6 +94,196 @@ struct {          pthread_t          listener;  } dt; +static int dt_stat_read(const char * path, +                        char *       buf, +                        size_t       len) +{ +#ifdef IPCP_FLOW_STATS +        int  fd; +        int  i; +        char str[587]; + +        /* NOTE: we may need stronger checks. */ +        fd = atoi(path); + +        if (len < STAT_FILE_LEN) +                return 0; + +        buf[0] = '\0'; + +        pthread_mutex_lock(&dt.stat[fd].lock); + +        if (dt.stat[fd].stamp == 0) { +                pthread_mutex_unlock(&dt.stat[fd].lock); +                return 0; +        } + +        for (i = 0; i < QOS_CUBE_MAX; ++i) { +                sprintf(str, +                        "Qos cube %d:\n" +                        " sent (packets):          %10zu\n" +                        " rcvd (packets):          %10zu\n" +                        " sent (bytes):            %10zu\n" +                        " rcvd (bytes):            %10zu\n" +                        " local sent (packets):    %10zu\n" +                        " local sent (bytes):      %10zu\n" +                        " local rcvd (packets):    %10zu\n" +                        " local rcvd (bytes):      %10zu\n" +                        " dropped ttl (packets):   %10zu\n" +                        " dropped ttl (bytes):     %10zu\n" +                        " failed writes (packets): %10zu\n" +                        " failed writes (bytes):   %10zu\n" +                        " failed nhop (packets):   %10zu\n" +                        " failed nhop (bytes):     %10zu\n", +                        i, +                        dt.stat[fd].snd_pkt[i], +                        dt.stat[fd].rcv_pkt[i], +                        dt.stat[fd].snd_bytes[i], +                        dt.stat[fd].rcv_bytes[i], +                        dt.stat[fd].lcl_w_pkt[i], +                        dt.stat[fd].lcl_w_bytes[i], +                        dt.stat[fd].lcl_r_pkt[i], +                        dt.stat[fd].lcl_r_bytes[i], +                        dt.stat[fd].r_drp_pkt[i], +                        dt.stat[fd].r_drp_bytes[i], +                        dt.stat[fd].w_drp_pkt[i], +                        dt.stat[fd].w_drp_bytes[i], +                        dt.stat[fd].f_nhp_pkt[i], +                        dt.stat[fd].f_nhp_bytes[i] +                        ); +                strcat(buf, str); +        } + +        pthread_mutex_unlock(&dt.stat[fd].lock); + +        return STAT_FILE_LEN; +#else +        (void) path; +        (void) buf; +        (void) len; +        return 0; +#endif +} + +static int dt_stat_readdir(char *** buf) +{ +#ifdef IPCP_FLOW_STATS +        char   entry[RIB_PATH_LEN + 1]; +        size_t i; +        int    idx = 0; + +        pthread_rwlock_rdlock(&dt.lock); + +        if (dt.n_flows < 1) { +                pthread_rwlock_unlock(&dt.lock); +                return 0; +        } + +        *buf = malloc(sizeof(**buf) * dt.n_flows); +        if (*buf == NULL) { +                pthread_rwlock_unlock(&dt.lock); +                return -ENOMEM; +        } + +        for (i = 0; i < PROG_MAX_FLOWS; ++i) { +                pthread_mutex_lock(&dt.stat[i].lock); + +                if (dt.stat[i].stamp == 0) { +                        pthread_mutex_unlock(&dt.stat[i].lock); +                        /* Optimization: skip unused res_fds. */ +                        if (i < PROG_RES_FDS) +                                i = PROG_RES_FDS; +                        continue; +                } + +                sprintf(entry, "%zu", i); + +                (*buf)[idx] = malloc(strlen(entry) + 1); +                if ((*buf)[idx] == NULL) { +                        while (idx-- > 0) +                                free((*buf)[idx]); +                        free(buf); +                        pthread_mutex_unlock(&dt.stat[i].lock); +                        pthread_rwlock_unlock(&dt.lock); +                        return -ENOMEM; +                } + +                strcpy((*buf)[idx++], entry); + +                pthread_mutex_unlock(&dt.stat[i].lock); +        } + +        pthread_rwlock_unlock(&dt.lock); + +        assert((size_t) idx == dt.n_flows); + +        return idx; +#else +        (void) buf; +        return 0; +#endif +} + +static int dt_stat_getattr(const char *  path, +                           struct stat * st) +{ +#ifdef IPCP_FLOW_STATS +        int fd; + +        fd = atoi(path); + +        st->st_mode  = S_IFREG | 0755; +        st->st_nlink = 1; +        st->st_uid   = getuid(); +        st->st_gid   = getgid(); + +        pthread_mutex_lock(&dt.stat[fd].lock); + +        if (dt.stat[fd].stamp != -1) { +                st->st_size  = STAT_FILE_LEN; +                st->st_mtime = dt.stat[fd].stamp; +        } else { +                st->st_size  = 0; +                st->st_mtime = 0; +        } + +        pthread_mutex_unlock(&dt.stat[fd].lock); +#else +        (void) path; +        (void) st; +#endif +        return 0; +} + +static struct rib_ops r_ops = { +        .read    = dt_stat_read, +        .readdir = dt_stat_readdir, +        .getattr = dt_stat_getattr +}; + +#ifdef IPCP_FLOW_STATS + +static void set_used(int  fd, +                     bool b) +{ +        struct timespec now; + +        clock_gettime(CLOCK_REALTIME_COARSE, &now); + +        pthread_mutex_lock(&dt.stat[fd].lock); + +        dt.stat[fd].stamp = b ? now.tv_sec : 0; + +        pthread_mutex_unlock(&dt.stat[fd].lock); + +        pthread_rwlock_wrlock(&dt.lock); + +        b ? ++dt.n_flows : --dt.n_flows; + +        pthread_rwlock_unlock(&dt.lock); +} +#endif +  static void handle_event(void *       self,                           int          event,                           const void * o) @@ -79,10 +296,16 @@ static void handle_event(void *       self,          switch (event) {          case NOTIFY_DT_CONN_ADD: +#ifdef IPCP_FLOW_STATS +                set_used(c->flow_info.fd, true); +#endif                  sdu_sched_add(dt.sdu_sched, c->flow_info.fd);                  log_dbg("Added fd %d to SDU scheduler.", c->flow_info.fd);                  break;          case NOTIFY_DT_CONN_DEL: +#ifdef IPCP_FLOW_STATS +                set_used(c->flow_info.fd, false); +#endif                  sdu_sched_del(dt.sdu_sched, c->flow_info.fd);                  log_dbg("Removed fd %d from SDU scheduler.", c->flow_info.fd);                  break; @@ -97,39 +320,112 @@ static void sdu_handler(int                  fd,  {          struct dt_pci dt_pci;          int           ret; - +        int           ofd; +#ifdef IPCP_FLOW_STATS +        size_t        len; +#else +        (void)        fd; +#endif          memset(&dt_pci, 0, sizeof(dt_pci));          dt_pci_des(sdb, &dt_pci); - +#ifdef IPCP_FLOW_STATS +        len = shm_du_buff_tail(sdb) - shm_du_buff_head(sdb); +#endif          if (dt_pci.dst_addr != ipcpi.dt_addr) {                  if (dt_pci.ttl == 0) {                          log_dbg("TTL was zero.");                          ipcp_sdb_release(sdb); +#ifdef IPCP_FLOW_STATS +                        pthread_mutex_lock(&dt.stat[fd].lock); + +                        ++dt.stat[fd].r_drp_pkt[qc]; +                        dt.stat[fd].r_drp_bytes[qc] += len; + +                        pthread_mutex_unlock(&dt.stat[fd].lock); +#endif                          return;                  } -                fd = pff_nhop(dt.pff[qc], dt_pci.dst_addr); -                if (fd < 0) { -                        log_err("No next hop for %" PRIu64, dt_pci.dst_addr); +                /* FIXME: Use qoscube from PCI instead of incoming flow. */ +                ofd = pff_nhop(dt.pff[qc], dt_pci.dst_addr); +                if (ofd < 0) { +                        log_dbg("No next hop for %" PRIu64, dt_pci.dst_addr);                          ipcp_sdb_release(sdb); +#ifdef IPCP_FLOW_STATS +                        pthread_mutex_lock(&dt.stat[fd].lock); + +                        ++dt.stat[fd].rcv_pkt[qc]; +                        dt.stat[fd].rcv_bytes[qc] += len; +                        ++dt.stat[fd].f_nhp_pkt[qc]; +                        dt.stat[fd].f_nhp_bytes[qc] += len; + +                        pthread_mutex_unlock(&dt.stat[fd].lock); +#endif                          return;                  } -                ret = ipcp_flow_write(fd, sdb); +                ret = ipcp_flow_write(ofd, sdb);                  if (ret < 0) { -                        log_err("Failed to write SDU to fd %d.", fd); +                        log_dbg("Failed to write SDU to fd %d.", ofd);                          if (ret == -EFLOWDOWN) -                                notifier_event(NOTIFY_DT_CONN_DOWN, &fd); +                                notifier_event(NOTIFY_DT_CONN_DOWN, &ofd);                          ipcp_sdb_release(sdb); +#ifdef IPCP_FLOW_STATS +                        pthread_mutex_lock(&dt.stat[fd].lock); + +                        ++dt.stat[fd].rcv_pkt[qc]; +                        dt.stat[fd].rcv_bytes[qc] += len; + +                        pthread_mutex_unlock(&dt.stat[fd].lock); +                        pthread_mutex_lock(&dt.stat[ofd].lock); + +                        ++dt.stat[ofd].w_drp_pkt[qc]; +                        dt.stat[ofd].w_drp_bytes[qc] += len; + +                        pthread_mutex_unlock(&dt.stat[ofd].lock); +#endif                          return;                  } +#ifdef IPCP_FLOW_STATS +                pthread_mutex_lock(&dt.stat[fd].lock); + +                ++dt.stat[fd].rcv_pkt[qc]; +                dt.stat[fd].rcv_bytes[qc] += len; + +                pthread_mutex_unlock(&dt.stat[fd].lock); +                pthread_mutex_lock(&dt.stat[ofd].lock); + +                ++dt.stat[ofd].snd_pkt[qc]; +                dt.stat[ofd].snd_bytes[qc] += len; + +                pthread_mutex_unlock(&dt.stat[ofd].lock); +#endif          } else {                  dt_pci_shrink(sdb); -                  if (dt_pci.eid > PROG_RES_FDS) { -                        if (ipcp_flow_write(dt_pci.eid, sdb)) +                        if (ipcp_flow_write(dt_pci.eid, sdb)) {                                  ipcp_sdb_release(sdb); +#ifdef IPCP_FLOW_STATS +                                pthread_mutex_lock(&dt.stat[dt_pci.eid].lock); + +                                ++dt.stat[dt_pci.eid].w_drp_pkt[qc]; +                                dt.stat[dt_pci.eid].w_drp_bytes[qc] += len; + +                                pthread_mutex_unlock(&dt.stat[dt_pci.eid].lock); +#endif + +                        } +#ifdef IPCP_FLOW_STATS +                        pthread_mutex_lock(&dt.stat[dt_pci.eid].lock); + +                        ++dt.stat[dt_pci.eid].rcv_pkt[qc]; +                        dt.stat[dt_pci.eid].rcv_bytes[qc] += len; +                        ++dt.stat[dt_pci.eid].lcl_r_pkt[qc]; +                        dt.stat[dt_pci.eid].lcl_r_bytes[qc] += len; + +                        pthread_mutex_unlock(&dt.stat[dt_pci.eid].lock); +#endif                          return;                  } @@ -137,9 +433,26 @@ static void sdu_handler(int                  fd,                          log_err("No registered component on eid %d.",                                  dt_pci.eid);                          ipcp_sdb_release(sdb); +#ifdef IPCP_FLOW_STATS +                        pthread_mutex_lock(&dt.stat[dt_pci.eid].lock); + +                        ++dt.stat[dt_pci.eid].w_drp_pkt[qc]; +                        dt.stat[dt_pci.eid].w_drp_bytes[qc] += len; + +                        pthread_mutex_unlock(&dt.stat[dt_pci.eid].lock); +#endif                          return;                  } +#ifdef IPCP_FLOW_STATS +                pthread_mutex_lock(&dt.stat[dt_pci.eid].lock); + +                ++dt.stat[fd].rcv_pkt[qc]; +                dt.stat[fd].rcv_bytes[qc] += len; +                ++dt.stat[fd].lcl_r_pkt[qc]; +                dt.stat[fd].lcl_r_bytes[qc] += len; +                pthread_mutex_unlock(&dt.stat[dt_pci.eid].lock); +#endif                  dt.comps[dt_pci.eid].post_sdu(dt.comps[dt_pci.eid].comp, sdb);          }  } @@ -229,9 +542,30 @@ int dt_init(enum pol_routing pr,          dt.res_fds = bmp_create(PROG_RES_FDS, 0);          if (dt.res_fds == NULL)                  goto fail_res_fds; +#ifdef IPCP_FLOW_STAT +        memset(dt.stat, 0, sizeof(dt.stat)); + +        for (i = 0; i < PROG_MAX_FLOWS, ++i) +                if (pthread_mutex_init(&dt.stat[fd].lock, NULL)) { +                        for (j = 0; j < i; ++j) +                                pthread_mutex_destroy(&dt.stat[j].lock); +                        goto fail_stat_lock; +                } + +        dt.n_flows = 0; +#endif +        if (rib_reg(DT, &r_ops)) +                goto fail_rib_reg;          return 0; + fail_rib_reg: +#ifdef IPCP_FLOW_STAT +        for (i = 0; i < PROG_MAX_FLOWS; ++i) +                pthread_mutex_destroy(&dt.stat[i].lock); + fail_stat_lock: +#endif +        bmp_destroy(dt.res_fds);   fail_res_fds:          pthread_rwlock_destroy(&dt.lock);   fail_rwlock_init: @@ -256,6 +590,11 @@ void dt_fini(void)  {          int i; +        rib_unreg(DT); +#ifdef IPCP_FLOW_STATS +        for (i = 0; i < PROG_MAX_FLOWS; ++i) +                pthread_mutex_destroy(&dt.stat[i].lock); +#endif          bmp_destroy(dt.res_fds);          pthread_rwlock_destroy(&dt.lock); @@ -322,7 +661,9 @@ int dt_reg_comp(void * comp,          dt.comps[res_fd].comp     = comp;          pthread_rwlock_unlock(&dt.lock); - +#ifdef IPCP_FLOW_STATS +        set_used(res_fd, true); +#endif          return res_fd;  } @@ -334,14 +675,18 @@ int dt_write_sdu(uint64_t             dst_addr,          int           fd;          struct dt_pci dt_pci;          int           ret; - +#ifdef IPCP_FLOW_STATS +        size_t        len; +#endif          assert(sdb);          assert(dst_addr != ipcpi.dt_addr); - +#ifdef IPCP_FLOW_STATS +        len = shm_du_buff_tail(sdb) - shm_du_buff_head(sdb); +#endif          fd = pff_nhop(dt.pff[qc], dst_addr);          if (fd < 0) {                  log_dbg("Could not get nhop for addr %" PRIu64 ".", dst_addr); -                return -1; +                goto fail_write;          }          dt_pci.dst_addr = dst_addr; @@ -349,17 +694,39 @@ int dt_write_sdu(uint64_t             dst_addr,          dt_pci.eid      = np1_fd;          if (dt_pci_ser(sdb, &dt_pci)) { -                log_err("Failed to serialize PDU."); -                return -1; +                log_dbg("Failed to serialize PDU."); +                goto fail_write;          }          ret = ipcp_flow_write(fd, sdb);          if (ret < 0) { -                log_err("Failed to write SDU to fd %d.", fd); +                log_dbg("Failed to write SDU to fd %d.", fd);                  if (ret == -EFLOWDOWN)                          notifier_event(NOTIFY_DT_CONN_DOWN, &fd); -                return -1; +                goto fail_write;          } +#ifdef IPCP_FLOW_STATS +        pthread_mutex_lock(&dt.stat[fd].lock); + +        ++dt.stat[dt_pci.eid].lcl_w_pkt[qc]; +        dt.stat[dt_pci.eid].lcl_w_bytes[qc] += len; +        ++dt.stat[fd].snd_pkt[qc]; +        dt.stat[fd].snd_bytes[qc] = len; +        pthread_mutex_unlock(&dt.stat[fd].lock); +#endif          return 0; + + fail_write: +#ifdef IPCP_FLOW_STATS +        pthread_mutex_lock(&dt.stat[fd].lock); + +        ++dt.stat[dt_pci.eid].lcl_w_pkt[qc]; +        dt.stat[dt_pci.eid].lcl_w_bytes[qc] += len; +        ++dt.stat[fd].w_drp_pkt[qc]; +        dt.stat[fd].w_drp_bytes[qc] = len; + +        pthread_mutex_unlock(&dt.stat[fd].lock); +#endif +        return -1;  } | 
