summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/ipcpd/config.h.in2
-rw-r--r--src/ipcpd/normal/CMakeLists.txt10
-rw-r--r--src/ipcpd/normal/dt.c407
3 files changed, 399 insertions, 20 deletions
diff --git a/src/ipcpd/config.h.in b/src/ipcpd/config.h.in
index 3fa74457..a4893f50 100644
--- a/src/ipcpd/config.h.in
+++ b/src/ipcpd/config.h.in
@@ -45,6 +45,8 @@
#define IPCP_SCHED_THR_MUL @IPCP_SCHED_THR_MUL@
#define PFT_SIZE @PFT_SIZE@
+#cmakedefine IPCP_FLOW_STATS
+
/* udp */
#cmakedefine HAVE_DDNS
#define NSUPDATE_EXEC "@NSUPDATE_EXECUTABLE@"
diff --git a/src/ipcpd/normal/CMakeLists.txt b/src/ipcpd/normal/CMakeLists.txt
index 0b444852..e7e230d2 100644
--- a/src/ipcpd/normal/CMakeLists.txt
+++ b/src/ipcpd/normal/CMakeLists.txt
@@ -21,6 +21,16 @@ protobuf_generate_c(KAD_PROTO_SRCS KAD_PROTO_HDRS kademlia.proto)
math(EXPR PFT_EXPR "1 << 12")
set(PFT_SIZE ${PFT_EXPR} CACHE STRING
"Size of the PDU forwarding table")
+if (HAVE_FUSE)
+ set(IPCP_FLOW_STATS FALSE CACHE BOOL
+ "Enable flow statistics tracking in IPCP")
+ if (IPCP_FLOW_STATS)
+ message(STATUS "IPCP flow statistics enabled")
+ else ()
+ message(STATUS "IPCP flow statistics disabled")
+ endif ()
+endif ()
+
set(SOURCE_FILES
# Add source files here
diff --git a/src/ipcpd/normal/dt.c b/src/ipcpd/normal/dt.c
index 0df68dd4..2cfd7417 100644
--- a/src/ipcpd/normal/dt.c
+++ b/src/ipcpd/normal/dt.c
@@ -24,13 +24,17 @@
#include "config.h"
-#define OUROBOROS_PREFIX "dt"
+#define DT "dt"
+#define OUROBOROS_PREFIX DT
+/* FIXME: fix #defines and remove endian.h include. */
+#include <ouroboros/endian.h>
#include <ouroboros/bitmap.h>
#include <ouroboros/errno.h>
#include <ouroboros/logs.h>
#include <ouroboros/dev.h>
#include <ouroboros/notifier.h>
+#include <ouroboros/rib.h>
#include "connmgr.h"
#include "ipcp.h"
@@ -49,6 +53,8 @@
#include <inttypes.h>
#include <assert.h>
+#define STAT_FILE_LEN 1590
+
struct comp_info {
void (* post_sdu)(void * comp, struct shm_du_buff * sdb);
void * comp;
@@ -59,7 +65,28 @@ struct {
struct pff * pff[QOS_CUBE_MAX];
struct routing_i * routing[QOS_CUBE_MAX];
-
+#ifdef IPCP_FLOW_STATS
+ struct {
+ time_t stamp;
+ size_t snd_pkt[QOS_CUBE_MAX];
+ size_t rcv_pkt[QOS_CUBE_MAX];
+ size_t snd_bytes[QOS_CUBE_MAX];
+ size_t rcv_bytes[QOS_CUBE_MAX];
+ size_t lcl_r_pkt[QOS_CUBE_MAX];
+ size_t lcl_r_bytes[QOS_CUBE_MAX];
+ size_t lcl_w_pkt[QOS_CUBE_MAX];
+ size_t lcl_w_bytes[QOS_CUBE_MAX];
+ size_t r_drp_pkt[QOS_CUBE_MAX];
+ size_t r_drp_bytes[QOS_CUBE_MAX];
+ size_t w_drp_pkt[QOS_CUBE_MAX];
+ size_t w_drp_bytes[QOS_CUBE_MAX];
+ size_t f_nhp_pkt[QOS_CUBE_MAX];
+ size_t f_nhp_bytes[QOS_CUBE_MAX];
+ pthread_mutex_t lock;
+ } stat[PROG_MAX_FLOWS];
+
+ size_t n_flows;
+#endif
struct bmp * res_fds;
struct comp_info comps[PROG_RES_FDS];
pthread_rwlock_t lock;
@@ -67,6 +94,196 @@ struct {
pthread_t listener;
} dt;
+static int dt_stat_read(const char * path,
+ char * buf,
+ size_t len)
+{
+#ifdef IPCP_FLOW_STATS
+ int fd;
+ int i;
+ char str[587];
+
+ /* NOTE: we may need stronger checks. */
+ fd = atoi(path);
+
+ if (len < STAT_FILE_LEN)
+ return 0;
+
+ buf[0] = '\0';
+
+ pthread_mutex_lock(&dt.stat[fd].lock);
+
+ if (dt.stat[fd].stamp == 0) {
+ pthread_mutex_unlock(&dt.stat[fd].lock);
+ return 0;
+ }
+
+ for (i = 0; i < QOS_CUBE_MAX; ++i) {
+ sprintf(str,
+ "Qos cube %d:\n"
+ " sent (packets): %10zu\n"
+ " rcvd (packets): %10zu\n"
+ " sent (bytes): %10zu\n"
+ " rcvd (bytes): %10zu\n"
+ " local sent (packets): %10zu\n"
+ " local sent (bytes): %10zu\n"
+ " local rcvd (packets): %10zu\n"
+ " local rcvd (bytes): %10zu\n"
+ " dropped ttl (packets): %10zu\n"
+ " dropped ttl (bytes): %10zu\n"
+ " failed writes (packets): %10zu\n"
+ " failed writes (bytes): %10zu\n"
+ " failed nhop (packets): %10zu\n"
+ " failed nhop (bytes): %10zu\n",
+ i,
+ dt.stat[fd].snd_pkt[i],
+ dt.stat[fd].rcv_pkt[i],
+ dt.stat[fd].snd_bytes[i],
+ dt.stat[fd].rcv_bytes[i],
+ dt.stat[fd].lcl_w_pkt[i],
+ dt.stat[fd].lcl_w_bytes[i],
+ dt.stat[fd].lcl_r_pkt[i],
+ dt.stat[fd].lcl_r_bytes[i],
+ dt.stat[fd].r_drp_pkt[i],
+ dt.stat[fd].r_drp_bytes[i],
+ dt.stat[fd].w_drp_pkt[i],
+ dt.stat[fd].w_drp_bytes[i],
+ dt.stat[fd].f_nhp_pkt[i],
+ dt.stat[fd].f_nhp_bytes[i]
+ );
+ strcat(buf, str);
+ }
+
+ pthread_mutex_unlock(&dt.stat[fd].lock);
+
+ return STAT_FILE_LEN;
+#else
+ (void) path;
+ (void) buf;
+ (void) len;
+ return 0;
+#endif
+}
+
+static int dt_stat_readdir(char *** buf)
+{
+#ifdef IPCP_FLOW_STATS
+ char entry[RIB_PATH_LEN + 1];
+ size_t i;
+ int idx = 0;
+
+ pthread_rwlock_rdlock(&dt.lock);
+
+ if (dt.n_flows < 1) {
+ pthread_rwlock_unlock(&dt.lock);
+ return 0;
+ }
+
+ *buf = malloc(sizeof(**buf) * dt.n_flows);
+ if (*buf == NULL) {
+ pthread_rwlock_unlock(&dt.lock);
+ return -ENOMEM;
+ }
+
+ for (i = 0; i < PROG_MAX_FLOWS; ++i) {
+ pthread_mutex_lock(&dt.stat[i].lock);
+
+ if (dt.stat[i].stamp == 0) {
+ pthread_mutex_unlock(&dt.stat[i].lock);
+ /* Optimization: skip unused res_fds. */
+ if (i < PROG_RES_FDS)
+ i = PROG_RES_FDS;
+ continue;
+ }
+
+ sprintf(entry, "%zu", i);
+
+ (*buf)[idx] = malloc(strlen(entry) + 1);
+ if ((*buf)[idx] == NULL) {
+ while (idx-- > 0)
+ free((*buf)[idx]);
+ free(buf);
+ pthread_mutex_unlock(&dt.stat[i].lock);
+ pthread_rwlock_unlock(&dt.lock);
+ return -ENOMEM;
+ }
+
+ strcpy((*buf)[idx++], entry);
+
+ pthread_mutex_unlock(&dt.stat[i].lock);
+ }
+
+ pthread_rwlock_unlock(&dt.lock);
+
+ assert((size_t) idx == dt.n_flows);
+
+ return idx;
+#else
+ (void) buf;
+ return 0;
+#endif
+}
+
+static int dt_stat_getattr(const char * path,
+ struct stat * st)
+{
+#ifdef IPCP_FLOW_STATS
+ int fd;
+
+ fd = atoi(path);
+
+ st->st_mode = S_IFREG | 0755;
+ st->st_nlink = 1;
+ st->st_uid = getuid();
+ st->st_gid = getgid();
+
+ pthread_mutex_lock(&dt.stat[fd].lock);
+
+ if (dt.stat[fd].stamp != -1) {
+ st->st_size = STAT_FILE_LEN;
+ st->st_mtime = dt.stat[fd].stamp;
+ } else {
+ st->st_size = 0;
+ st->st_mtime = 0;
+ }
+
+ pthread_mutex_unlock(&dt.stat[fd].lock);
+#else
+ (void) path;
+ (void) st;
+#endif
+ return 0;
+}
+
+static struct rib_ops r_ops = {
+ .read = dt_stat_read,
+ .readdir = dt_stat_readdir,
+ .getattr = dt_stat_getattr
+};
+
+#ifdef IPCP_FLOW_STATS
+
+static void set_used(int fd,
+ bool b)
+{
+ struct timespec now;
+
+ clock_gettime(CLOCK_REALTIME_COARSE, &now);
+
+ pthread_mutex_lock(&dt.stat[fd].lock);
+
+ dt.stat[fd].stamp = b ? now.tv_sec : 0;
+
+ pthread_mutex_unlock(&dt.stat[fd].lock);
+
+ pthread_rwlock_wrlock(&dt.lock);
+
+ b ? ++dt.n_flows : --dt.n_flows;
+
+ pthread_rwlock_unlock(&dt.lock);
+}
+#endif
+
static void handle_event(void * self,
int event,
const void * o)
@@ -79,10 +296,16 @@ static void handle_event(void * self,
switch (event) {
case NOTIFY_DT_CONN_ADD:
+#ifdef IPCP_FLOW_STATS
+ set_used(c->flow_info.fd, true);
+#endif
sdu_sched_add(dt.sdu_sched, c->flow_info.fd);
log_dbg("Added fd %d to SDU scheduler.", c->flow_info.fd);
break;
case NOTIFY_DT_CONN_DEL:
+#ifdef IPCP_FLOW_STATS
+ set_used(c->flow_info.fd, false);
+#endif
sdu_sched_del(dt.sdu_sched, c->flow_info.fd);
log_dbg("Removed fd %d from SDU scheduler.", c->flow_info.fd);
break;
@@ -97,39 +320,112 @@ static void sdu_handler(int fd,
{
struct dt_pci dt_pci;
int ret;
-
+ int ofd;
+#ifdef IPCP_FLOW_STATS
+ size_t len;
+#else
+ (void) fd;
+#endif
memset(&dt_pci, 0, sizeof(dt_pci));
dt_pci_des(sdb, &dt_pci);
-
+#ifdef IPCP_FLOW_STATS
+ len = shm_du_buff_tail(sdb) - shm_du_buff_head(sdb);
+#endif
if (dt_pci.dst_addr != ipcpi.dt_addr) {
if (dt_pci.ttl == 0) {
log_dbg("TTL was zero.");
ipcp_sdb_release(sdb);
+#ifdef IPCP_FLOW_STATS
+ pthread_mutex_lock(&dt.stat[fd].lock);
+
+ ++dt.stat[fd].r_drp_pkt[qc];
+ dt.stat[fd].r_drp_bytes[qc] += len;
+
+ pthread_mutex_unlock(&dt.stat[fd].lock);
+#endif
return;
}
- fd = pff_nhop(dt.pff[qc], dt_pci.dst_addr);
- if (fd < 0) {
- log_err("No next hop for %" PRIu64, dt_pci.dst_addr);
+ /* FIXME: Use qoscube from PCI instead of incoming flow. */
+ ofd = pff_nhop(dt.pff[qc], dt_pci.dst_addr);
+ if (ofd < 0) {
+ log_dbg("No next hop for %" PRIu64, dt_pci.dst_addr);
ipcp_sdb_release(sdb);
+#ifdef IPCP_FLOW_STATS
+ pthread_mutex_lock(&dt.stat[fd].lock);
+
+ ++dt.stat[fd].rcv_pkt[qc];
+ dt.stat[fd].rcv_bytes[qc] += len;
+ ++dt.stat[fd].f_nhp_pkt[qc];
+ dt.stat[fd].f_nhp_bytes[qc] += len;
+
+ pthread_mutex_unlock(&dt.stat[fd].lock);
+#endif
return;
}
- ret = ipcp_flow_write(fd, sdb);
+ ret = ipcp_flow_write(ofd, sdb);
if (ret < 0) {
- log_err("Failed to write SDU to fd %d.", fd);
+ log_dbg("Failed to write SDU to fd %d.", ofd);
if (ret == -EFLOWDOWN)
- notifier_event(NOTIFY_DT_CONN_DOWN, &fd);
+ notifier_event(NOTIFY_DT_CONN_DOWN, &ofd);
ipcp_sdb_release(sdb);
+#ifdef IPCP_FLOW_STATS
+ pthread_mutex_lock(&dt.stat[fd].lock);
+
+ ++dt.stat[fd].rcv_pkt[qc];
+ dt.stat[fd].rcv_bytes[qc] += len;
+
+ pthread_mutex_unlock(&dt.stat[fd].lock);
+ pthread_mutex_lock(&dt.stat[ofd].lock);
+
+ ++dt.stat[ofd].w_drp_pkt[qc];
+ dt.stat[ofd].w_drp_bytes[qc] += len;
+
+ pthread_mutex_unlock(&dt.stat[ofd].lock);
+#endif
return;
}
+#ifdef IPCP_FLOW_STATS
+ pthread_mutex_lock(&dt.stat[fd].lock);
+
+ ++dt.stat[fd].rcv_pkt[qc];
+ dt.stat[fd].rcv_bytes[qc] += len;
+
+ pthread_mutex_unlock(&dt.stat[fd].lock);
+ pthread_mutex_lock(&dt.stat[ofd].lock);
+
+ ++dt.stat[ofd].snd_pkt[qc];
+ dt.stat[ofd].snd_bytes[qc] += len;
+
+ pthread_mutex_unlock(&dt.stat[ofd].lock);
+#endif
} else {
dt_pci_shrink(sdb);
-
if (dt_pci.eid > PROG_RES_FDS) {
- if (ipcp_flow_write(dt_pci.eid, sdb))
+ if (ipcp_flow_write(dt_pci.eid, sdb)) {
ipcp_sdb_release(sdb);
+#ifdef IPCP_FLOW_STATS
+ pthread_mutex_lock(&dt.stat[dt_pci.eid].lock);
+
+ ++dt.stat[dt_pci.eid].w_drp_pkt[qc];
+ dt.stat[dt_pci.eid].w_drp_bytes[qc] += len;
+
+ pthread_mutex_unlock(&dt.stat[dt_pci.eid].lock);
+#endif
+
+ }
+#ifdef IPCP_FLOW_STATS
+ pthread_mutex_lock(&dt.stat[dt_pci.eid].lock);
+
+ ++dt.stat[dt_pci.eid].rcv_pkt[qc];
+ dt.stat[dt_pci.eid].rcv_bytes[qc] += len;
+ ++dt.stat[dt_pci.eid].lcl_r_pkt[qc];
+ dt.stat[dt_pci.eid].lcl_r_bytes[qc] += len;
+
+ pthread_mutex_unlock(&dt.stat[dt_pci.eid].lock);
+#endif
return;
}
@@ -137,9 +433,26 @@ static void sdu_handler(int fd,
log_err("No registered component on eid %d.",
dt_pci.eid);
ipcp_sdb_release(sdb);
+#ifdef IPCP_FLOW_STATS
+ pthread_mutex_lock(&dt.stat[dt_pci.eid].lock);
+
+ ++dt.stat[dt_pci.eid].w_drp_pkt[qc];
+ dt.stat[dt_pci.eid].w_drp_bytes[qc] += len;
+
+ pthread_mutex_unlock(&dt.stat[dt_pci.eid].lock);
+#endif
return;
}
+#ifdef IPCP_FLOW_STATS
+ pthread_mutex_lock(&dt.stat[dt_pci.eid].lock);
+
+ ++dt.stat[fd].rcv_pkt[qc];
+ dt.stat[fd].rcv_bytes[qc] += len;
+ ++dt.stat[fd].lcl_r_pkt[qc];
+ dt.stat[fd].lcl_r_bytes[qc] += len;
+ pthread_mutex_unlock(&dt.stat[dt_pci.eid].lock);
+#endif
dt.comps[dt_pci.eid].post_sdu(dt.comps[dt_pci.eid].comp, sdb);
}
}
@@ -229,9 +542,30 @@ int dt_init(enum pol_routing pr,
dt.res_fds = bmp_create(PROG_RES_FDS, 0);
if (dt.res_fds == NULL)
goto fail_res_fds;
+#ifdef IPCP_FLOW_STAT
+ memset(dt.stat, 0, sizeof(dt.stat));
+
+ for (i = 0; i < PROG_MAX_FLOWS, ++i)
+ if (pthread_mutex_init(&dt.stat[fd].lock, NULL)) {
+ for (j = 0; j < i; ++j)
+ pthread_mutex_destroy(&dt.stat[j].lock);
+ goto fail_stat_lock;
+ }
+
+ dt.n_flows = 0;
+#endif
+ if (rib_reg(DT, &r_ops))
+ goto fail_rib_reg;
return 0;
+ fail_rib_reg:
+#ifdef IPCP_FLOW_STAT
+ for (i = 0; i < PROG_MAX_FLOWS; ++i)
+ pthread_mutex_destroy(&dt.stat[i].lock);
+ fail_stat_lock:
+#endif
+ bmp_destroy(dt.res_fds);
fail_res_fds:
pthread_rwlock_destroy(&dt.lock);
fail_rwlock_init:
@@ -256,6 +590,11 @@ void dt_fini(void)
{
int i;
+ rib_unreg(DT);
+#ifdef IPCP_FLOW_STATS
+ for (i = 0; i < PROG_MAX_FLOWS; ++i)
+ pthread_mutex_destroy(&dt.stat[i].lock);
+#endif
bmp_destroy(dt.res_fds);
pthread_rwlock_destroy(&dt.lock);
@@ -322,7 +661,9 @@ int dt_reg_comp(void * comp,
dt.comps[res_fd].comp = comp;
pthread_rwlock_unlock(&dt.lock);
-
+#ifdef IPCP_FLOW_STATS
+ set_used(res_fd, true);
+#endif
return res_fd;
}
@@ -334,14 +675,18 @@ int dt_write_sdu(uint64_t dst_addr,
int fd;
struct dt_pci dt_pci;
int ret;
-
+#ifdef IPCP_FLOW_STATS
+ size_t len;
+#endif
assert(sdb);
assert(dst_addr != ipcpi.dt_addr);
-
+#ifdef IPCP_FLOW_STATS
+ len = shm_du_buff_tail(sdb) - shm_du_buff_head(sdb);
+#endif
fd = pff_nhop(dt.pff[qc], dst_addr);
if (fd < 0) {
log_dbg("Could not get nhop for addr %" PRIu64 ".", dst_addr);
- return -1;
+ goto fail_write;
}
dt_pci.dst_addr = dst_addr;
@@ -349,17 +694,39 @@ int dt_write_sdu(uint64_t dst_addr,
dt_pci.eid = np1_fd;
if (dt_pci_ser(sdb, &dt_pci)) {
- log_err("Failed to serialize PDU.");
- return -1;
+ log_dbg("Failed to serialize PDU.");
+ goto fail_write;
}
ret = ipcp_flow_write(fd, sdb);
if (ret < 0) {
- log_err("Failed to write SDU to fd %d.", fd);
+ log_dbg("Failed to write SDU to fd %d.", fd);
if (ret == -EFLOWDOWN)
notifier_event(NOTIFY_DT_CONN_DOWN, &fd);
- return -1;
+ goto fail_write;
}
+#ifdef IPCP_FLOW_STATS
+ pthread_mutex_lock(&dt.stat[fd].lock);
+
+ ++dt.stat[dt_pci.eid].lcl_w_pkt[qc];
+ dt.stat[dt_pci.eid].lcl_w_bytes[qc] += len;
+ ++dt.stat[fd].snd_pkt[qc];
+ dt.stat[fd].snd_bytes[qc] = len;
+ pthread_mutex_unlock(&dt.stat[fd].lock);
+#endif
return 0;
+
+ fail_write:
+#ifdef IPCP_FLOW_STATS
+ pthread_mutex_lock(&dt.stat[fd].lock);
+
+ ++dt.stat[dt_pci.eid].lcl_w_pkt[qc];
+ dt.stat[dt_pci.eid].lcl_w_bytes[qc] += len;
+ ++dt.stat[fd].w_drp_pkt[qc];
+ dt.stat[fd].w_drp_bytes[qc] = len;
+
+ pthread_mutex_unlock(&dt.stat[fd].lock);
+#endif
+ return -1;
}