summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDimitri Staessens <dimitri.staessens@ugent.be>2018-02-20 08:15:13 +0100
committerSander Vrijders <sander.vrijders@ugent.be>2018-02-20 08:49:56 +0100
commitcc4b333f5a6964120f2e40c33f67d0be7dd409fc (patch)
treed91399337554d3877c1f912f44f2b0c5a45c671b
parente72fcd924b25b2b3b8a45c85d9c3d09388885249 (diff)
downloadouroboros-cc4b333f5a6964120f2e40c33f67d0be7dd409fc.tar.gz
ouroboros-cc4b333f5a6964120f2e40c33f67d0be7dd409fc.zip
ipcpd: Export flow statistics to RIB
This adds flow statistics for the data transfer (DT) component to the RIB. The DT component will keep track of the traffic on each flow. This feature can be enabled or disabled by setting the IPCP_FLOW_STATS variable in the build system. Signed-off-by: Dimitri Staessens <dimitri.staessens@ugent.be> Signed-off-by: Sander Vrijders <sander.vrijders@ugent.be>
-rw-r--r--src/ipcpd/config.h.in2
-rw-r--r--src/ipcpd/normal/CMakeLists.txt10
-rw-r--r--src/ipcpd/normal/dt.c407
3 files changed, 399 insertions, 20 deletions
diff --git a/src/ipcpd/config.h.in b/src/ipcpd/config.h.in
index 3fa74457..a4893f50 100644
--- a/src/ipcpd/config.h.in
+++ b/src/ipcpd/config.h.in
@@ -45,6 +45,8 @@
#define IPCP_SCHED_THR_MUL @IPCP_SCHED_THR_MUL@
#define PFT_SIZE @PFT_SIZE@
+#cmakedefine IPCP_FLOW_STATS
+
/* udp */
#cmakedefine HAVE_DDNS
#define NSUPDATE_EXEC "@NSUPDATE_EXECUTABLE@"
diff --git a/src/ipcpd/normal/CMakeLists.txt b/src/ipcpd/normal/CMakeLists.txt
index 0b444852..e7e230d2 100644
--- a/src/ipcpd/normal/CMakeLists.txt
+++ b/src/ipcpd/normal/CMakeLists.txt
@@ -21,6 +21,16 @@ protobuf_generate_c(KAD_PROTO_SRCS KAD_PROTO_HDRS kademlia.proto)
math(EXPR PFT_EXPR "1 << 12")
set(PFT_SIZE ${PFT_EXPR} CACHE STRING
"Size of the PDU forwarding table")
+if (HAVE_FUSE)
+ set(IPCP_FLOW_STATS FALSE CACHE BOOL
+ "Enable flow statistics tracking in IPCP")
+ if (IPCP_FLOW_STATS)
+ message(STATUS "IPCP flow statistics enabled")
+ else ()
+ message(STATUS "IPCP flow statistics disabled")
+ endif ()
+endif ()
+
set(SOURCE_FILES
# Add source files here
diff --git a/src/ipcpd/normal/dt.c b/src/ipcpd/normal/dt.c
index 0df68dd4..2cfd7417 100644
--- a/src/ipcpd/normal/dt.c
+++ b/src/ipcpd/normal/dt.c
@@ -24,13 +24,17 @@
#include "config.h"
-#define OUROBOROS_PREFIX "dt"
+#define DT "dt"
+#define OUROBOROS_PREFIX DT
+/* FIXME: fix #defines and remove endian.h include. */
+#include <ouroboros/endian.h>
#include <ouroboros/bitmap.h>
#include <ouroboros/errno.h>
#include <ouroboros/logs.h>
#include <ouroboros/dev.h>
#include <ouroboros/notifier.h>
+#include <ouroboros/rib.h>
#include "connmgr.h"
#include "ipcp.h"
@@ -49,6 +53,8 @@
#include <inttypes.h>
#include <assert.h>
+#define STAT_FILE_LEN 1590
+
struct comp_info {
void (* post_sdu)(void * comp, struct shm_du_buff * sdb);
void * comp;
@@ -59,7 +65,28 @@ struct {
struct pff * pff[QOS_CUBE_MAX];
struct routing_i * routing[QOS_CUBE_MAX];
-
+#ifdef IPCP_FLOW_STATS
+ struct {
+ time_t stamp;
+ size_t snd_pkt[QOS_CUBE_MAX];
+ size_t rcv_pkt[QOS_CUBE_MAX];
+ size_t snd_bytes[QOS_CUBE_MAX];
+ size_t rcv_bytes[QOS_CUBE_MAX];
+ size_t lcl_r_pkt[QOS_CUBE_MAX];
+ size_t lcl_r_bytes[QOS_CUBE_MAX];
+ size_t lcl_w_pkt[QOS_CUBE_MAX];
+ size_t lcl_w_bytes[QOS_CUBE_MAX];
+ size_t r_drp_pkt[QOS_CUBE_MAX];
+ size_t r_drp_bytes[QOS_CUBE_MAX];
+ size_t w_drp_pkt[QOS_CUBE_MAX];
+ size_t w_drp_bytes[QOS_CUBE_MAX];
+ size_t f_nhp_pkt[QOS_CUBE_MAX];
+ size_t f_nhp_bytes[QOS_CUBE_MAX];
+ pthread_mutex_t lock;
+ } stat[PROG_MAX_FLOWS];
+
+ size_t n_flows;
+#endif
struct bmp * res_fds;
struct comp_info comps[PROG_RES_FDS];
pthread_rwlock_t lock;
@@ -67,6 +94,196 @@ struct {
pthread_t listener;
} dt;
+static int dt_stat_read(const char * path,
+ char * buf,
+ size_t len)
+{
+#ifdef IPCP_FLOW_STATS
+ int fd;
+ int i;
+ char str[587];
+
+ /* NOTE: we may need stronger checks. */
+ fd = atoi(path);
+
+ if (len < STAT_FILE_LEN)
+ return 0;
+
+ buf[0] = '\0';
+
+ pthread_mutex_lock(&dt.stat[fd].lock);
+
+ if (dt.stat[fd].stamp == 0) {
+ pthread_mutex_unlock(&dt.stat[fd].lock);
+ return 0;
+ }
+
+ for (i = 0; i < QOS_CUBE_MAX; ++i) {
+ sprintf(str,
+ "Qos cube %d:\n"
+ " sent (packets): %10zu\n"
+ " rcvd (packets): %10zu\n"
+ " sent (bytes): %10zu\n"
+ " rcvd (bytes): %10zu\n"
+ " local sent (packets): %10zu\n"
+ " local sent (bytes): %10zu\n"
+ " local rcvd (packets): %10zu\n"
+ " local rcvd (bytes): %10zu\n"
+ " dropped ttl (packets): %10zu\n"
+ " dropped ttl (bytes): %10zu\n"
+ " failed writes (packets): %10zu\n"
+ " failed writes (bytes): %10zu\n"
+ " failed nhop (packets): %10zu\n"
+ " failed nhop (bytes): %10zu\n",
+ i,
+ dt.stat[fd].snd_pkt[i],
+ dt.stat[fd].rcv_pkt[i],
+ dt.stat[fd].snd_bytes[i],
+ dt.stat[fd].rcv_bytes[i],
+ dt.stat[fd].lcl_w_pkt[i],
+ dt.stat[fd].lcl_w_bytes[i],
+ dt.stat[fd].lcl_r_pkt[i],
+ dt.stat[fd].lcl_r_bytes[i],
+ dt.stat[fd].r_drp_pkt[i],
+ dt.stat[fd].r_drp_bytes[i],
+ dt.stat[fd].w_drp_pkt[i],
+ dt.stat[fd].w_drp_bytes[i],
+ dt.stat[fd].f_nhp_pkt[i],
+ dt.stat[fd].f_nhp_bytes[i]
+ );
+ strcat(buf, str);
+ }
+
+ pthread_mutex_unlock(&dt.stat[fd].lock);
+
+ return STAT_FILE_LEN;
+#else
+ (void) path;
+ (void) buf;
+ (void) len;
+ return 0;
+#endif
+}
+
+static int dt_stat_readdir(char *** buf)
+{
+#ifdef IPCP_FLOW_STATS
+ char entry[RIB_PATH_LEN + 1];
+ size_t i;
+ int idx = 0;
+
+ pthread_rwlock_rdlock(&dt.lock);
+
+ if (dt.n_flows < 1) {
+ pthread_rwlock_unlock(&dt.lock);
+ return 0;
+ }
+
+ *buf = malloc(sizeof(**buf) * dt.n_flows);
+ if (*buf == NULL) {
+ pthread_rwlock_unlock(&dt.lock);
+ return -ENOMEM;
+ }
+
+ for (i = 0; i < PROG_MAX_FLOWS; ++i) {
+ pthread_mutex_lock(&dt.stat[i].lock);
+
+ if (dt.stat[i].stamp == 0) {
+ pthread_mutex_unlock(&dt.stat[i].lock);
+ /* Optimization: skip unused res_fds. */
+ if (i < PROG_RES_FDS)
+ i = PROG_RES_FDS;
+ continue;
+ }
+
+ sprintf(entry, "%zu", i);
+
+ (*buf)[idx] = malloc(strlen(entry) + 1);
+ if ((*buf)[idx] == NULL) {
+ while (idx-- > 0)
+ free((*buf)[idx]);
+ free(buf);
+ pthread_mutex_unlock(&dt.stat[i].lock);
+ pthread_rwlock_unlock(&dt.lock);
+ return -ENOMEM;
+ }
+
+ strcpy((*buf)[idx++], entry);
+
+ pthread_mutex_unlock(&dt.stat[i].lock);
+ }
+
+ pthread_rwlock_unlock(&dt.lock);
+
+ assert((size_t) idx == dt.n_flows);
+
+ return idx;
+#else
+ (void) buf;
+ return 0;
+#endif
+}
+
+static int dt_stat_getattr(const char * path,
+ struct stat * st)
+{
+#ifdef IPCP_FLOW_STATS
+ int fd;
+
+ fd = atoi(path);
+
+ st->st_mode = S_IFREG | 0755;
+ st->st_nlink = 1;
+ st->st_uid = getuid();
+ st->st_gid = getgid();
+
+ pthread_mutex_lock(&dt.stat[fd].lock);
+
+ if (dt.stat[fd].stamp != -1) {
+ st->st_size = STAT_FILE_LEN;
+ st->st_mtime = dt.stat[fd].stamp;
+ } else {
+ st->st_size = 0;
+ st->st_mtime = 0;
+ }
+
+ pthread_mutex_unlock(&dt.stat[fd].lock);
+#else
+ (void) path;
+ (void) st;
+#endif
+ return 0;
+}
+
+static struct rib_ops r_ops = {
+ .read = dt_stat_read,
+ .readdir = dt_stat_readdir,
+ .getattr = dt_stat_getattr
+};
+
+#ifdef IPCP_FLOW_STATS
+
+static void set_used(int fd,
+ bool b)
+{
+ struct timespec now;
+
+ clock_gettime(CLOCK_REALTIME_COARSE, &now);
+
+ pthread_mutex_lock(&dt.stat[fd].lock);
+
+ dt.stat[fd].stamp = b ? now.tv_sec : 0;
+
+ pthread_mutex_unlock(&dt.stat[fd].lock);
+
+ pthread_rwlock_wrlock(&dt.lock);
+
+ b ? ++dt.n_flows : --dt.n_flows;
+
+ pthread_rwlock_unlock(&dt.lock);
+}
+#endif
+
static void handle_event(void * self,
int event,
const void * o)
@@ -79,10 +296,16 @@ static void handle_event(void * self,
switch (event) {
case NOTIFY_DT_CONN_ADD:
+#ifdef IPCP_FLOW_STATS
+ set_used(c->flow_info.fd, true);
+#endif
sdu_sched_add(dt.sdu_sched, c->flow_info.fd);
log_dbg("Added fd %d to SDU scheduler.", c->flow_info.fd);
break;
case NOTIFY_DT_CONN_DEL:
+#ifdef IPCP_FLOW_STATS
+ set_used(c->flow_info.fd, false);
+#endif
sdu_sched_del(dt.sdu_sched, c->flow_info.fd);
log_dbg("Removed fd %d from SDU scheduler.", c->flow_info.fd);
break;
@@ -97,39 +320,112 @@ static void sdu_handler(int fd,
{
struct dt_pci dt_pci;
int ret;
-
+ int ofd;
+#ifdef IPCP_FLOW_STATS
+ size_t len;
+#else
+ (void) fd;
+#endif
memset(&dt_pci, 0, sizeof(dt_pci));
dt_pci_des(sdb, &dt_pci);
-
+#ifdef IPCP_FLOW_STATS
+ len = shm_du_buff_tail(sdb) - shm_du_buff_head(sdb);
+#endif
if (dt_pci.dst_addr != ipcpi.dt_addr) {
if (dt_pci.ttl == 0) {
log_dbg("TTL was zero.");
ipcp_sdb_release(sdb);
+#ifdef IPCP_FLOW_STATS
+ pthread_mutex_lock(&dt.stat[fd].lock);
+
+ ++dt.stat[fd].r_drp_pkt[qc];
+ dt.stat[fd].r_drp_bytes[qc] += len;
+
+ pthread_mutex_unlock(&dt.stat[fd].lock);
+#endif
return;
}
- fd = pff_nhop(dt.pff[qc], dt_pci.dst_addr);
- if (fd < 0) {
- log_err("No next hop for %" PRIu64, dt_pci.dst_addr);
+ /* FIXME: Use qoscube from PCI instead of incoming flow. */
+ ofd = pff_nhop(dt.pff[qc], dt_pci.dst_addr);
+ if (ofd < 0) {
+ log_dbg("No next hop for %" PRIu64, dt_pci.dst_addr);
ipcp_sdb_release(sdb);
+#ifdef IPCP_FLOW_STATS
+ pthread_mutex_lock(&dt.stat[fd].lock);
+
+ ++dt.stat[fd].rcv_pkt[qc];
+ dt.stat[fd].rcv_bytes[qc] += len;
+ ++dt.stat[fd].f_nhp_pkt[qc];
+ dt.stat[fd].f_nhp_bytes[qc] += len;
+
+ pthread_mutex_unlock(&dt.stat[fd].lock);
+#endif
return;
}
- ret = ipcp_flow_write(fd, sdb);
+ ret = ipcp_flow_write(ofd, sdb);
if (ret < 0) {
- log_err("Failed to write SDU to fd %d.", fd);
+ log_dbg("Failed to write SDU to fd %d.", ofd);
if (ret == -EFLOWDOWN)
- notifier_event(NOTIFY_DT_CONN_DOWN, &fd);
+ notifier_event(NOTIFY_DT_CONN_DOWN, &ofd);
ipcp_sdb_release(sdb);
+#ifdef IPCP_FLOW_STATS
+ pthread_mutex_lock(&dt.stat[fd].lock);
+
+ ++dt.stat[fd].rcv_pkt[qc];
+ dt.stat[fd].rcv_bytes[qc] += len;
+
+ pthread_mutex_unlock(&dt.stat[fd].lock);
+ pthread_mutex_lock(&dt.stat[ofd].lock);
+
+ ++dt.stat[ofd].w_drp_pkt[qc];
+ dt.stat[ofd].w_drp_bytes[qc] += len;
+
+ pthread_mutex_unlock(&dt.stat[ofd].lock);
+#endif
return;
}
+#ifdef IPCP_FLOW_STATS
+ pthread_mutex_lock(&dt.stat[fd].lock);
+
+ ++dt.stat[fd].rcv_pkt[qc];
+ dt.stat[fd].rcv_bytes[qc] += len;
+
+ pthread_mutex_unlock(&dt.stat[fd].lock);
+ pthread_mutex_lock(&dt.stat[ofd].lock);
+
+ ++dt.stat[ofd].snd_pkt[qc];
+ dt.stat[ofd].snd_bytes[qc] += len;
+
+ pthread_mutex_unlock(&dt.stat[ofd].lock);
+#endif
} else {
dt_pci_shrink(sdb);
-
if (dt_pci.eid > PROG_RES_FDS) {
- if (ipcp_flow_write(dt_pci.eid, sdb))
+ if (ipcp_flow_write(dt_pci.eid, sdb)) {
ipcp_sdb_release(sdb);
+#ifdef IPCP_FLOW_STATS
+ pthread_mutex_lock(&dt.stat[dt_pci.eid].lock);
+
+ ++dt.stat[dt_pci.eid].w_drp_pkt[qc];
+ dt.stat[dt_pci.eid].w_drp_bytes[qc] += len;
+
+ pthread_mutex_unlock(&dt.stat[dt_pci.eid].lock);
+#endif
+
+ }
+#ifdef IPCP_FLOW_STATS
+ pthread_mutex_lock(&dt.stat[dt_pci.eid].lock);
+
+ ++dt.stat[dt_pci.eid].rcv_pkt[qc];
+ dt.stat[dt_pci.eid].rcv_bytes[qc] += len;
+ ++dt.stat[dt_pci.eid].lcl_r_pkt[qc];
+ dt.stat[dt_pci.eid].lcl_r_bytes[qc] += len;
+
+ pthread_mutex_unlock(&dt.stat[dt_pci.eid].lock);
+#endif
return;
}
@@ -137,9 +433,26 @@ static void sdu_handler(int fd,
log_err("No registered component on eid %d.",
dt_pci.eid);
ipcp_sdb_release(sdb);
+#ifdef IPCP_FLOW_STATS
+ pthread_mutex_lock(&dt.stat[dt_pci.eid].lock);
+
+ ++dt.stat[dt_pci.eid].w_drp_pkt[qc];
+ dt.stat[dt_pci.eid].w_drp_bytes[qc] += len;
+
+ pthread_mutex_unlock(&dt.stat[dt_pci.eid].lock);
+#endif
return;
}
+#ifdef IPCP_FLOW_STATS
+ pthread_mutex_lock(&dt.stat[dt_pci.eid].lock);
+
+ ++dt.stat[fd].rcv_pkt[qc];
+ dt.stat[fd].rcv_bytes[qc] += len;
+ ++dt.stat[fd].lcl_r_pkt[qc];
+ dt.stat[fd].lcl_r_bytes[qc] += len;
+ pthread_mutex_unlock(&dt.stat[dt_pci.eid].lock);
+#endif
dt.comps[dt_pci.eid].post_sdu(dt.comps[dt_pci.eid].comp, sdb);
}
}
@@ -229,9 +542,30 @@ int dt_init(enum pol_routing pr,
dt.res_fds = bmp_create(PROG_RES_FDS, 0);
if (dt.res_fds == NULL)
goto fail_res_fds;
+#ifdef IPCP_FLOW_STAT
+ memset(dt.stat, 0, sizeof(dt.stat));
+
+ for (i = 0; i < PROG_MAX_FLOWS, ++i)
+ if (pthread_mutex_init(&dt.stat[fd].lock, NULL)) {
+ for (j = 0; j < i; ++j)
+ pthread_mutex_destroy(&dt.stat[j].lock);
+ goto fail_stat_lock;
+ }
+
+ dt.n_flows = 0;
+#endif
+ if (rib_reg(DT, &r_ops))
+ goto fail_rib_reg;
return 0;
+ fail_rib_reg:
+#ifdef IPCP_FLOW_STAT
+ for (i = 0; i < PROG_MAX_FLOWS; ++i)
+ pthread_mutex_destroy(&dt.stat[i].lock);
+ fail_stat_lock:
+#endif
+ bmp_destroy(dt.res_fds);
fail_res_fds:
pthread_rwlock_destroy(&dt.lock);
fail_rwlock_init:
@@ -256,6 +590,11 @@ void dt_fini(void)
{
int i;
+ rib_unreg(DT);
+#ifdef IPCP_FLOW_STATS
+ for (i = 0; i < PROG_MAX_FLOWS; ++i)
+ pthread_mutex_destroy(&dt.stat[i].lock);
+#endif
bmp_destroy(dt.res_fds);
pthread_rwlock_destroy(&dt.lock);
@@ -322,7 +661,9 @@ int dt_reg_comp(void * comp,
dt.comps[res_fd].comp = comp;
pthread_rwlock_unlock(&dt.lock);
-
+#ifdef IPCP_FLOW_STATS
+ set_used(res_fd, true);
+#endif
return res_fd;
}
@@ -334,14 +675,18 @@ int dt_write_sdu(uint64_t dst_addr,
int fd;
struct dt_pci dt_pci;
int ret;
-
+#ifdef IPCP_FLOW_STATS
+ size_t len;
+#endif
assert(sdb);
assert(dst_addr != ipcpi.dt_addr);
-
+#ifdef IPCP_FLOW_STATS
+ len = shm_du_buff_tail(sdb) - shm_du_buff_head(sdb);
+#endif
fd = pff_nhop(dt.pff[qc], dst_addr);
if (fd < 0) {
log_dbg("Could not get nhop for addr %" PRIu64 ".", dst_addr);
- return -1;
+ goto fail_write;
}
dt_pci.dst_addr = dst_addr;
@@ -349,17 +694,39 @@ int dt_write_sdu(uint64_t dst_addr,
dt_pci.eid = np1_fd;
if (dt_pci_ser(sdb, &dt_pci)) {
- log_err("Failed to serialize PDU.");
- return -1;
+ log_dbg("Failed to serialize PDU.");
+ goto fail_write;
}
ret = ipcp_flow_write(fd, sdb);
if (ret < 0) {
- log_err("Failed to write SDU to fd %d.", fd);
+ log_dbg("Failed to write SDU to fd %d.", fd);
if (ret == -EFLOWDOWN)
notifier_event(NOTIFY_DT_CONN_DOWN, &fd);
- return -1;
+ goto fail_write;
}
+#ifdef IPCP_FLOW_STATS
+ pthread_mutex_lock(&dt.stat[fd].lock);
+
+ ++dt.stat[dt_pci.eid].lcl_w_pkt[qc];
+ dt.stat[dt_pci.eid].lcl_w_bytes[qc] += len;
+ ++dt.stat[fd].snd_pkt[qc];
+ dt.stat[fd].snd_bytes[qc] = len;
+ pthread_mutex_unlock(&dt.stat[fd].lock);
+#endif
return 0;
+
+ fail_write:
+#ifdef IPCP_FLOW_STATS
+ pthread_mutex_lock(&dt.stat[fd].lock);
+
+ ++dt.stat[dt_pci.eid].lcl_w_pkt[qc];
+ dt.stat[dt_pci.eid].lcl_w_bytes[qc] += len;
+ ++dt.stat[fd].w_drp_pkt[qc];
+ dt.stat[fd].w_drp_bytes[qc] = len;
+
+ pthread_mutex_unlock(&dt.stat[fd].lock);
+#endif
+ return -1;
}