aboutsummaryrefslogtreecommitdiff
path: root/exporters-influxdb/pyExporter
diff options
context:
space:
mode:
Diffstat (limited to 'exporters-influxdb/pyExporter')
-rwxr-xr-xexporters-influxdb/pyExporter/oexport.py123
1 files changed, 100 insertions, 23 deletions
diff --git a/exporters-influxdb/pyExporter/oexport.py b/exporters-influxdb/pyExporter/oexport.py
index e0edca0..2f38e2e 100755
--- a/exporters-influxdb/pyExporter/oexport.py
+++ b/exporters-influxdb/pyExporter/oexport.py
@@ -1,4 +1,4 @@
-#! /usr/bin/python
+#!./venv/bin/python
"""
Ouroboros InfluxDB metrics exporter
@@ -266,6 +266,44 @@ class OuroborosRIBReader:
return stats
+ @staticmethod
+ def _get_trailing_number(s: str) -> int:
+ m = re.search(r'\d+$', s)
+ return int(m.group()) if m else None
+
+
+ def get_dht_stats_for_ipcp(self,
+ ipcp_name: str) -> dict:
+ """
+ Get statistics for the DHT directory of an IPCP
+ :param ipcp_name: name of the IPCP
+ :return: statistics in a dict
+ """
+
+ str_to_metric = {
+ ' Number of keys': 'keys',
+ ' Number of local values': 'local_values',
+ ' Number of non-local values': 'non_local_values'
+ }
+
+ path = os.path.join(self._get_dir_for_ipcp(ipcp_name), 'dht/stats')
+ if not os.path.exists(path):
+ return {}
+
+ ret = dict()
+
+ with open(path) as f:
+ for line in f.readlines():
+ split_line = line.split(':')
+ phrase = split_line[0]
+ if phrase not in str_to_metric.keys():
+ continue
+ metric = str_to_metric[phrase]
+ value = self._get_trailing_number(split_line[1])
+ ret[metric] = value
+
+ return ret
+
def _get_flows_for_process(self,
process_name: str) -> list[str]:
path = self._get_dir_for_process(process_name)
@@ -280,20 +318,15 @@ class OuroborosRIBReader:
return []
- @staticmethod
- def _get_trailing_number(s: str) -> int:
- m = re.search(r'\d+$', s)
- return int(m.group()) if m else None
-
def _get_flow_info_for_n_plus_1_flow(self,
ipcp_name: str,
fd: str) -> dict:
str_to_metric = {
- 'Flow established at': None,
- 'Remote address': None,
+ #'Flow established at': None,
+ #'Remote address': None,
'Local endpoint ID': 'endpoint_id',
- 'Remote endpoint ID': None,
+ #'Remote endpoint ID': None,
'Sent (packets)': 'sent_pkts_total',
'Sent (bytes)': 'sent_bytes_total',
'Send failed (packets)': 'send_failed_packets_total',
@@ -304,7 +337,7 @@ class OuroborosRIBReader:
'Receive failed (bytes)': 'recv_failed_bytes_total',
'Sent flow updates (packets)': 'sent_flow_updates_total',
'Received flow updates (packets)': 'recv_flow_updates_total',
- 'Congestion avoidance algorithm': None,
+ #'Congestion avoidance algorithm': None,
'Upstream congestion level': 'up_cong_lvl',
'Downstream congestion level': 'down_cong_lvl',
'Upstream packet counter': 'up_pkt_ctr',
@@ -313,7 +346,7 @@ class OuroborosRIBReader:
'Packets in this window': 'cong_wnd_current_pkts',
'Bytes in this window': 'cong_wnd_current_bytes',
'Max bytes in this window': 'cong_wnd_size_bytes',
- 'Current congestion regime': None
+ #'Current congestion regime': None
}
ret = dict()
@@ -327,12 +360,11 @@ class OuroborosRIBReader:
for line in f.readlines():
split_line = line.split(':')
phrase = split_line[0]
- if phrase not in str_to_metric:
+ if phrase not in str_to_metric.keys():
continue
metric = str_to_metric[phrase]
- if metric is not None:
- value = self._get_trailing_number(split_line[1])
- ret[metric] = value
+ value = self._get_trailing_number(split_line[1])
+ ret[metric] = value
return ret
@@ -356,7 +388,10 @@ class OuroborosRIBReader:
'Receiver inactive (ns)': 'rcv_inact',
'Receiver last ack': 'rcv_seqno',
'Number of pkt retransmissions': 'n_rxm',
+ 'Number of rtt probes': 'n_prb',
+ 'Number of rtt estimates': 'n_rtt',
'Number of duplicates received': 'n_dup',
+ 'Number of delayed acks received': 'n_dak',
'Number of rendez-vous sent': 'n_rdv',
'Number of packets out of window': 'n_out',
'Number of packets out of rqueue': 'n_rqo'
@@ -375,12 +410,11 @@ class OuroborosRIBReader:
for line in f.readlines():
split_line = line.split(':')
phrase = split_line[0]
- if phrase not in str_to_metric:
+ if phrase not in str_to_metric.keys():
continue
metric = str_to_metric[phrase]
- if metric is not None:
- value = self._get_trailing_number(split_line[1])
- ret[metric] = value
+ value = self._get_trailing_number(split_line[1])
+ ret[metric] = value
return ret
@@ -409,7 +443,7 @@ class OuroborosRIBReader:
path = self._get_path_for_ipcp_flow_n_minus_1_info(ipcp_name, fd)
str_to_qos_metric = {
- 'Flow established at': None,
+ #'Flow established at': None,
' sent (packets)': 'sent_packets_total',
' sent (bytes)': 'sent_bytes_total',
' rcvd (packets)': 'recv_packets_total',
@@ -445,10 +479,11 @@ class OuroborosRIBReader:
ret[_current_cube] = dict()
else:
split_line = line.split(':')
+ if split_line[0] not in str_to_qos_metric.keys():
+ continue
metric = str_to_qos_metric[split_line[0]]
- if metric is not None:
- value = self._get_trailing_number(split_line[1])
- ret[_current_cube][metric] = value
+ value = self._get_trailing_number(split_line[1])
+ ret[_current_cube][metric] = value
return ret
@@ -669,6 +704,31 @@ class OuroborosExporter:
except ApiException as e:
print(e, point)
+ def _write_ouroboros_dht_node_metric(self,
+ metric: str,
+ now: str,
+ ipcp_name: str,
+ layer: str,
+ value):
+
+ point = {
+ 'measurement': 'ouroboros_dht_' + metric + '_total',
+ 'tags': {
+ 'ipcp': ipcp_name,
+ 'layer': layer
+ },
+ 'fields': {
+ metric: value,
+ 'time': now
+ }
+ }
+
+ try:
+ self.write_api(bucket=self.bucket,
+ record=Point.from_dict(point))
+ except ApiException as e:
+ print(e, point)
+
def _write_ouroboros_data_transfer_metric(self,
metric: str,
now: str,
@@ -759,6 +819,7 @@ class OuroborosExporter:
layer: str = None) -> list[dict]:
if ipcp_type not in IPCP_TYPES:
+ print("Unknown IPCP TYPE: %s" % ipcp_type)
return []
if ipcp_type:
@@ -830,6 +891,21 @@ class OuroborosExporter:
ipcp['layer'],
value)
+ def _expoert_ouroboros_dht_metrics(self):
+
+ ipcps = self.ribreader.get_ipcp_list(names_only=True)
+
+ now = datetime.now(utc)
+
+ for ipcp in ipcps:
+ metrics = self.ribreader.get_dht_stats_for_ipcp(ipcp['name'])
+ for metric, value in metrics.items():
+ self._write_ouroboros_dht_node_metric(metric,
+ str(now),
+ ipcp['name'],
+ ipcp['layer'],
+ value)
+
def _export_ouroboros_data_transfer_metrics(self):
ipcps = self.ribreader.get_ipcp_list(names_only=True)
@@ -880,6 +956,7 @@ class OuroborosExporter:
self._export_ouroboros_flow_allocator_flows_total()
self._export_ouroboros_fa_congestion_metrics()
self._export_ouroboros_lsdb_metrics()
+ self._expoert_ouroboros_dht_metrics()
self._export_ouroboros_data_transfer_metrics()
self._export_ouroboros_process_frct_metrics()