diff options
Diffstat (limited to 'src/ipcpd')
-rw-r--r-- | src/ipcpd/config.h.in | 2 | ||||
-rw-r--r-- | src/ipcpd/normal/CMakeLists.txt | 10 | ||||
-rw-r--r-- | src/ipcpd/normal/dt.c | 407 |
3 files changed, 399 insertions, 20 deletions
diff --git a/src/ipcpd/config.h.in b/src/ipcpd/config.h.in index 3fa74457..a4893f50 100644 --- a/src/ipcpd/config.h.in +++ b/src/ipcpd/config.h.in @@ -45,6 +45,8 @@ #define IPCP_SCHED_THR_MUL @IPCP_SCHED_THR_MUL@ #define PFT_SIZE @PFT_SIZE@ +#cmakedefine IPCP_FLOW_STATS + /* udp */ #cmakedefine HAVE_DDNS #define NSUPDATE_EXEC "@NSUPDATE_EXECUTABLE@" diff --git a/src/ipcpd/normal/CMakeLists.txt b/src/ipcpd/normal/CMakeLists.txt index 0b444852..e7e230d2 100644 --- a/src/ipcpd/normal/CMakeLists.txt +++ b/src/ipcpd/normal/CMakeLists.txt @@ -21,6 +21,16 @@ protobuf_generate_c(KAD_PROTO_SRCS KAD_PROTO_HDRS kademlia.proto) math(EXPR PFT_EXPR "1 << 12") set(PFT_SIZE ${PFT_EXPR} CACHE STRING "Size of the PDU forwarding table") +if (HAVE_FUSE) + set(IPCP_FLOW_STATS FALSE CACHE BOOL + "Enable flow statistics tracking in IPCP") + if (IPCP_FLOW_STATS) + message(STATUS "IPCP flow statistics enabled") + else () + message(STATUS "IPCP flow statistics disabled") + endif () +endif () + set(SOURCE_FILES # Add source files here diff --git a/src/ipcpd/normal/dt.c b/src/ipcpd/normal/dt.c index 0df68dd4..2cfd7417 100644 --- a/src/ipcpd/normal/dt.c +++ b/src/ipcpd/normal/dt.c @@ -24,13 +24,17 @@ #include "config.h" -#define OUROBOROS_PREFIX "dt" +#define DT "dt" +#define OUROBOROS_PREFIX DT +/* FIXME: fix #defines and remove endian.h include. */ +#include <ouroboros/endian.h> #include <ouroboros/bitmap.h> #include <ouroboros/errno.h> #include <ouroboros/logs.h> #include <ouroboros/dev.h> #include <ouroboros/notifier.h> +#include <ouroboros/rib.h> #include "connmgr.h" #include "ipcp.h" @@ -49,6 +53,8 @@ #include <inttypes.h> #include <assert.h> +#define STAT_FILE_LEN 1590 + struct comp_info { void (* post_sdu)(void * comp, struct shm_du_buff * sdb); void * comp; @@ -59,7 +65,28 @@ struct { struct pff * pff[QOS_CUBE_MAX]; struct routing_i * routing[QOS_CUBE_MAX]; - +#ifdef IPCP_FLOW_STATS + struct { + time_t stamp; + size_t snd_pkt[QOS_CUBE_MAX]; + size_t rcv_pkt[QOS_CUBE_MAX]; + size_t snd_bytes[QOS_CUBE_MAX]; + size_t rcv_bytes[QOS_CUBE_MAX]; + size_t lcl_r_pkt[QOS_CUBE_MAX]; + size_t lcl_r_bytes[QOS_CUBE_MAX]; + size_t lcl_w_pkt[QOS_CUBE_MAX]; + size_t lcl_w_bytes[QOS_CUBE_MAX]; + size_t r_drp_pkt[QOS_CUBE_MAX]; + size_t r_drp_bytes[QOS_CUBE_MAX]; + size_t w_drp_pkt[QOS_CUBE_MAX]; + size_t w_drp_bytes[QOS_CUBE_MAX]; + size_t f_nhp_pkt[QOS_CUBE_MAX]; + size_t f_nhp_bytes[QOS_CUBE_MAX]; + pthread_mutex_t lock; + } stat[PROG_MAX_FLOWS]; + + size_t n_flows; +#endif struct bmp * res_fds; struct comp_info comps[PROG_RES_FDS]; pthread_rwlock_t lock; @@ -67,6 +94,196 @@ struct { pthread_t listener; } dt; +static int dt_stat_read(const char * path, + char * buf, + size_t len) +{ +#ifdef IPCP_FLOW_STATS + int fd; + int i; + char str[587]; + + /* NOTE: we may need stronger checks. */ + fd = atoi(path); + + if (len < STAT_FILE_LEN) + return 0; + + buf[0] = '\0'; + + pthread_mutex_lock(&dt.stat[fd].lock); + + if (dt.stat[fd].stamp == 0) { + pthread_mutex_unlock(&dt.stat[fd].lock); + return 0; + } + + for (i = 0; i < QOS_CUBE_MAX; ++i) { + sprintf(str, + "Qos cube %d:\n" + " sent (packets): %10zu\n" + " rcvd (packets): %10zu\n" + " sent (bytes): %10zu\n" + " rcvd (bytes): %10zu\n" + " local sent (packets): %10zu\n" + " local sent (bytes): %10zu\n" + " local rcvd (packets): %10zu\n" + " local rcvd (bytes): %10zu\n" + " dropped ttl (packets): %10zu\n" + " dropped ttl (bytes): %10zu\n" + " failed writes (packets): %10zu\n" + " failed writes (bytes): %10zu\n" + " failed nhop (packets): %10zu\n" + " failed nhop (bytes): %10zu\n", + i, + dt.stat[fd].snd_pkt[i], + dt.stat[fd].rcv_pkt[i], + dt.stat[fd].snd_bytes[i], + dt.stat[fd].rcv_bytes[i], + dt.stat[fd].lcl_w_pkt[i], + dt.stat[fd].lcl_w_bytes[i], + dt.stat[fd].lcl_r_pkt[i], + dt.stat[fd].lcl_r_bytes[i], + dt.stat[fd].r_drp_pkt[i], + dt.stat[fd].r_drp_bytes[i], + dt.stat[fd].w_drp_pkt[i], + dt.stat[fd].w_drp_bytes[i], + dt.stat[fd].f_nhp_pkt[i], + dt.stat[fd].f_nhp_bytes[i] + ); + strcat(buf, str); + } + + pthread_mutex_unlock(&dt.stat[fd].lock); + + return STAT_FILE_LEN; +#else + (void) path; + (void) buf; + (void) len; + return 0; +#endif +} + +static int dt_stat_readdir(char *** buf) +{ +#ifdef IPCP_FLOW_STATS + char entry[RIB_PATH_LEN + 1]; + size_t i; + int idx = 0; + + pthread_rwlock_rdlock(&dt.lock); + + if (dt.n_flows < 1) { + pthread_rwlock_unlock(&dt.lock); + return 0; + } + + *buf = malloc(sizeof(**buf) * dt.n_flows); + if (*buf == NULL) { + pthread_rwlock_unlock(&dt.lock); + return -ENOMEM; + } + + for (i = 0; i < PROG_MAX_FLOWS; ++i) { + pthread_mutex_lock(&dt.stat[i].lock); + + if (dt.stat[i].stamp == 0) { + pthread_mutex_unlock(&dt.stat[i].lock); + /* Optimization: skip unused res_fds. */ + if (i < PROG_RES_FDS) + i = PROG_RES_FDS; + continue; + } + + sprintf(entry, "%zu", i); + + (*buf)[idx] = malloc(strlen(entry) + 1); + if ((*buf)[idx] == NULL) { + while (idx-- > 0) + free((*buf)[idx]); + free(buf); + pthread_mutex_unlock(&dt.stat[i].lock); + pthread_rwlock_unlock(&dt.lock); + return -ENOMEM; + } + + strcpy((*buf)[idx++], entry); + + pthread_mutex_unlock(&dt.stat[i].lock); + } + + pthread_rwlock_unlock(&dt.lock); + + assert((size_t) idx == dt.n_flows); + + return idx; +#else + (void) buf; + return 0; +#endif +} + +static int dt_stat_getattr(const char * path, + struct stat * st) +{ +#ifdef IPCP_FLOW_STATS + int fd; + + fd = atoi(path); + + st->st_mode = S_IFREG | 0755; + st->st_nlink = 1; + st->st_uid = getuid(); + st->st_gid = getgid(); + + pthread_mutex_lock(&dt.stat[fd].lock); + + if (dt.stat[fd].stamp != -1) { + st->st_size = STAT_FILE_LEN; + st->st_mtime = dt.stat[fd].stamp; + } else { + st->st_size = 0; + st->st_mtime = 0; + } + + pthread_mutex_unlock(&dt.stat[fd].lock); +#else + (void) path; + (void) st; +#endif + return 0; +} + +static struct rib_ops r_ops = { + .read = dt_stat_read, + .readdir = dt_stat_readdir, + .getattr = dt_stat_getattr +}; + +#ifdef IPCP_FLOW_STATS + +static void set_used(int fd, + bool b) +{ + struct timespec now; + + clock_gettime(CLOCK_REALTIME_COARSE, &now); + + pthread_mutex_lock(&dt.stat[fd].lock); + + dt.stat[fd].stamp = b ? now.tv_sec : 0; + + pthread_mutex_unlock(&dt.stat[fd].lock); + + pthread_rwlock_wrlock(&dt.lock); + + b ? ++dt.n_flows : --dt.n_flows; + + pthread_rwlock_unlock(&dt.lock); +} +#endif + static void handle_event(void * self, int event, const void * o) @@ -79,10 +296,16 @@ static void handle_event(void * self, switch (event) { case NOTIFY_DT_CONN_ADD: +#ifdef IPCP_FLOW_STATS + set_used(c->flow_info.fd, true); +#endif sdu_sched_add(dt.sdu_sched, c->flow_info.fd); log_dbg("Added fd %d to SDU scheduler.", c->flow_info.fd); break; case NOTIFY_DT_CONN_DEL: +#ifdef IPCP_FLOW_STATS + set_used(c->flow_info.fd, false); +#endif sdu_sched_del(dt.sdu_sched, c->flow_info.fd); log_dbg("Removed fd %d from SDU scheduler.", c->flow_info.fd); break; @@ -97,39 +320,112 @@ static void sdu_handler(int fd, { struct dt_pci dt_pci; int ret; - + int ofd; +#ifdef IPCP_FLOW_STATS + size_t len; +#else + (void) fd; +#endif memset(&dt_pci, 0, sizeof(dt_pci)); dt_pci_des(sdb, &dt_pci); - +#ifdef IPCP_FLOW_STATS + len = shm_du_buff_tail(sdb) - shm_du_buff_head(sdb); +#endif if (dt_pci.dst_addr != ipcpi.dt_addr) { if (dt_pci.ttl == 0) { log_dbg("TTL was zero."); ipcp_sdb_release(sdb); +#ifdef IPCP_FLOW_STATS + pthread_mutex_lock(&dt.stat[fd].lock); + + ++dt.stat[fd].r_drp_pkt[qc]; + dt.stat[fd].r_drp_bytes[qc] += len; + + pthread_mutex_unlock(&dt.stat[fd].lock); +#endif return; } - fd = pff_nhop(dt.pff[qc], dt_pci.dst_addr); - if (fd < 0) { - log_err("No next hop for %" PRIu64, dt_pci.dst_addr); + /* FIXME: Use qoscube from PCI instead of incoming flow. */ + ofd = pff_nhop(dt.pff[qc], dt_pci.dst_addr); + if (ofd < 0) { + log_dbg("No next hop for %" PRIu64, dt_pci.dst_addr); ipcp_sdb_release(sdb); +#ifdef IPCP_FLOW_STATS + pthread_mutex_lock(&dt.stat[fd].lock); + + ++dt.stat[fd].rcv_pkt[qc]; + dt.stat[fd].rcv_bytes[qc] += len; + ++dt.stat[fd].f_nhp_pkt[qc]; + dt.stat[fd].f_nhp_bytes[qc] += len; + + pthread_mutex_unlock(&dt.stat[fd].lock); +#endif return; } - ret = ipcp_flow_write(fd, sdb); + ret = ipcp_flow_write(ofd, sdb); if (ret < 0) { - log_err("Failed to write SDU to fd %d.", fd); + log_dbg("Failed to write SDU to fd %d.", ofd); if (ret == -EFLOWDOWN) - notifier_event(NOTIFY_DT_CONN_DOWN, &fd); + notifier_event(NOTIFY_DT_CONN_DOWN, &ofd); ipcp_sdb_release(sdb); +#ifdef IPCP_FLOW_STATS + pthread_mutex_lock(&dt.stat[fd].lock); + + ++dt.stat[fd].rcv_pkt[qc]; + dt.stat[fd].rcv_bytes[qc] += len; + + pthread_mutex_unlock(&dt.stat[fd].lock); + pthread_mutex_lock(&dt.stat[ofd].lock); + + ++dt.stat[ofd].w_drp_pkt[qc]; + dt.stat[ofd].w_drp_bytes[qc] += len; + + pthread_mutex_unlock(&dt.stat[ofd].lock); +#endif return; } +#ifdef IPCP_FLOW_STATS + pthread_mutex_lock(&dt.stat[fd].lock); + + ++dt.stat[fd].rcv_pkt[qc]; + dt.stat[fd].rcv_bytes[qc] += len; + + pthread_mutex_unlock(&dt.stat[fd].lock); + pthread_mutex_lock(&dt.stat[ofd].lock); + + ++dt.stat[ofd].snd_pkt[qc]; + dt.stat[ofd].snd_bytes[qc] += len; + + pthread_mutex_unlock(&dt.stat[ofd].lock); +#endif } else { dt_pci_shrink(sdb); - if (dt_pci.eid > PROG_RES_FDS) { - if (ipcp_flow_write(dt_pci.eid, sdb)) + if (ipcp_flow_write(dt_pci.eid, sdb)) { ipcp_sdb_release(sdb); +#ifdef IPCP_FLOW_STATS + pthread_mutex_lock(&dt.stat[dt_pci.eid].lock); + + ++dt.stat[dt_pci.eid].w_drp_pkt[qc]; + dt.stat[dt_pci.eid].w_drp_bytes[qc] += len; + + pthread_mutex_unlock(&dt.stat[dt_pci.eid].lock); +#endif + + } +#ifdef IPCP_FLOW_STATS + pthread_mutex_lock(&dt.stat[dt_pci.eid].lock); + + ++dt.stat[dt_pci.eid].rcv_pkt[qc]; + dt.stat[dt_pci.eid].rcv_bytes[qc] += len; + ++dt.stat[dt_pci.eid].lcl_r_pkt[qc]; + dt.stat[dt_pci.eid].lcl_r_bytes[qc] += len; + + pthread_mutex_unlock(&dt.stat[dt_pci.eid].lock); +#endif return; } @@ -137,9 +433,26 @@ static void sdu_handler(int fd, log_err("No registered component on eid %d.", dt_pci.eid); ipcp_sdb_release(sdb); +#ifdef IPCP_FLOW_STATS + pthread_mutex_lock(&dt.stat[dt_pci.eid].lock); + + ++dt.stat[dt_pci.eid].w_drp_pkt[qc]; + dt.stat[dt_pci.eid].w_drp_bytes[qc] += len; + + pthread_mutex_unlock(&dt.stat[dt_pci.eid].lock); +#endif return; } +#ifdef IPCP_FLOW_STATS + pthread_mutex_lock(&dt.stat[dt_pci.eid].lock); + + ++dt.stat[fd].rcv_pkt[qc]; + dt.stat[fd].rcv_bytes[qc] += len; + ++dt.stat[fd].lcl_r_pkt[qc]; + dt.stat[fd].lcl_r_bytes[qc] += len; + pthread_mutex_unlock(&dt.stat[dt_pci.eid].lock); +#endif dt.comps[dt_pci.eid].post_sdu(dt.comps[dt_pci.eid].comp, sdb); } } @@ -229,9 +542,30 @@ int dt_init(enum pol_routing pr, dt.res_fds = bmp_create(PROG_RES_FDS, 0); if (dt.res_fds == NULL) goto fail_res_fds; +#ifdef IPCP_FLOW_STAT + memset(dt.stat, 0, sizeof(dt.stat)); + + for (i = 0; i < PROG_MAX_FLOWS, ++i) + if (pthread_mutex_init(&dt.stat[fd].lock, NULL)) { + for (j = 0; j < i; ++j) + pthread_mutex_destroy(&dt.stat[j].lock); + goto fail_stat_lock; + } + + dt.n_flows = 0; +#endif + if (rib_reg(DT, &r_ops)) + goto fail_rib_reg; return 0; + fail_rib_reg: +#ifdef IPCP_FLOW_STAT + for (i = 0; i < PROG_MAX_FLOWS; ++i) + pthread_mutex_destroy(&dt.stat[i].lock); + fail_stat_lock: +#endif + bmp_destroy(dt.res_fds); fail_res_fds: pthread_rwlock_destroy(&dt.lock); fail_rwlock_init: @@ -256,6 +590,11 @@ void dt_fini(void) { int i; + rib_unreg(DT); +#ifdef IPCP_FLOW_STATS + for (i = 0; i < PROG_MAX_FLOWS; ++i) + pthread_mutex_destroy(&dt.stat[i].lock); +#endif bmp_destroy(dt.res_fds); pthread_rwlock_destroy(&dt.lock); @@ -322,7 +661,9 @@ int dt_reg_comp(void * comp, dt.comps[res_fd].comp = comp; pthread_rwlock_unlock(&dt.lock); - +#ifdef IPCP_FLOW_STATS + set_used(res_fd, true); +#endif return res_fd; } @@ -334,14 +675,18 @@ int dt_write_sdu(uint64_t dst_addr, int fd; struct dt_pci dt_pci; int ret; - +#ifdef IPCP_FLOW_STATS + size_t len; +#endif assert(sdb); assert(dst_addr != ipcpi.dt_addr); - +#ifdef IPCP_FLOW_STATS + len = shm_du_buff_tail(sdb) - shm_du_buff_head(sdb); +#endif fd = pff_nhop(dt.pff[qc], dst_addr); if (fd < 0) { log_dbg("Could not get nhop for addr %" PRIu64 ".", dst_addr); - return -1; + goto fail_write; } dt_pci.dst_addr = dst_addr; @@ -349,17 +694,39 @@ int dt_write_sdu(uint64_t dst_addr, dt_pci.eid = np1_fd; if (dt_pci_ser(sdb, &dt_pci)) { - log_err("Failed to serialize PDU."); - return -1; + log_dbg("Failed to serialize PDU."); + goto fail_write; } ret = ipcp_flow_write(fd, sdb); if (ret < 0) { - log_err("Failed to write SDU to fd %d.", fd); + log_dbg("Failed to write SDU to fd %d.", fd); if (ret == -EFLOWDOWN) notifier_event(NOTIFY_DT_CONN_DOWN, &fd); - return -1; + goto fail_write; } +#ifdef IPCP_FLOW_STATS + pthread_mutex_lock(&dt.stat[fd].lock); + + ++dt.stat[dt_pci.eid].lcl_w_pkt[qc]; + dt.stat[dt_pci.eid].lcl_w_bytes[qc] += len; + ++dt.stat[fd].snd_pkt[qc]; + dt.stat[fd].snd_bytes[qc] = len; + pthread_mutex_unlock(&dt.stat[fd].lock); +#endif return 0; + + fail_write: +#ifdef IPCP_FLOW_STATS + pthread_mutex_lock(&dt.stat[fd].lock); + + ++dt.stat[dt_pci.eid].lcl_w_pkt[qc]; + dt.stat[dt_pci.eid].lcl_w_bytes[qc] += len; + ++dt.stat[fd].w_drp_pkt[qc]; + dt.stat[fd].w_drp_bytes[qc] = len; + + pthread_mutex_unlock(&dt.stat[fd].lock); +#endif + return -1; } |