summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDimitri Staessens <dimitri@ouroboros.rocks>2026-05-01 18:32:17 +0200
committerSander Vrijders <sander@ouroboros.rocks>2026-05-20 08:17:05 +0200
commit6cd68ca0e61311cd4d6177b4c564432865a84a23 (patch)
tree1d54e87e154d8200bb8ab15eb734f9c3ec437db6
parentaa3901535516d49c7a881c9c06e3582e1a6f1ada (diff)
downloadouroboros-6cd68ca0e61311cd4d6177b4c564432865a84a23.tar.gz
ouroboros-6cd68ca0e61311cd4d6177b4c564432865a84a23.zip
ipcpd: Expose ipcpd-eth flow statistics via RIB
Adds an IPCP_ETH_FLOW_STATS cmake option (gated on HAVE_FUSE; default off) exposing per-flow and aggregate frame counters at /<ipcp>/eth/{summary,<fd>}. Counters use RELAXED atomics; the macros expand to ((void) 0) when the option is off. Per-flow and global counters live in nested stat structs. Signed-off-by: Dimitri Staessens <dimitri@ouroboros.rocks> Signed-off-by: Sander Vrijders <sander@ouroboros.rocks>
-rw-r--r--cmake/config/ipcp/common.cmake10
-rw-r--r--src/ipcpd/config.h.in1
-rw-r--r--src/ipcpd/eth/eth.c340
3 files changed, 343 insertions, 8 deletions
diff --git a/cmake/config/ipcp/common.cmake b/cmake/config/ipcp/common.cmake
index ffd5dc32..0c873b76 100644
--- a/cmake/config/ipcp/common.cmake
+++ b/cmake/config/ipcp/common.cmake
@@ -41,3 +41,13 @@ if(CMAKE_SYSTEM_NAME STREQUAL "Linux")
set(IPCP_LINUX_TIMERSLACK_NS 100 CACHE STRING
"Slack value for high resolution timers on Linux systems.")
endif()
+
+# ipcpd-eth flow statistics (requires FUSE - eth.c relies on
+# IPCP_ETH_FLOW_STATS implying HAVE_FUSE for rib_reg/rib_unreg)
+if(HAVE_FUSE)
+ set(IPCP_ETH_FLOW_STATS FALSE CACHE BOOL
+ "Enable ipcpd-eth flow statistics via RIB")
+ if(IPCP_ETH_FLOW_STATS)
+ message(STATUS "ipcpd-eth flow statistics enabled")
+ endif()
+endif()
diff --git a/src/ipcpd/config.h.in b/src/ipcpd/config.h.in
index 0b4252e5..c0b6363e 100644
--- a/src/ipcpd/config.h.in
+++ b/src/ipcpd/config.h.in
@@ -51,6 +51,7 @@
#cmakedefine DISABLE_CORE_LOCK
#cmakedefine BUILD_CONTAINER
#cmakedefine IPCP_FLOW_STATS
+#cmakedefine IPCP_ETH_FLOW_STATS
#cmakedefine IPCP_DEBUG_LOCAL
#ifdef CONFIG_OUROBOROS_DEBUG
#cmakedefine DEBUG_PROTO_DHT
diff --git a/src/ipcpd/eth/eth.c b/src/ipcpd/eth/eth.c
index 8293ac15..d7894cf6 100644
--- a/src/ipcpd/eth/eth.c
+++ b/src/ipcpd/eth/eth.c
@@ -51,6 +51,16 @@
#include <ouroboros/time.h>
#include <ouroboros/fccntl.h>
#include <ouroboros/pthread.h>
+#include <ouroboros/rib.h>
+
+#ifdef IPCP_ETH_FLOW_STATS
+#define LOAD_RELAXED(p) __atomic_load_n(p, __ATOMIC_RELAXED)
+#define FETCH_ADD_RELAXED(p, v) __atomic_fetch_add(p, v, __ATOMIC_RELAXED)
+#define FETCH_SUB_RELAXED(p, v) __atomic_fetch_sub(p, v, __ATOMIC_RELAXED)
+#else
+#define FETCH_ADD_RELAXED(p, v) ((void) 0)
+#define FETCH_SUB_RELAXED(p, v) ((void) 0)
+#endif
#include "ipcp.h"
#include "np1.h"
@@ -144,6 +154,7 @@
#define NAME_QUERY_TIMEO 2000 /* ms */
#define MGMT_TIMEO 100 /* ms */
#define MGMT_FRAME_SIZE IPCP_ETH_MGMT_FRAME_SIZE
+#define ETH_RIB_PATH "eth"
#define FLOW_REQ 0
#define FLOW_REPLY 1
@@ -201,6 +212,17 @@ struct ef {
int8_t r_sap;
#endif
uint8_t r_addr[MAC_SIZE];
+#ifdef IPCP_ETH_FLOW_STATS
+ struct {
+ time_t stamp;
+ size_t p_rcv;
+ size_t b_rcv;
+ size_t p_dlv_f;
+ size_t p_snd;
+ size_t b_snd;
+ size_t p_snd_f;
+ } stat;
+#endif
};
struct mgmt_frame {
@@ -238,6 +260,21 @@ struct {
struct ef * fd_to_ef;
fset_t * np1_flows;
pthread_rwlock_t flows_lock;
+#ifdef IPCP_ETH_FLOW_STATS
+ struct {
+ size_t n_flows;
+ size_t n_rcv;
+ size_t n_snd;
+ size_t n_mgmt_rcv;
+ size_t n_mgmt_snd;
+ size_t n_bad_id;
+ size_t n_dlv_f;
+ size_t n_buf_f;
+ size_t n_rcv_f;
+ size_t kern_rcv;
+ size_t kern_drp;
+ } stat;
+#endif
pthread_t packet_writer[IPCP_ETH_WR_THR];
pthread_t packet_reader[IPCP_ETH_RD_THR];
@@ -289,7 +326,14 @@ static int eth_data_init(void)
eth_data.fd_to_ef[i].r_sap = -1;
#endif
memset(&eth_data.fd_to_ef[i].r_addr, 0, MAC_SIZE);
+#ifdef IPCP_ETH_FLOW_STATS
+ memset(&eth_data.fd_to_ef[i].stat, 0,
+ sizeof(eth_data.fd_to_ef[i].stat));
+#endif
}
+#ifdef IPCP_ETH_FLOW_STATS
+ memset(&eth_data.stat, 0, sizeof(eth_data.stat));
+#endif
eth_data.shim_data = shim_data_create();
if (eth_data.shim_data == NULL)
@@ -362,6 +406,214 @@ static void eth_data_fini(void)
free(eth_data.fd_to_ef);
}
+#ifdef IPCP_ETH_FLOW_STATS
+static int eth_rib_read(const char * path,
+ char * buf,
+ size_t len)
+{
+ struct ef * flow;
+ int fd;
+ char tmstr[RIB_TM_STRLEN];
+ struct tm * tm;
+ time_t stamp;
+ char * entry;
+
+ entry = strstr(path, RIB_SEPARATOR) + 1;
+ assert(entry);
+
+ if (len < 2048)
+ return 0;
+
+ buf[0] = '\0';
+
+ if (strcmp(entry, "summary") == 0) {
+ int rcvbuf = 0;
+ int queued = 0;
+#if defined(HAVE_RAW_SOCKETS) && defined(__linux__)
+ struct tpacket_stats tp_stats;
+ socklen_t tp_len = sizeof(tp_stats);
+#endif
+#if defined(HAVE_RAW_SOCKETS)
+ socklen_t optlen = sizeof(rcvbuf);
+ getsockopt(eth_data.s_fd, SOL_SOCKET,
+ SO_RCVBUF, &rcvbuf, &optlen);
+ ioctl(eth_data.s_fd, FIONREAD, &queued);
+#endif
+#if defined(HAVE_RAW_SOCKETS) && defined(__linux__)
+ if (getsockopt(eth_data.s_fd, SOL_PACKET,
+ PACKET_STATISTICS,
+ &tp_stats, &tp_len) == 0) {
+ FETCH_ADD_RELAXED(&eth_data.stat.kern_rcv,
+ tp_stats.tp_packets);
+ FETCH_ADD_RELAXED(&eth_data.stat.kern_drp,
+ tp_stats.tp_drops);
+ }
+#endif
+ sprintf(buf,
+ "Active flows: %20zu\n"
+ "Total frames received: %20zu\n"
+ "Total frames sent: %20zu\n"
+ "Management frames received: %20zu\n"
+ "Management frames sent: %20zu\n"
+ "Bad EID/SAP frames: %20zu\n"
+ "Delivery (N+1) failures: %20zu\n"
+ "Buffer alloc failures: %20zu\n"
+ "Frame read failures: %20zu\n"
+ "Socket rcvbuf (bytes): %20d\n"
+ "Socket queued (bytes): %20d\n"
+ "Kernel frames received: %20zu\n"
+ "Kernel frames dropped: %20zu\n",
+ LOAD_RELAXED(&eth_data.stat.n_flows),
+ LOAD_RELAXED(&eth_data.stat.n_rcv),
+ LOAD_RELAXED(&eth_data.stat.n_snd),
+ LOAD_RELAXED(&eth_data.stat.n_mgmt_rcv),
+ LOAD_RELAXED(&eth_data.stat.n_mgmt_snd),
+ LOAD_RELAXED(&eth_data.stat.n_bad_id),
+ LOAD_RELAXED(&eth_data.stat.n_dlv_f),
+ LOAD_RELAXED(&eth_data.stat.n_buf_f),
+ LOAD_RELAXED(&eth_data.stat.n_rcv_f),
+ rcvbuf,
+ queued,
+ LOAD_RELAXED(&eth_data.stat.kern_rcv),
+ LOAD_RELAXED(&eth_data.stat.kern_drp));
+
+ return strlen(buf);
+ }
+
+ fd = atoi(entry);
+
+ if (fd < 0 || fd >= SYS_MAX_FLOWS)
+ return -1;
+
+ flow = &eth_data.fd_to_ef[fd];
+
+ pthread_rwlock_rdlock(&eth_data.flows_lock);
+
+ stamp = flow->stat.stamp;
+ if (stamp == 0) {
+ pthread_rwlock_unlock(&eth_data.flows_lock);
+ return 0;
+ }
+
+ pthread_rwlock_unlock(&eth_data.flows_lock);
+
+ tm = gmtime(&stamp);
+ strftime(tmstr, sizeof(tmstr), RIB_TM_FORMAT, tm);
+
+ sprintf(buf,
+ "Flow established at: %20s\n"
+ "Sent (packets): %20zu\n"
+ "Sent (bytes): %20zu\n"
+ "Send failed (packets): %20zu\n"
+ "Received (packets): %20zu\n"
+ "Received (bytes): %20zu\n"
+ "Delivery (N+1) failures: %20zu\n",
+ tmstr,
+ LOAD_RELAXED(&flow->stat.p_snd),
+ LOAD_RELAXED(&flow->stat.b_snd),
+ LOAD_RELAXED(&flow->stat.p_snd_f),
+ LOAD_RELAXED(&flow->stat.p_rcv),
+ LOAD_RELAXED(&flow->stat.b_rcv),
+ LOAD_RELAXED(&flow->stat.p_dlv_f));
+
+ return strlen(buf);
+}
+
+static int eth_rib_readdir(char *** buf)
+{
+ char entry[RIB_PATH_LEN + 1];
+ size_t i;
+ int idx = 0;
+ int n_entries;
+
+ pthread_rwlock_rdlock(&eth_data.flows_lock);
+
+ n_entries = (int) LOAD_RELAXED(&eth_data.stat.n_flows) + 1;
+
+ *buf = malloc(sizeof(**buf) * n_entries);
+ if (*buf == NULL)
+ goto fail_entries;
+
+ (*buf)[idx] = malloc(strlen("summary") + 1);
+ if ((*buf)[idx] == NULL)
+ goto fail_entry;
+
+ strcpy((*buf)[idx++], "summary");
+
+ for (i = 0; i < SYS_MAX_FLOWS && idx < n_entries; ++i) {
+ if (eth_data.fd_to_ef[i].stat.stamp == 0)
+ continue;
+
+ sprintf(entry, "%zu", i);
+
+ (*buf)[idx] = malloc(strlen(entry) + 1);
+ if ((*buf)[idx] == NULL)
+ goto fail_entry;
+
+ strcpy((*buf)[idx++], entry);
+ }
+
+ pthread_rwlock_unlock(&eth_data.flows_lock);
+
+ return idx;
+
+ fail_entry:
+ while (idx-- > 0)
+ free((*buf)[idx]);
+ free(*buf);
+ fail_entries:
+ pthread_rwlock_unlock(&eth_data.flows_lock);
+ return -ENOMEM;
+}
+
+static int eth_rib_getattr(const char * path,
+ struct rib_attr * attr)
+{
+ int fd;
+ char * entry;
+ struct ef * flow;
+
+ entry = strstr(path, RIB_SEPARATOR) + 1;
+ assert(entry);
+
+ if (strcmp(entry, "summary") == 0) {
+ attr->size = 2048;
+ attr->mtime = 0;
+ return 0;
+ }
+
+ fd = atoi(entry);
+
+ if (fd < 0 || fd >= SYS_MAX_FLOWS) {
+ attr->size = 0;
+ attr->mtime = 0;
+ return 0;
+ }
+
+ flow = &eth_data.fd_to_ef[fd];
+
+ pthread_rwlock_rdlock(&eth_data.flows_lock);
+
+ if (flow->stat.stamp != 0) {
+ attr->size = 2048;
+ attr->mtime = flow->stat.stamp;
+ } else {
+ attr->size = 0;
+ attr->mtime = 0;
+ }
+
+ pthread_rwlock_unlock(&eth_data.flows_lock);
+
+ return 0;
+}
+
+static struct rib_ops eth_r_ops = {
+ .read = eth_rib_read,
+ .readdir = eth_rib_readdir,
+ .getattr = eth_rib_getattr
+};
+#endif /* IPCP_ETH_FLOW_STATS */
+
#ifdef BUILD_ETH_LLC
static uint8_t reverse_bits(uint8_t b)
{
@@ -451,10 +703,7 @@ static int eth_ipcp_send_frame(const uint8_t * dst_addr,
}
assert(FD_ISSET(eth_data.s_fd, &fds));
- if (sendto(eth_data.s_fd,
- frame,
- frame_len,
- 0,
+ if (sendto(eth_data.s_fd, frame, frame_len, 0,
(struct sockaddr *) &eth_data.device,
sizeof(eth_data.device)) <= 0) {
log_dbg("Failed to send message: %s.", strerror(errno));
@@ -462,6 +711,8 @@ static int eth_ipcp_send_frame(const uint8_t * dst_addr,
}
#endif /* HAVE_NETMAP */
+ FETCH_ADD_RELAXED(&eth_data.stat.n_snd, 1);
+
return 0;
}
@@ -519,6 +770,9 @@ static int eth_ipcp_alloc(const uint8_t * dst_addr,
buf, len + data->len);
free(buf);
+ if (ret == 0)
+ FETCH_ADD_RELAXED(&eth_data.stat.n_mgmt_snd, 1);
+
return ret;
}
@@ -569,6 +823,8 @@ static int eth_ipcp_alloc_resp(uint8_t * dst_addr,
return -1;
}
+ FETCH_ADD_RELAXED(&eth_data.stat.n_mgmt_snd, 1);
+
free(buf);
return 0;
@@ -633,7 +889,7 @@ static int eth_ipcp_alloc_reply(uint8_t * r_addr,
fd = eth_data.ef_to_fd[dsap];
#endif
if (fd < 0) {
- pthread_rwlock_unlock(& eth_data.flows_lock);
+ pthread_rwlock_unlock(&eth_data.flows_lock);
log_err("No flow found with that SAP.");
return -1; /* -EFLOWNOTFOUND */
}
@@ -700,6 +956,8 @@ static int eth_ipcp_name_query_req(const uint8_t * hash,
return -1;
}
+ FETCH_ADD_RELAXED(&eth_data.stat.n_mgmt_snd, 1);
+
free(buf);
}
@@ -900,12 +1158,15 @@ static void * eth_ipcp_packet_reader(void * o)
if (select(eth_data.s_fd + 1, &fds, NULL, NULL, NULL) < 0)
continue;
assert(FD_ISSET(eth_data.s_fd, &fds));
- if (ipcp_spb_reserve(&spb, ETH_MTU))
+ if (ipcp_spb_reserve(&spb, ETH_MTU)) {
+ FETCH_ADD_RELAXED(&eth_data.stat.n_buf_f, 1);
continue;
+ }
buf = ssm_pk_buff_head_alloc(spb, ETH_HEADER_TOT_SIZE);
if (buf == NULL) {
log_dbg("Failed to allocate header.");
ipcp_spb_release(spb);
+ FETCH_ADD_RELAXED(&eth_data.stat.n_buf_f, 1);
continue;
}
frame_len = recv(eth_data.s_fd, buf,
@@ -914,6 +1175,7 @@ static void * eth_ipcp_packet_reader(void * o)
if (frame_len <= 0) {
log_dbg("Failed to receive frame.");
ipcp_spb_release(spb);
+ FETCH_ADD_RELAXED(&eth_data.stat.n_rcv_f, 1);
continue;
}
#endif
@@ -1008,6 +1270,8 @@ static void * eth_ipcp_packet_reader(void * o)
list_add(&frame->next, &eth_data.mgmt_frames);
pthread_cond_signal(&eth_data.mgmt_cond);
pthread_mutex_unlock(&eth_data.mgmt_lock);
+ FETCH_ADD_RELAXED(&eth_data.stat.n_rcv, 1);
+ FETCH_ADD_RELAXED(&eth_data.stat.n_mgmt_rcv, 1);
} else {
pthread_rwlock_rdlock(&eth_data.flows_lock);
@@ -1018,6 +1282,7 @@ static void * eth_ipcp_packet_reader(void * o)
#endif
if (fd < 0) {
pthread_rwlock_unlock(&eth_data.flows_lock);
+ FETCH_ADD_RELAXED(&eth_data.stat.n_bad_id, 1);
goto fail_frame;
}
@@ -1026,9 +1291,14 @@ static void * eth_ipcp_packet_reader(void * o)
|| memcmp(eth_data.fd_to_ef[fd].r_addr,
e_frame->src_hwaddr, MAC_SIZE)) {
pthread_rwlock_unlock(&eth_data.flows_lock);
+ FETCH_ADD_RELAXED(&eth_data.stat.n_bad_id, 1);
goto fail_frame;
}
#endif
+ FETCH_ADD_RELAXED(&eth_data.fd_to_ef[fd].stat.p_rcv, 1);
+ FETCH_ADD_RELAXED(&eth_data.fd_to_ef[fd].stat.b_rcv,
+ length);
+ FETCH_ADD_RELAXED(&eth_data.stat.n_rcv, 1);
pthread_rwlock_unlock(&eth_data.flows_lock);
#ifndef HAVE_NETMAP
@@ -1041,8 +1311,13 @@ static void * eth_ipcp_packet_reader(void * o)
buf = ssm_pk_buff_head(spb);
memcpy(buf, &e_frame->payload, length);
#endif
- if (np1_flow_write(fd, spb, NP1_GET_POOL(fd)) < 0)
+ if (np1_flow_write(fd, spb, NP1_GET_POOL(fd)) < 0) {
ipcp_spb_release(spb);
+ FETCH_ADD_RELAXED(
+ &eth_data.fd_to_ef[fd].stat.p_dlv_f,
+ 1);
+ FETCH_ADD_RELAXED(&eth_data.stat.n_dlv_f, 1);
+ }
continue;
fail_frame:
@@ -1102,6 +1377,7 @@ static void * eth_ipcp_packet_writer(void * o)
== NULL) {
log_dbg("Failed to allocate header.");
ipcp_spb_release(spb);
+ FETCH_ADD_RELAXED(&eth_data.stat.n_buf_f, 1);
continue;
}
@@ -1125,8 +1401,19 @@ static void * eth_ipcp_packet_writer(void * o)
dsap, ssap,
#endif
ssm_pk_buff_head(spb),
- len))
+ len)) {
log_dbg("Failed to send frame.");
+ FETCH_ADD_RELAXED(
+ &eth_data.fd_to_ef[fd].stat.p_snd_f,
+ 1);
+ } else {
+ FETCH_ADD_RELAXED(
+ &eth_data.fd_to_ef[fd].stat.p_snd,
+ 1);
+ FETCH_ADD_RELAXED(
+ &eth_data.fd_to_ef[fd].stat.b_snd,
+ len);
+ }
ipcp_spb_release(spb);
}
}
@@ -1593,6 +1880,12 @@ static int eth_ipcp_bootstrap(struct ipcp_config * conf)
return -1;
}
#endif /* HAVE_NETMAP */
+#ifdef IPCP_ETH_FLOW_STATS
+ if (rib_reg(ETH_RIB_PATH, &eth_r_ops)) {
+ log_err("Failed to register RIB.");
+ goto fail_rib_reg;
+ }
+#endif
#if defined(__linux__)
if (pthread_create(&eth_data.if_monitor, NULL,
eth_ipcp_if_monitor, NULL)) {
@@ -1656,6 +1949,10 @@ static int eth_ipcp_bootstrap(struct ipcp_config * conf)
#if defined(__linux__)
fail_monitor:
#endif
+#ifdef IPCP_ETH_FLOW_STATS
+ rib_unreg(ETH_RIB_PATH);
+ fail_rib_reg:
+#endif
#if defined(HAVE_NETMAP)
nm_close(eth_data.nmd);
#elif defined(HAVE_BPF)
@@ -1732,6 +2029,8 @@ static int eth_ipcp_query(const uint8_t * hash)
return -1;
}
+ FETCH_ADD_RELAXED(&eth_data.stat.n_mgmt_snd, 1);
+
free(buf);
ret = shim_data_dir_query_wait(query, &timeout);
@@ -1798,6 +2097,14 @@ static int eth_ipcp_flow_alloc(int fd,
}
fset_add(eth_data.np1_flows, fd);
+#ifdef IPCP_ETH_FLOW_STATS
+ pthread_rwlock_wrlock(&eth_data.flows_lock);
+ memset(&eth_data.fd_to_ef[fd].stat, 0,
+ sizeof(eth_data.fd_to_ef[fd].stat));
+ eth_data.fd_to_ef[fd].stat.stamp = time(NULL);
+ FETCH_ADD_RELAXED(&eth_data.stat.n_flows, 1);
+ pthread_rwlock_unlock(&eth_data.flows_lock);
+#endif
#if defined(BUILD_ETH_LLC)
log_dbg("Assigned SAP %d for fd %d.", ssap, fd);
#endif
@@ -1858,6 +2165,14 @@ static int eth_ipcp_flow_alloc_resp(int fd,
}
fset_add(eth_data.np1_flows, fd);
+#ifdef IPCP_ETH_FLOW_STATS
+ pthread_rwlock_wrlock(&eth_data.flows_lock);
+ memset(&eth_data.fd_to_ef[fd].stat, 0,
+ sizeof(eth_data.fd_to_ef[fd].stat));
+ eth_data.fd_to_ef[fd].stat.stamp = time(NULL);
+ FETCH_ADD_RELAXED(&eth_data.stat.n_flows, 1);
+ pthread_rwlock_unlock(&eth_data.flows_lock);
+#endif
#if defined(BUILD_ETH_LLC)
log_dbg("Assigned SAP %d for fd %d.", ssap, fd);
#endif
@@ -1886,6 +2201,12 @@ static int eth_ipcp_flow_dealloc(int fd)
#endif
memset(&eth_data.fd_to_ef[fd].r_addr, 0, MAC_SIZE);
+#ifdef IPCP_ETH_FLOW_STATS
+ memset(&eth_data.fd_to_ef[fd].stat, 0,
+ sizeof(eth_data.fd_to_ef[fd].stat));
+ FETCH_SUB_RELAXED(&eth_data.stat.n_flows, 1);
+#endif
+
pthread_rwlock_unlock(&eth_data.flows_lock);
ipcp_flow_dealloc(fd);
@@ -1952,6 +2273,9 @@ int main(int argc,
#ifdef __linux__
pthread_join(eth_data.if_monitor, NULL);
#endif
+#ifdef IPCP_ETH_FLOW_STATS
+ rib_unreg(ETH_RIB_PATH);
+#endif
}
ipcp_stop();