diff options
| author | Dimitri Staessens <dimitri@ouroboros.rocks> | 2026-05-01 18:32:17 +0200 |
|---|---|---|
| committer | Sander Vrijders <sander@ouroboros.rocks> | 2026-05-20 08:17:05 +0200 |
| commit | 6cd68ca0e61311cd4d6177b4c564432865a84a23 (patch) | |
| tree | 1d54e87e154d8200bb8ab15eb734f9c3ec437db6 | |
| parent | aa3901535516d49c7a881c9c06e3582e1a6f1ada (diff) | |
| download | ouroboros-6cd68ca0e61311cd4d6177b4c564432865a84a23.tar.gz ouroboros-6cd68ca0e61311cd4d6177b4c564432865a84a23.zip | |
ipcpd: Expose ipcpd-eth flow statistics via RIB
Adds an IPCP_ETH_FLOW_STATS cmake option (gated on HAVE_FUSE; default
off) exposing per-flow and aggregate frame counters at
/<ipcp>/eth/{summary,<fd>}.
Counters use RELAXED atomics; the macros expand to ((void) 0) when the
option is off. Per-flow and global counters live in nested stat
structs.
Signed-off-by: Dimitri Staessens <dimitri@ouroboros.rocks>
Signed-off-by: Sander Vrijders <sander@ouroboros.rocks>
| -rw-r--r-- | cmake/config/ipcp/common.cmake | 10 | ||||
| -rw-r--r-- | src/ipcpd/config.h.in | 1 | ||||
| -rw-r--r-- | src/ipcpd/eth/eth.c | 340 |
3 files changed, 343 insertions, 8 deletions
diff --git a/cmake/config/ipcp/common.cmake b/cmake/config/ipcp/common.cmake index ffd5dc32..0c873b76 100644 --- a/cmake/config/ipcp/common.cmake +++ b/cmake/config/ipcp/common.cmake @@ -41,3 +41,13 @@ if(CMAKE_SYSTEM_NAME STREQUAL "Linux") set(IPCP_LINUX_TIMERSLACK_NS 100 CACHE STRING "Slack value for high resolution timers on Linux systems.") endif() + +# ipcpd-eth flow statistics (requires FUSE - eth.c relies on +# IPCP_ETH_FLOW_STATS implying HAVE_FUSE for rib_reg/rib_unreg) +if(HAVE_FUSE) + set(IPCP_ETH_FLOW_STATS FALSE CACHE BOOL + "Enable ipcpd-eth flow statistics via RIB") + if(IPCP_ETH_FLOW_STATS) + message(STATUS "ipcpd-eth flow statistics enabled") + endif() +endif() diff --git a/src/ipcpd/config.h.in b/src/ipcpd/config.h.in index 0b4252e5..c0b6363e 100644 --- a/src/ipcpd/config.h.in +++ b/src/ipcpd/config.h.in @@ -51,6 +51,7 @@ #cmakedefine DISABLE_CORE_LOCK #cmakedefine BUILD_CONTAINER #cmakedefine IPCP_FLOW_STATS +#cmakedefine IPCP_ETH_FLOW_STATS #cmakedefine IPCP_DEBUG_LOCAL #ifdef CONFIG_OUROBOROS_DEBUG #cmakedefine DEBUG_PROTO_DHT diff --git a/src/ipcpd/eth/eth.c b/src/ipcpd/eth/eth.c index 8293ac15..d7894cf6 100644 --- a/src/ipcpd/eth/eth.c +++ b/src/ipcpd/eth/eth.c @@ -51,6 +51,16 @@ #include <ouroboros/time.h> #include <ouroboros/fccntl.h> #include <ouroboros/pthread.h> +#include <ouroboros/rib.h> + +#ifdef IPCP_ETH_FLOW_STATS +#define LOAD_RELAXED(p) __atomic_load_n(p, __ATOMIC_RELAXED) +#define FETCH_ADD_RELAXED(p, v) __atomic_fetch_add(p, v, __ATOMIC_RELAXED) +#define FETCH_SUB_RELAXED(p, v) __atomic_fetch_sub(p, v, __ATOMIC_RELAXED) +#else +#define FETCH_ADD_RELAXED(p, v) ((void) 0) +#define FETCH_SUB_RELAXED(p, v) ((void) 0) +#endif #include "ipcp.h" #include "np1.h" @@ -144,6 +154,7 @@ #define NAME_QUERY_TIMEO 2000 /* ms */ #define MGMT_TIMEO 100 /* ms */ #define MGMT_FRAME_SIZE IPCP_ETH_MGMT_FRAME_SIZE +#define ETH_RIB_PATH "eth" #define FLOW_REQ 0 #define FLOW_REPLY 1 @@ -201,6 +212,17 @@ struct ef { int8_t r_sap; #endif uint8_t r_addr[MAC_SIZE]; +#ifdef IPCP_ETH_FLOW_STATS + struct { + time_t stamp; + size_t p_rcv; + size_t b_rcv; + size_t p_dlv_f; + size_t p_snd; + size_t b_snd; + size_t p_snd_f; + } stat; +#endif }; struct mgmt_frame { @@ -238,6 +260,21 @@ struct { struct ef * fd_to_ef; fset_t * np1_flows; pthread_rwlock_t flows_lock; +#ifdef IPCP_ETH_FLOW_STATS + struct { + size_t n_flows; + size_t n_rcv; + size_t n_snd; + size_t n_mgmt_rcv; + size_t n_mgmt_snd; + size_t n_bad_id; + size_t n_dlv_f; + size_t n_buf_f; + size_t n_rcv_f; + size_t kern_rcv; + size_t kern_drp; + } stat; +#endif pthread_t packet_writer[IPCP_ETH_WR_THR]; pthread_t packet_reader[IPCP_ETH_RD_THR]; @@ -289,7 +326,14 @@ static int eth_data_init(void) eth_data.fd_to_ef[i].r_sap = -1; #endif memset(ð_data.fd_to_ef[i].r_addr, 0, MAC_SIZE); +#ifdef IPCP_ETH_FLOW_STATS + memset(ð_data.fd_to_ef[i].stat, 0, + sizeof(eth_data.fd_to_ef[i].stat)); +#endif } +#ifdef IPCP_ETH_FLOW_STATS + memset(ð_data.stat, 0, sizeof(eth_data.stat)); +#endif eth_data.shim_data = shim_data_create(); if (eth_data.shim_data == NULL) @@ -362,6 +406,214 @@ static void eth_data_fini(void) free(eth_data.fd_to_ef); } +#ifdef IPCP_ETH_FLOW_STATS +static int eth_rib_read(const char * path, + char * buf, + size_t len) +{ + struct ef * flow; + int fd; + char tmstr[RIB_TM_STRLEN]; + struct tm * tm; + time_t stamp; + char * entry; + + entry = strstr(path, RIB_SEPARATOR) + 1; + assert(entry); + + if (len < 2048) + return 0; + + buf[0] = '\0'; + + if (strcmp(entry, "summary") == 0) { + int rcvbuf = 0; + int queued = 0; +#if defined(HAVE_RAW_SOCKETS) && defined(__linux__) + struct tpacket_stats tp_stats; + socklen_t tp_len = sizeof(tp_stats); +#endif +#if defined(HAVE_RAW_SOCKETS) + socklen_t optlen = sizeof(rcvbuf); + getsockopt(eth_data.s_fd, SOL_SOCKET, + SO_RCVBUF, &rcvbuf, &optlen); + ioctl(eth_data.s_fd, FIONREAD, &queued); +#endif +#if defined(HAVE_RAW_SOCKETS) && defined(__linux__) + if (getsockopt(eth_data.s_fd, SOL_PACKET, + PACKET_STATISTICS, + &tp_stats, &tp_len) == 0) { + FETCH_ADD_RELAXED(ð_data.stat.kern_rcv, + tp_stats.tp_packets); + FETCH_ADD_RELAXED(ð_data.stat.kern_drp, + tp_stats.tp_drops); + } +#endif + sprintf(buf, + "Active flows: %20zu\n" + "Total frames received: %20zu\n" + "Total frames sent: %20zu\n" + "Management frames received: %20zu\n" + "Management frames sent: %20zu\n" + "Bad EID/SAP frames: %20zu\n" + "Delivery (N+1) failures: %20zu\n" + "Buffer alloc failures: %20zu\n" + "Frame read failures: %20zu\n" + "Socket rcvbuf (bytes): %20d\n" + "Socket queued (bytes): %20d\n" + "Kernel frames received: %20zu\n" + "Kernel frames dropped: %20zu\n", + LOAD_RELAXED(ð_data.stat.n_flows), + LOAD_RELAXED(ð_data.stat.n_rcv), + LOAD_RELAXED(ð_data.stat.n_snd), + LOAD_RELAXED(ð_data.stat.n_mgmt_rcv), + LOAD_RELAXED(ð_data.stat.n_mgmt_snd), + LOAD_RELAXED(ð_data.stat.n_bad_id), + LOAD_RELAXED(ð_data.stat.n_dlv_f), + LOAD_RELAXED(ð_data.stat.n_buf_f), + LOAD_RELAXED(ð_data.stat.n_rcv_f), + rcvbuf, + queued, + LOAD_RELAXED(ð_data.stat.kern_rcv), + LOAD_RELAXED(ð_data.stat.kern_drp)); + + return strlen(buf); + } + + fd = atoi(entry); + + if (fd < 0 || fd >= SYS_MAX_FLOWS) + return -1; + + flow = ð_data.fd_to_ef[fd]; + + pthread_rwlock_rdlock(ð_data.flows_lock); + + stamp = flow->stat.stamp; + if (stamp == 0) { + pthread_rwlock_unlock(ð_data.flows_lock); + return 0; + } + + pthread_rwlock_unlock(ð_data.flows_lock); + + tm = gmtime(&stamp); + strftime(tmstr, sizeof(tmstr), RIB_TM_FORMAT, tm); + + sprintf(buf, + "Flow established at: %20s\n" + "Sent (packets): %20zu\n" + "Sent (bytes): %20zu\n" + "Send failed (packets): %20zu\n" + "Received (packets): %20zu\n" + "Received (bytes): %20zu\n" + "Delivery (N+1) failures: %20zu\n", + tmstr, + LOAD_RELAXED(&flow->stat.p_snd), + LOAD_RELAXED(&flow->stat.b_snd), + LOAD_RELAXED(&flow->stat.p_snd_f), + LOAD_RELAXED(&flow->stat.p_rcv), + LOAD_RELAXED(&flow->stat.b_rcv), + LOAD_RELAXED(&flow->stat.p_dlv_f)); + + return strlen(buf); +} + +static int eth_rib_readdir(char *** buf) +{ + char entry[RIB_PATH_LEN + 1]; + size_t i; + int idx = 0; + int n_entries; + + pthread_rwlock_rdlock(ð_data.flows_lock); + + n_entries = (int) LOAD_RELAXED(ð_data.stat.n_flows) + 1; + + *buf = malloc(sizeof(**buf) * n_entries); + if (*buf == NULL) + goto fail_entries; + + (*buf)[idx] = malloc(strlen("summary") + 1); + if ((*buf)[idx] == NULL) + goto fail_entry; + + strcpy((*buf)[idx++], "summary"); + + for (i = 0; i < SYS_MAX_FLOWS && idx < n_entries; ++i) { + if (eth_data.fd_to_ef[i].stat.stamp == 0) + continue; + + sprintf(entry, "%zu", i); + + (*buf)[idx] = malloc(strlen(entry) + 1); + if ((*buf)[idx] == NULL) + goto fail_entry; + + strcpy((*buf)[idx++], entry); + } + + pthread_rwlock_unlock(ð_data.flows_lock); + + return idx; + + fail_entry: + while (idx-- > 0) + free((*buf)[idx]); + free(*buf); + fail_entries: + pthread_rwlock_unlock(ð_data.flows_lock); + return -ENOMEM; +} + +static int eth_rib_getattr(const char * path, + struct rib_attr * attr) +{ + int fd; + char * entry; + struct ef * flow; + + entry = strstr(path, RIB_SEPARATOR) + 1; + assert(entry); + + if (strcmp(entry, "summary") == 0) { + attr->size = 2048; + attr->mtime = 0; + return 0; + } + + fd = atoi(entry); + + if (fd < 0 || fd >= SYS_MAX_FLOWS) { + attr->size = 0; + attr->mtime = 0; + return 0; + } + + flow = ð_data.fd_to_ef[fd]; + + pthread_rwlock_rdlock(ð_data.flows_lock); + + if (flow->stat.stamp != 0) { + attr->size = 2048; + attr->mtime = flow->stat.stamp; + } else { + attr->size = 0; + attr->mtime = 0; + } + + pthread_rwlock_unlock(ð_data.flows_lock); + + return 0; +} + +static struct rib_ops eth_r_ops = { + .read = eth_rib_read, + .readdir = eth_rib_readdir, + .getattr = eth_rib_getattr +}; +#endif /* IPCP_ETH_FLOW_STATS */ + #ifdef BUILD_ETH_LLC static uint8_t reverse_bits(uint8_t b) { @@ -451,10 +703,7 @@ static int eth_ipcp_send_frame(const uint8_t * dst_addr, } assert(FD_ISSET(eth_data.s_fd, &fds)); - if (sendto(eth_data.s_fd, - frame, - frame_len, - 0, + if (sendto(eth_data.s_fd, frame, frame_len, 0, (struct sockaddr *) ð_data.device, sizeof(eth_data.device)) <= 0) { log_dbg("Failed to send message: %s.", strerror(errno)); @@ -462,6 +711,8 @@ static int eth_ipcp_send_frame(const uint8_t * dst_addr, } #endif /* HAVE_NETMAP */ + FETCH_ADD_RELAXED(ð_data.stat.n_snd, 1); + return 0; } @@ -519,6 +770,9 @@ static int eth_ipcp_alloc(const uint8_t * dst_addr, buf, len + data->len); free(buf); + if (ret == 0) + FETCH_ADD_RELAXED(ð_data.stat.n_mgmt_snd, 1); + return ret; } @@ -569,6 +823,8 @@ static int eth_ipcp_alloc_resp(uint8_t * dst_addr, return -1; } + FETCH_ADD_RELAXED(ð_data.stat.n_mgmt_snd, 1); + free(buf); return 0; @@ -633,7 +889,7 @@ static int eth_ipcp_alloc_reply(uint8_t * r_addr, fd = eth_data.ef_to_fd[dsap]; #endif if (fd < 0) { - pthread_rwlock_unlock(& eth_data.flows_lock); + pthread_rwlock_unlock(ð_data.flows_lock); log_err("No flow found with that SAP."); return -1; /* -EFLOWNOTFOUND */ } @@ -700,6 +956,8 @@ static int eth_ipcp_name_query_req(const uint8_t * hash, return -1; } + FETCH_ADD_RELAXED(ð_data.stat.n_mgmt_snd, 1); + free(buf); } @@ -900,12 +1158,15 @@ static void * eth_ipcp_packet_reader(void * o) if (select(eth_data.s_fd + 1, &fds, NULL, NULL, NULL) < 0) continue; assert(FD_ISSET(eth_data.s_fd, &fds)); - if (ipcp_spb_reserve(&spb, ETH_MTU)) + if (ipcp_spb_reserve(&spb, ETH_MTU)) { + FETCH_ADD_RELAXED(ð_data.stat.n_buf_f, 1); continue; + } buf = ssm_pk_buff_head_alloc(spb, ETH_HEADER_TOT_SIZE); if (buf == NULL) { log_dbg("Failed to allocate header."); ipcp_spb_release(spb); + FETCH_ADD_RELAXED(ð_data.stat.n_buf_f, 1); continue; } frame_len = recv(eth_data.s_fd, buf, @@ -914,6 +1175,7 @@ static void * eth_ipcp_packet_reader(void * o) if (frame_len <= 0) { log_dbg("Failed to receive frame."); ipcp_spb_release(spb); + FETCH_ADD_RELAXED(ð_data.stat.n_rcv_f, 1); continue; } #endif @@ -1008,6 +1270,8 @@ static void * eth_ipcp_packet_reader(void * o) list_add(&frame->next, ð_data.mgmt_frames); pthread_cond_signal(ð_data.mgmt_cond); pthread_mutex_unlock(ð_data.mgmt_lock); + FETCH_ADD_RELAXED(ð_data.stat.n_rcv, 1); + FETCH_ADD_RELAXED(ð_data.stat.n_mgmt_rcv, 1); } else { pthread_rwlock_rdlock(ð_data.flows_lock); @@ -1018,6 +1282,7 @@ static void * eth_ipcp_packet_reader(void * o) #endif if (fd < 0) { pthread_rwlock_unlock(ð_data.flows_lock); + FETCH_ADD_RELAXED(ð_data.stat.n_bad_id, 1); goto fail_frame; } @@ -1026,9 +1291,14 @@ static void * eth_ipcp_packet_reader(void * o) || memcmp(eth_data.fd_to_ef[fd].r_addr, e_frame->src_hwaddr, MAC_SIZE)) { pthread_rwlock_unlock(ð_data.flows_lock); + FETCH_ADD_RELAXED(ð_data.stat.n_bad_id, 1); goto fail_frame; } #endif + FETCH_ADD_RELAXED(ð_data.fd_to_ef[fd].stat.p_rcv, 1); + FETCH_ADD_RELAXED(ð_data.fd_to_ef[fd].stat.b_rcv, + length); + FETCH_ADD_RELAXED(ð_data.stat.n_rcv, 1); pthread_rwlock_unlock(ð_data.flows_lock); #ifndef HAVE_NETMAP @@ -1041,8 +1311,13 @@ static void * eth_ipcp_packet_reader(void * o) buf = ssm_pk_buff_head(spb); memcpy(buf, &e_frame->payload, length); #endif - if (np1_flow_write(fd, spb, NP1_GET_POOL(fd)) < 0) + if (np1_flow_write(fd, spb, NP1_GET_POOL(fd)) < 0) { ipcp_spb_release(spb); + FETCH_ADD_RELAXED( + ð_data.fd_to_ef[fd].stat.p_dlv_f, + 1); + FETCH_ADD_RELAXED(ð_data.stat.n_dlv_f, 1); + } continue; fail_frame: @@ -1102,6 +1377,7 @@ static void * eth_ipcp_packet_writer(void * o) == NULL) { log_dbg("Failed to allocate header."); ipcp_spb_release(spb); + FETCH_ADD_RELAXED(ð_data.stat.n_buf_f, 1); continue; } @@ -1125,8 +1401,19 @@ static void * eth_ipcp_packet_writer(void * o) dsap, ssap, #endif ssm_pk_buff_head(spb), - len)) + len)) { log_dbg("Failed to send frame."); + FETCH_ADD_RELAXED( + ð_data.fd_to_ef[fd].stat.p_snd_f, + 1); + } else { + FETCH_ADD_RELAXED( + ð_data.fd_to_ef[fd].stat.p_snd, + 1); + FETCH_ADD_RELAXED( + ð_data.fd_to_ef[fd].stat.b_snd, + len); + } ipcp_spb_release(spb); } } @@ -1593,6 +1880,12 @@ static int eth_ipcp_bootstrap(struct ipcp_config * conf) return -1; } #endif /* HAVE_NETMAP */ +#ifdef IPCP_ETH_FLOW_STATS + if (rib_reg(ETH_RIB_PATH, ð_r_ops)) { + log_err("Failed to register RIB."); + goto fail_rib_reg; + } +#endif #if defined(__linux__) if (pthread_create(ð_data.if_monitor, NULL, eth_ipcp_if_monitor, NULL)) { @@ -1656,6 +1949,10 @@ static int eth_ipcp_bootstrap(struct ipcp_config * conf) #if defined(__linux__) fail_monitor: #endif +#ifdef IPCP_ETH_FLOW_STATS + rib_unreg(ETH_RIB_PATH); + fail_rib_reg: +#endif #if defined(HAVE_NETMAP) nm_close(eth_data.nmd); #elif defined(HAVE_BPF) @@ -1732,6 +2029,8 @@ static int eth_ipcp_query(const uint8_t * hash) return -1; } + FETCH_ADD_RELAXED(ð_data.stat.n_mgmt_snd, 1); + free(buf); ret = shim_data_dir_query_wait(query, &timeout); @@ -1798,6 +2097,14 @@ static int eth_ipcp_flow_alloc(int fd, } fset_add(eth_data.np1_flows, fd); +#ifdef IPCP_ETH_FLOW_STATS + pthread_rwlock_wrlock(ð_data.flows_lock); + memset(ð_data.fd_to_ef[fd].stat, 0, + sizeof(eth_data.fd_to_ef[fd].stat)); + eth_data.fd_to_ef[fd].stat.stamp = time(NULL); + FETCH_ADD_RELAXED(ð_data.stat.n_flows, 1); + pthread_rwlock_unlock(ð_data.flows_lock); +#endif #if defined(BUILD_ETH_LLC) log_dbg("Assigned SAP %d for fd %d.", ssap, fd); #endif @@ -1858,6 +2165,14 @@ static int eth_ipcp_flow_alloc_resp(int fd, } fset_add(eth_data.np1_flows, fd); +#ifdef IPCP_ETH_FLOW_STATS + pthread_rwlock_wrlock(ð_data.flows_lock); + memset(ð_data.fd_to_ef[fd].stat, 0, + sizeof(eth_data.fd_to_ef[fd].stat)); + eth_data.fd_to_ef[fd].stat.stamp = time(NULL); + FETCH_ADD_RELAXED(ð_data.stat.n_flows, 1); + pthread_rwlock_unlock(ð_data.flows_lock); +#endif #if defined(BUILD_ETH_LLC) log_dbg("Assigned SAP %d for fd %d.", ssap, fd); #endif @@ -1886,6 +2201,12 @@ static int eth_ipcp_flow_dealloc(int fd) #endif memset(ð_data.fd_to_ef[fd].r_addr, 0, MAC_SIZE); +#ifdef IPCP_ETH_FLOW_STATS + memset(ð_data.fd_to_ef[fd].stat, 0, + sizeof(eth_data.fd_to_ef[fd].stat)); + FETCH_SUB_RELAXED(ð_data.stat.n_flows, 1); +#endif + pthread_rwlock_unlock(ð_data.flows_lock); ipcp_flow_dealloc(fd); @@ -1952,6 +2273,9 @@ int main(int argc, #ifdef __linux__ pthread_join(eth_data.if_monitor, NULL); #endif +#ifdef IPCP_ETH_FLOW_STATS + rib_unreg(ETH_RIB_PATH); +#endif } ipcp_stop(); |
