From 962b37bb28724bdf28abbe5d48350adba6000ed4 Mon Sep 17 00:00:00 2001 From: Dimitri Staessens Date: Sat, 5 Dec 2020 15:05:13 +0100 Subject: ipcpd: Add RIB statistics for flow allocator The RIB will now show some stats for the flow allocator, including congestion avoidance statistics. This is needed before decoupling the data transfer component and the flow allocator as some current stats show in DT will move to FA. Signed-off-by: Dimitri Staessens Signed-off-by: Sander Vrijders --- src/ipcpd/unicast/ca.c | 10 ++ src/ipcpd/unicast/ca.h | 4 + src/ipcpd/unicast/dt.c | 4 +- src/ipcpd/unicast/fa.c | 195 ++++++++++++++++++++++++++++++++++++-- src/ipcpd/unicast/pol-ca-ops.h | 6 ++ src/ipcpd/unicast/pol/ca-mb-ecn.c | 41 +++++++- src/ipcpd/unicast/pol/ca-mb-ecn.h | 4 + src/ipcpd/unicast/pol/ca-nop.c | 3 +- 8 files changed, 255 insertions(+), 12 deletions(-) (limited to 'src') diff --git a/src/ipcpd/unicast/ca.c b/src/ipcpd/unicast/ca.c index f93d0504..73de14a5 100644 --- a/src/ipcpd/unicast/ca.c +++ b/src/ipcpd/unicast/ca.c @@ -97,3 +97,13 @@ uint8_t ca_calc_ecn(int fd, { return ca.ops->calc_ecn(fd, len); } + +ssize_t ca_print_stats(void * ctx, + char * buf, + size_t len) +{ + if (ca.ops->print_stats == NULL) + return 0; + + return ca.ops->print_stats(ctx, buf, len); +} diff --git a/src/ipcpd/unicast/ca.h b/src/ipcpd/unicast/ca.h index 5cf6199c..7e3ab384 100644 --- a/src/ipcpd/unicast/ca.h +++ b/src/ipcpd/unicast/ca.h @@ -58,4 +58,8 @@ void ca_wnd_wait(ca_wnd_t wnd); uint8_t ca_calc_ecn(int fd, size_t len); +ssize_t ca_print_stats(void * ctx, + char * buf, + size_t len); + #endif /* OUROBOROS_IPCPD_UNICAST_CA_H */ diff --git a/src/ipcpd/unicast/dt.c b/src/ipcpd/unicast/dt.c index 53accba3..cb069544 100644 --- a/src/ipcpd/unicast/dt.c +++ b/src/ipcpd/unicast/dt.c @@ -317,11 +317,10 @@ static int dt_stat_readdir(char *** buf) pthread_mutex_unlock(&dt.stat[i].lock); } + assert((size_t) idx == dt.n_flows); pthread_rwlock_unlock(&dt.lock); - assert((size_t) idx == dt.n_flows); - return idx; #else (void) buf; @@ -367,7 +366,6 @@ static struct rib_ops r_ops = { }; #ifdef IPCP_FLOW_STATS - static void stat_used(int fd, uint64_t addr) { diff --git a/src/ipcpd/unicast/fa.c b/src/ipcpd/unicast/fa.c index b2eed7e5..2fc37d95 100644 --- a/src/ipcpd/unicast/fa.c +++ b/src/ipcpd/unicast/fa.c @@ -36,6 +36,7 @@ #include #include #include +#include #include "dir.h" #include "fa.h" @@ -44,6 +45,7 @@ #include "dt.h" #include "ca.h" +#include #include #include #include @@ -55,6 +57,8 @@ #define FLOW_UPDATE 2 #define MSGBUFSZ 2048 +#define STAT_FILE_LEN 0 + struct fa_msg { uint64_t s_addr; uint32_t r_eid; @@ -79,6 +83,9 @@ struct cmd { }; struct fa_flow { +#ifdef IPCP_FLOW_STATS + time_t stamp; +#endif int r_eid; /* remote endpoint id */ uint64_t r_addr; /* remote address */ void * ctx; /* congestion avoidance context */ @@ -87,7 +94,9 @@ struct fa_flow { struct { pthread_rwlock_t flows_lock; struct fa_flow flows[PROG_MAX_FLOWS]; - +#ifdef IPCP_FLOW_STATS + size_t n_flows; +#endif int fd; struct list_head cmds; @@ -98,6 +107,156 @@ struct { struct psched * psched; } fa; +static int fa_stat_read(const char * path, + char * buf, + size_t len) +{ +#ifdef IPCP_FLOW_STATS + struct fa_flow * flow; + int fd; + char r_addrstr[20]; + char tmstr[20]; + char castr[1024]; + struct tm * tm; + + fd = atoi(path); + + if (fd < 0 || fd > PROG_MAX_FLOWS) + return -1; + + if (len < 1536) + return 0; + + flow = &fa.flows[fd]; + + buf[0] = '\0'; + + pthread_rwlock_rdlock(&fa.flows_lock); + + if (flow->stamp ==0) { + pthread_rwlock_unlock(&fa.flows_lock); + return 0; + } + + sprintf(r_addrstr, "%" PRIu64, flow->r_addr); + + tm = localtime(&flow->stamp); + strftime(tmstr, sizeof(tmstr), "%F %T", tm); + + ca_print_stats(flow->ctx, castr, 1024); + + sprintf(buf, + "Flow established at: %20s\n" + "Remote address: %20s\n" + "Local endpoint ID: %20d\n" + "Remote endpoint ID: %20d\n" + "%s", + tmstr, r_addrstr, fd, flow->r_eid, + castr); + + pthread_rwlock_unlock(&fa.flows_lock); + + return strlen(buf); +#else + (void) path; + (void) buf; + (void) len; + return 0; +#endif +} + +static int fa_stat_readdir(char *** buf) +{ +#ifdef IPCP_FLOW_STATS + char entry[RIB_PATH_LEN + 1]; + size_t i; + int idx = 0; + + pthread_rwlock_rdlock(&fa.flows_lock); + + if (fa.n_flows < 1) { + pthread_rwlock_unlock(&fa.flows_lock); + return 0; + } + + *buf = malloc(sizeof(**buf) * fa.n_flows); + if (*buf == NULL) { + pthread_rwlock_unlock(&fa.flows_lock); + return -ENOMEM; + } + + for (i = 0; i < PROG_MAX_FLOWS; ++i) { + struct fa_flow * flow; + + flow = &fa.flows[i]; + if (flow->stamp == 0) + continue; + + sprintf(entry, "%zu", i); + + (*buf)[idx] = malloc(strlen(entry) + 1); + if ((*buf)[idx] == NULL) { + while (idx-- > 0) + free((*buf)[idx]); + free(buf); + pthread_rwlock_unlock(&fa.flows_lock); + return -ENOMEM; + } + + strcpy((*buf)[idx++], entry); + } + + assert((size_t) idx == fa.n_flows); + + pthread_rwlock_unlock(&fa.flows_lock); + + return idx; +#else + (void) buf; + return 0; +#endif +} + +static int fa_stat_getattr(const char * path, + struct stat * st) +{ +#ifdef IPCP_FLOW_STATS + int fd; + struct fa_flow * flow; + + fd = atoi(path); + + st->st_mode = S_IFREG | 0755; + st->st_nlink = 1; + st->st_uid = getuid(); + st->st_gid = getgid(); + + flow = &fa.flows[fd]; + + pthread_rwlock_rdlock(&fa.flows_lock); + + if (flow->stamp != 0) { + st->st_size = 1536; + st->st_mtime = flow->stamp; + } else { + st->st_size = 0; + st->st_mtime = 0; + } + + pthread_rwlock_unlock(&fa.flows_lock); +#else + (void) path; + (void) st; +#endif + return 0; +} + +static struct rib_ops r_ops = { + .read = fa_stat_read, + .readdir = fa_stat_readdir, + .getattr = fa_stat_getattr +}; + static void packet_handler(int fd, qoscube_t qc, struct shm_du_buff * sdb) @@ -132,6 +291,9 @@ static void packet_handler(int fd, static int fa_flow_init(struct fa_flow * flow) { +#ifdef IPCP_FLOW_STATS + struct timespec now; +#endif memset(flow, 0, sizeof(*flow)); flow->r_eid = -1; @@ -141,6 +303,13 @@ static int fa_flow_init(struct fa_flow * flow) if (flow->ctx == NULL) return -1; +#ifdef IPCP_FLOW_STATS + clock_gettime(CLOCK_REALTIME_COARSE, &now); + + flow->stamp = now.tv_sec; + + ++fa.n_flows; +#endif return 0; } @@ -152,6 +321,10 @@ static void fa_flow_fini(struct fa_flow * flow) flow->r_eid = -1; flow->r_addr = INVALID_ADDR; + +#ifdef IPCP_FLOW_STATS + --fa.n_flows; +#endif } static void fa_post_packet(void * comp, @@ -336,6 +509,7 @@ static void * fa_handle_packet(void * o) int fa_init(void) { pthread_condattr_t cattr; + char fastr[256]; if (pthread_rwlock_init(&fa.flows_lock, NULL)) goto fail_rwlock; @@ -358,8 +532,14 @@ int fa_init(void) fa.fd = dt_reg_comp(&fa, &fa_post_packet, FA); + sprintf(fastr, "%s", FA); + if (rib_reg(fastr, &r_ops)) + goto fail_rib_reg; + return 0; + fail_rib_reg: + pthread_cond_destroy(&fa.cond); fail_cond: pthread_condattr_destroy(&cattr); fail_cattr: @@ -586,6 +766,7 @@ static int fa_update_remote(int fd, struct shm_du_buff * sdb; qoscube_t qc = QOS_CUBE_BE; struct fa_flow * flow; + uint64_t r_addr; if (ipcp_sdb_reserve(&sdb, sizeof(*msg))) { return -1; @@ -603,15 +784,16 @@ static int fa_update_remote(int fd, msg->r_eid = hton32(flow->r_eid); msg->ece = hton16(ece); - if (dt_write_packet(flow->r_addr, qc, fa.fd, sdb)) { - pthread_rwlock_unlock(&fa.flows_lock); - ipcp_sdb_release(sdb); - return -1; - } + r_addr = flow->r_addr; pthread_rwlock_unlock(&fa.flows_lock); + if (dt_write_packet(r_addr, qc, fa.fd, sdb)) { + ipcp_sdb_release(sdb); + return -1; + } + return 0; } @@ -638,5 +820,4 @@ void fa_ecn_update(int eid, if (update) fa_update_remote(eid, ece); - } diff --git a/src/ipcpd/unicast/pol-ca-ops.h b/src/ipcpd/unicast/pol-ca-ops.h index 3cb8a9d2..69bd0d21 100644 --- a/src/ipcpd/unicast/pol-ca-ops.h +++ b/src/ipcpd/unicast/pol-ca-ops.h @@ -45,6 +45,12 @@ struct pol_ca_ops { uint8_t (* calc_ecn)(int fd, size_t len); + + /* Optional, can be NULL */ + ssize_t (* print_stats)(void * ctx, + char * buf, + size_t len); + }; #endif /* OUROBOROS_IPCPD_UNICAST_POL_CA_OPS_H */ diff --git a/src/ipcpd/unicast/pol/ca-mb-ecn.c b/src/ipcpd/unicast/pol/ca-mb-ecn.c index 03f7044d..0542e291 100644 --- a/src/ipcpd/unicast/pol/ca-mb-ecn.c +++ b/src/ipcpd/unicast/pol/ca-mb-ecn.c @@ -35,6 +35,7 @@ #include #include +#include /* congestion avoidance constants */ #define CA_SHFT 5 /* Average over 32 pkts */ @@ -72,7 +73,8 @@ struct pol_ca_ops mb_ecn_ca_ops = { .ctx_update_rcv = mb_ecn_ctx_update_rcv, .ctx_update_ece = mb_ecn_ctx_update_ece, .wnd_wait = mb_ecn_wnd_wait, - .calc_ecn = mb_ecn_calc_ecn + .calc_ecn = mb_ecn_calc_ecn, + .print_stats = mb_ecn_print_stats }; void * mb_ecn_ctx_create(void) @@ -229,3 +231,40 @@ uint8_t mb_ecn_calc_ecn(int fd, return (uint8_t) (q >> ECN_Q_SHFT); } + +ssize_t mb_ecn_print_stats(void * _ctx, + char * buf, + size_t len) +{ + struct mb_ecn_ctx* ctx = _ctx; + char * regime; + + if (len < 1024) + return 0; + + if (!ctx->tx_cav) + regime = "Slow start"; + else if (ctx->tx_ece) + regime = "Multiplicative dec"; + else + regime = "Additive inc"; + + sprintf(buf, + "Congestion avoidance algorithm: %20s\n" + "Upstream congestion level: %20u\n" + "Upstream packet counter: %20zu\n" + "Downstream congestion level: %20u\n" + "Downstream packet counter: %20zu\n" + "Congestion window size (ns): %20zu\n" + "Packets in this window: %20zu\n" + "Bytes in this window: %20zu\n" + "Max bytes in this window: %20zu\n" + "Current congestion regime: %20s\n", + "Multi-bit ECN", + ctx->rx_ece, ctx->rx_ctr, + ctx->tx_ece, ctx->tx_ctr, (size_t) (1 << ctx->tx_mul), + ctx->tx_wpc, ctx->tx_wbc, ctx->tx_wbl, + regime); + + return strlen(buf); +} diff --git a/src/ipcpd/unicast/pol/ca-mb-ecn.h b/src/ipcpd/unicast/pol/ca-mb-ecn.h index 456b9b13..945b0df5 100644 --- a/src/ipcpd/unicast/pol/ca-mb-ecn.h +++ b/src/ipcpd/unicast/pol/ca-mb-ecn.h @@ -45,6 +45,10 @@ void mb_ecn_wnd_wait(ca_wnd_t wnd); uint8_t mb_ecn_calc_ecn(int fd, size_t len); +ssize_t mb_ecn_print_stats(void * ctx, + char * buf, + size_t len); + extern struct pol_ca_ops mb_ecn_ca_ops; #endif /* OUROBOROS_IPCPD_UNICAST_CA_MB_ECN_H */ diff --git a/src/ipcpd/unicast/pol/ca-nop.c b/src/ipcpd/unicast/pol/ca-nop.c index d0d89a2e..15b253ca 100644 --- a/src/ipcpd/unicast/pol/ca-nop.c +++ b/src/ipcpd/unicast/pol/ca-nop.c @@ -31,7 +31,8 @@ struct pol_ca_ops nop_ca_ops = { .ctx_update_rcv = nop_ctx_update_rcv, .ctx_update_ece = nop_ctx_update_ece, .wnd_wait = nop_wnd_wait, - .calc_ecn = nop_calc_ecn + .calc_ecn = nop_calc_ecn, + .print_stats = NULL }; void * nop_ctx_create(void) -- cgit v1.2.3