diff options
author | Dimitri Staessens <dimitri@ouroboros.rocks> | 2025-08-24 12:19:46 +0200 |
---|---|---|
committer | Dimitri Staessens <dimitri@ouroboros.rocks> | 2025-08-24 12:19:46 +0200 |
commit | 4afcd8732cbb33da63f665e7f3e0e4e9fe524e1e (patch) | |
tree | e4e25d9d53c4ba07808a5e28ef2a9261c1481f1d /exporters-influxdb | |
parent | b30ec0ec9fad72e07731e2e4745e694e2f416d15 (diff) | |
download | ouroboros-metrics-4afcd8732cbb33da63f665e7f3e0e4e9fe524e1e.tar.gz ouroboros-metrics-4afcd8732cbb33da63f665e7f3e0e4e9fe524e1e.zip |
Adds support for the new DHT RIB statistics. Grafana dashboard shows
number of keys in the DHT, the number locally managed values that need
to be republished, and the number of non-local values.
Signed-off-by: Dimitri Staessens <dimitri@ouroboros.rocks>
Diffstat (limited to 'exporters-influxdb')
-rwxr-xr-x | exporters-influxdb/pyExporter/oexport.py | 123 |
1 files changed, 100 insertions, 23 deletions
diff --git a/exporters-influxdb/pyExporter/oexport.py b/exporters-influxdb/pyExporter/oexport.py index e0edca0..2f38e2e 100755 --- a/exporters-influxdb/pyExporter/oexport.py +++ b/exporters-influxdb/pyExporter/oexport.py @@ -1,4 +1,4 @@ -#! /usr/bin/python +#!./venv/bin/python """ Ouroboros InfluxDB metrics exporter @@ -266,6 +266,44 @@ class OuroborosRIBReader: return stats + @staticmethod + def _get_trailing_number(s: str) -> int: + m = re.search(r'\d+$', s) + return int(m.group()) if m else None + + + def get_dht_stats_for_ipcp(self, + ipcp_name: str) -> dict: + """ + Get statistics for the DHT directory of an IPCP + :param ipcp_name: name of the IPCP + :return: statistics in a dict + """ + + str_to_metric = { + ' Number of keys': 'keys', + ' Number of local values': 'local_values', + ' Number of non-local values': 'non_local_values' + } + + path = os.path.join(self._get_dir_for_ipcp(ipcp_name), 'dht/stats') + if not os.path.exists(path): + return {} + + ret = dict() + + with open(path) as f: + for line in f.readlines(): + split_line = line.split(':') + phrase = split_line[0] + if phrase not in str_to_metric.keys(): + continue + metric = str_to_metric[phrase] + value = self._get_trailing_number(split_line[1]) + ret[metric] = value + + return ret + def _get_flows_for_process(self, process_name: str) -> list[str]: path = self._get_dir_for_process(process_name) @@ -280,20 +318,15 @@ class OuroborosRIBReader: return [] - @staticmethod - def _get_trailing_number(s: str) -> int: - m = re.search(r'\d+$', s) - return int(m.group()) if m else None - def _get_flow_info_for_n_plus_1_flow(self, ipcp_name: str, fd: str) -> dict: str_to_metric = { - 'Flow established at': None, - 'Remote address': None, + #'Flow established at': None, + #'Remote address': None, 'Local endpoint ID': 'endpoint_id', - 'Remote endpoint ID': None, + #'Remote endpoint ID': None, 'Sent (packets)': 'sent_pkts_total', 'Sent (bytes)': 'sent_bytes_total', 'Send failed (packets)': 'send_failed_packets_total', @@ -304,7 +337,7 @@ class OuroborosRIBReader: 'Receive failed (bytes)': 'recv_failed_bytes_total', 'Sent flow updates (packets)': 'sent_flow_updates_total', 'Received flow updates (packets)': 'recv_flow_updates_total', - 'Congestion avoidance algorithm': None, + #'Congestion avoidance algorithm': None, 'Upstream congestion level': 'up_cong_lvl', 'Downstream congestion level': 'down_cong_lvl', 'Upstream packet counter': 'up_pkt_ctr', @@ -313,7 +346,7 @@ class OuroborosRIBReader: 'Packets in this window': 'cong_wnd_current_pkts', 'Bytes in this window': 'cong_wnd_current_bytes', 'Max bytes in this window': 'cong_wnd_size_bytes', - 'Current congestion regime': None + #'Current congestion regime': None } ret = dict() @@ -327,12 +360,11 @@ class OuroborosRIBReader: for line in f.readlines(): split_line = line.split(':') phrase = split_line[0] - if phrase not in str_to_metric: + if phrase not in str_to_metric.keys(): continue metric = str_to_metric[phrase] - if metric is not None: - value = self._get_trailing_number(split_line[1]) - ret[metric] = value + value = self._get_trailing_number(split_line[1]) + ret[metric] = value return ret @@ -356,7 +388,10 @@ class OuroborosRIBReader: 'Receiver inactive (ns)': 'rcv_inact', 'Receiver last ack': 'rcv_seqno', 'Number of pkt retransmissions': 'n_rxm', + 'Number of rtt probes': 'n_prb', + 'Number of rtt estimates': 'n_rtt', 'Number of duplicates received': 'n_dup', + 'Number of delayed acks received': 'n_dak', 'Number of rendez-vous sent': 'n_rdv', 'Number of packets out of window': 'n_out', 'Number of packets out of rqueue': 'n_rqo' @@ -375,12 +410,11 @@ class OuroborosRIBReader: for line in f.readlines(): split_line = line.split(':') phrase = split_line[0] - if phrase not in str_to_metric: + if phrase not in str_to_metric.keys(): continue metric = str_to_metric[phrase] - if metric is not None: - value = self._get_trailing_number(split_line[1]) - ret[metric] = value + value = self._get_trailing_number(split_line[1]) + ret[metric] = value return ret @@ -409,7 +443,7 @@ class OuroborosRIBReader: path = self._get_path_for_ipcp_flow_n_minus_1_info(ipcp_name, fd) str_to_qos_metric = { - 'Flow established at': None, + #'Flow established at': None, ' sent (packets)': 'sent_packets_total', ' sent (bytes)': 'sent_bytes_total', ' rcvd (packets)': 'recv_packets_total', @@ -445,10 +479,11 @@ class OuroborosRIBReader: ret[_current_cube] = dict() else: split_line = line.split(':') + if split_line[0] not in str_to_qos_metric.keys(): + continue metric = str_to_qos_metric[split_line[0]] - if metric is not None: - value = self._get_trailing_number(split_line[1]) - ret[_current_cube][metric] = value + value = self._get_trailing_number(split_line[1]) + ret[_current_cube][metric] = value return ret @@ -669,6 +704,31 @@ class OuroborosExporter: except ApiException as e: print(e, point) + def _write_ouroboros_dht_node_metric(self, + metric: str, + now: str, + ipcp_name: str, + layer: str, + value): + + point = { + 'measurement': 'ouroboros_dht_' + metric + '_total', + 'tags': { + 'ipcp': ipcp_name, + 'layer': layer + }, + 'fields': { + metric: value, + 'time': now + } + } + + try: + self.write_api(bucket=self.bucket, + record=Point.from_dict(point)) + except ApiException as e: + print(e, point) + def _write_ouroboros_data_transfer_metric(self, metric: str, now: str, @@ -759,6 +819,7 @@ class OuroborosExporter: layer: str = None) -> list[dict]: if ipcp_type not in IPCP_TYPES: + print("Unknown IPCP TYPE: %s" % ipcp_type) return [] if ipcp_type: @@ -830,6 +891,21 @@ class OuroborosExporter: ipcp['layer'], value) + def _expoert_ouroboros_dht_metrics(self): + + ipcps = self.ribreader.get_ipcp_list(names_only=True) + + now = datetime.now(utc) + + for ipcp in ipcps: + metrics = self.ribreader.get_dht_stats_for_ipcp(ipcp['name']) + for metric, value in metrics.items(): + self._write_ouroboros_dht_node_metric(metric, + str(now), + ipcp['name'], + ipcp['layer'], + value) + def _export_ouroboros_data_transfer_metrics(self): ipcps = self.ribreader.get_ipcp_list(names_only=True) @@ -880,6 +956,7 @@ class OuroborosExporter: self._export_ouroboros_flow_allocator_flows_total() self._export_ouroboros_fa_congestion_metrics() self._export_ouroboros_lsdb_metrics() + self._expoert_ouroboros_dht_metrics() self._export_ouroboros_data_transfer_metrics() self._export_ouroboros_process_frct_metrics() |