aboutsummaryrefslogtreecommitdiff
path: root/exporters-influxdb/pyExporter/oexport.py
diff options
context:
space:
mode:
authorDimitri Staessens <dimitri@ouroboros.rocks>2026-03-15 11:19:34 +0100
committerDimitri Staessens <dimitri@ouroboros.rocks>2026-03-15 11:23:26 +0100
commit2f537b0a43c5fb6a11aafad6eba1e8d92cbc2893 (patch)
tree70eb0e9da78d82116aec5cce3f928d2a77986a13 /exporters-influxdb/pyExporter/oexport.py
parent4afcd8732cbb33da63f665e7f3e0e4e9fe524e1e (diff)
downloadouroboros-metrics-0.23.0.tar.gz
ouroboros-metrics-0.23.0.zip
metric: Update to pyproject and version tags0.23.0
Updates the project to use a pyproject.toml file and use setuptools_scm for versioning. Simplifies directory structure. Signed-off-by: Dimitri Staessens <dimitri@ouroboros.rocks>
Diffstat (limited to 'exporters-influxdb/pyExporter/oexport.py')
-rwxr-xr-xexporters-influxdb/pyExporter/oexport.py986
1 files changed, 0 insertions, 986 deletions
diff --git a/exporters-influxdb/pyExporter/oexport.py b/exporters-influxdb/pyExporter/oexport.py
deleted file mode 100755
index 2f38e2e..0000000
--- a/exporters-influxdb/pyExporter/oexport.py
+++ /dev/null
@@ -1,986 +0,0 @@
-#!./venv/bin/python
-"""
-Ouroboros InfluxDB metrics exporter
-
-Redistribution and use in source and binary forms, with or without
-modification, are permitted provided that the following conditions
-are met:
-
-1. Redistributions of source code must retain the above copyright
-notice, this list of conditions and the following disclaimer.
-
-2. Redistributions in binary form must reproduce the above
-copyright notice, this list of conditions and the following
-disclaimer in the documentation and/or other materials provided
-with the distribution.
-
-3. Neither the name of the copyright holder nor the names of its
-contributors may be used to endorse or promote products derived
-from this software without specific prior written permission.
-
-THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
-FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
-COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
-INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
-(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
-SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
-HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
-STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
-ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
-OF THE POSSIBILITY OF SUCH DAMAGE.
-"""
-
-import os
-import re
-import socket
-import time
-import argparse
-from datetime import datetime
-from typing import Optional
-
-from influxdb_client import InfluxDBClient, Point
-from influxdb_client.client.write_api import WriteOptions, PointSettings
-from influxdb_client.rest import ApiException
-from pytz import utc
-
-IPCP_TYPE_UNICAST = 'unicast'
-IPCP_TYPE_BROADCAST = 'broadcast'
-IPCP_TYPE_UDP = 'udp'
-IPCP_TYPE_ETH_DIX = 'eth-dix'
-IPCP_TYPE_ETH_LLC = 'eth-llc'
-IPCP_TYPE_LOCAL = 'local'
-IPCP_TYPE_UNKNOWN = 'unknown'
-
-IPCP_TYPES = [IPCP_TYPE_UNICAST,
- IPCP_TYPE_BROADCAST,
- IPCP_TYPE_UDP,
- IPCP_TYPE_ETH_DIX,
- IPCP_TYPE_ETH_LLC,
- IPCP_TYPE_LOCAL,
- IPCP_TYPE_UNKNOWN]
-
-IPCP_STATE_NULL = 'null'
-IPCP_STATE_INIT = 'init'
-IPCP_STATE_OPERATIONAL = 'operational'
-IPCP_STATE_SHUTDOWN = 'shutdown'
-
-IPCP_STATES = [IPCP_STATE_NULL,
- IPCP_STATE_INIT,
- IPCP_STATE_OPERATIONAL,
- IPCP_STATE_SHUTDOWN]
-
-
-class OuroborosRIBReader:
- """
- Class for reading stuff from the Ouroboros system
- Resource Information Base (RIB)
- """
- def __init__(self,
- rib_path: str):
-
- self.rib_path = rib_path
-
- def _get_dir_for_ipcp(self,
- ipcp_name: str) -> str:
-
- return os.path.join(self.rib_path, ipcp_name)
-
- def _get_dir_for_process(self,
- process_name: str) -> str:
-
- return os.path.join(self.rib_path, process_name)
-
- def _get_dt_dir_for_ipcp(self,
- ipcp_name: str) -> Optional[str]:
-
- path = self._get_dir_for_ipcp(ipcp_name)
- try:
- _subdirs = [f.name for f in os.scandir(path)]
- except IOError as _:
- return None
-
- for _dir in _subdirs:
- if len(_dir) > 3 and _dir[:3] == 'dt.':
- return os.path.join(path, _dir)
-
- return None
-
- def _get_path_for_ipcp_flow_n_plus_1_info(self,
- ipcp_name: str,
- fd: str):
-
- return os.path.join(self.rib_path, ipcp_name, 'flow-allocator', fd)
-
- def _get_path_for_ipcp_flow_n_minus_1_info(self,
- ipcp_name: str,
- fd: str) -> str:
-
- dt_dir = self._get_dt_dir_for_ipcp(ipcp_name)
- return os.path.join(dt_dir, fd)
-
- def _get_path_for_frct_flow_info(self,
- process: str,
- fd: str) -> str:
-
- process_dir = self._get_dir_for_process(process)
- return os.path.join(process_dir, str(fd), 'frct')
-
- def _get_ipcp_type_for_ipcp(self,
- ipcp_name: str) -> str:
-
- _dir = self._get_dir_for_ipcp(ipcp_name)
- path = '{}/info/_type'.format(_dir)
- if not os.path.exists(path):
- return IPCP_TYPE_UNKNOWN
-
- try:
- with open(path) as f:
- return f.readline()[:-1]
- except IOError as _:
- return IPCP_TYPE_UNKNOWN
-
- def _get_layer_name_for_ipcp(self,
- ipcp_name: str) -> str:
-
- _dir = self._get_dir_for_ipcp(ipcp_name)
- path = '{}/info/_layer'.format(_dir)
- if not os.path.exists(path):
- return '(error)'
-
- try:
- with open(path) as f:
- return f.readline()[:-1]
- except IOError as _:
- return '(error)'
-
- def _get_ipcp_state_for_ipcp(self,
- ipcp_name: str) -> str:
-
- _dir = self._get_dir_for_ipcp(ipcp_name)
- path = '{}/info/_state'.format(_dir)
- if not os.path.exists(path):
- return IPCP_TYPE_UNKNOWN
-
- try:
- with open(path) as f:
- return f.readline()[:-1]
- except IOError as e:
- print(e)
- return IPCP_TYPE_UNKNOWN
-
- def _get_n_plus_1_flows_for_ipcp(self,
- ipcp_name: str) -> list[str]:
-
- path = os.path.join(self._get_dir_for_ipcp(ipcp_name), 'flow-allocator')
-
- if not os.path.exists(path):
- return []
-
- try:
- return [f.name for f in os.scandir(path)]
- except IOError as e:
- print(e)
-
- return []
-
- def _get_n_minus_1_flows_for_ipcp(self,
- ipcp_name: str) -> list[str]:
-
- path = self._get_dt_dir_for_ipcp(ipcp_name)
- if path is None:
- return []
-
- if not os.path.exists(path):
- return []
-
- try:
- return [f.name for f in os.scandir(path)]
- except IOError as e:
- print(e)
- return []
-
- def _get_address_for_ipcp(self,
- ipcp_name):
-
- path = self._get_dir_for_ipcp(ipcp_name)
- try:
- _subdirs = [f.name for f in os.scandir(path)]
- except IOError as _:
- return None
-
- for _dir in _subdirs:
- if len(_dir) > 3 and _dir[:3] == 'dt.':
- return _dir[3:]
-
- return None
-
- def get_lsdb_stats_for_ipcp(self,
- ipcp_name: str) -> dict:
- """
- Get statistics for the link state database of an IPCP
- :param ipcp_name: name of the IPCP
- :return: statistics in a dict
- """
-
- address = self._get_address_for_ipcp(ipcp_name)
- if address is None:
- return {}
-
- path = os.path.join(self._get_dir_for_ipcp(ipcp_name), 'lsdb/')
- if not os.path.exists(path):
- return {}
-
- nodes = []
- neighbors = 0
- links = 0
-
- lsdb_entries = []
- try:
- lsdb_entries = [f.path for f in os.scandir(path)]
- except IOError as _:
- pass
-
- for lsdb_entry in lsdb_entries:
- try:
- with open(lsdb_entry) as e:
- for line in e.readlines():
- if 'src' in line:
- src = line.split()[-1]
- if src not in nodes:
- nodes += [src]
- if src == address:
- neighbors += 1
- if 'dst' in line:
- dst = line.split()[-1]
- if dst not in nodes:
- nodes += [dst]
- links += 1
- except IOError as _:
- continue
-
- stats = {'neighbors': neighbors,
- 'nodes': len(nodes),
- 'links': links}
-
- return stats
-
- @staticmethod
- def _get_trailing_number(s: str) -> int:
- m = re.search(r'\d+$', s)
- return int(m.group()) if m else None
-
-
- def get_dht_stats_for_ipcp(self,
- ipcp_name: str) -> dict:
- """
- Get statistics for the DHT directory of an IPCP
- :param ipcp_name: name of the IPCP
- :return: statistics in a dict
- """
-
- str_to_metric = {
- ' Number of keys': 'keys',
- ' Number of local values': 'local_values',
- ' Number of non-local values': 'non_local_values'
- }
-
- path = os.path.join(self._get_dir_for_ipcp(ipcp_name), 'dht/stats')
- if not os.path.exists(path):
- return {}
-
- ret = dict()
-
- with open(path) as f:
- for line in f.readlines():
- split_line = line.split(':')
- phrase = split_line[0]
- if phrase not in str_to_metric.keys():
- continue
- metric = str_to_metric[phrase]
- value = self._get_trailing_number(split_line[1])
- ret[metric] = value
-
- return ret
-
- def _get_flows_for_process(self,
- process_name: str) -> list[str]:
- path = self._get_dir_for_process(process_name)
-
- if not os.path.exists(path):
- return []
-
- try:
- return [f.name for f in os.scandir(path) if f.is_dir()]
- except IOError as e:
- print(e)
-
- return []
-
- def _get_flow_info_for_n_plus_1_flow(self,
- ipcp_name: str,
- fd: str) -> dict:
-
- str_to_metric = {
- #'Flow established at': None,
- #'Remote address': None,
- 'Local endpoint ID': 'endpoint_id',
- #'Remote endpoint ID': None,
- 'Sent (packets)': 'sent_pkts_total',
- 'Sent (bytes)': 'sent_bytes_total',
- 'Send failed (packets)': 'send_failed_packets_total',
- 'Send failed (bytes)': 'send_failed_bytes_total',
- 'Received (packets)': 'recv_pkts_total',
- 'Received (bytes)': 'recv_bytes_total',
- 'Receive failed (packets)': 'recv_failed_pkts_total',
- 'Receive failed (bytes)': 'recv_failed_bytes_total',
- 'Sent flow updates (packets)': 'sent_flow_updates_total',
- 'Received flow updates (packets)': 'recv_flow_updates_total',
- #'Congestion avoidance algorithm': None,
- 'Upstream congestion level': 'up_cong_lvl',
- 'Downstream congestion level': 'down_cong_lvl',
- 'Upstream packet counter': 'up_pkt_ctr',
- 'Downstream packet counter': 'down_pkt_ctr',
- 'Congestion window size (ns)': 'cong_wnd_width_ns',
- 'Packets in this window': 'cong_wnd_current_pkts',
- 'Bytes in this window': 'cong_wnd_current_bytes',
- 'Max bytes in this window': 'cong_wnd_size_bytes',
- #'Current congestion regime': None
- }
-
- ret = dict()
-
- path = self._get_path_for_ipcp_flow_n_plus_1_info(ipcp_name, fd)
-
- if not os.path.exists(path):
- return dict()
-
- with open(path) as f:
- for line in f.readlines():
- split_line = line.split(':')
- phrase = split_line[0]
- if phrase not in str_to_metric.keys():
- continue
- metric = str_to_metric[phrase]
- value = self._get_trailing_number(split_line[1])
- ret[metric] = value
-
- return ret
-
- def _get_frct_info_for_process_flow(self,
- process: str,
- fd: str) -> dict:
-
- str_to_metric = {
- 'Maximum packet lifetime (ns)': 'mpl_timer_ns',
- 'Max time to Ack (ns)': 'a_timer_ns',
- 'Max time to Retransmit (ns)': 'r_timer_ns',
- 'Smoothed rtt (ns)': 'srtt_ns',
- 'RTT standard deviation (ns)': 'mdev_ns',
- 'Retransmit timeout RTO (ns)': 'rto_ns',
- 'Sender left window edge': 'snd_lwe',
- 'Sender right window edge': 'snd_rwe',
- 'Sender inactive (ns)': 'snd_inact',
- 'Sender current sequence number': 'snd_seqno',
- 'Receiver left window edge': 'rcv_lwe',
- 'Receiver right window edge': 'rcv_rwe',
- 'Receiver inactive (ns)': 'rcv_inact',
- 'Receiver last ack': 'rcv_seqno',
- 'Number of pkt retransmissions': 'n_rxm',
- 'Number of rtt probes': 'n_prb',
- 'Number of rtt estimates': 'n_rtt',
- 'Number of duplicates received': 'n_dup',
- 'Number of delayed acks received': 'n_dak',
- 'Number of rendez-vous sent': 'n_rdv',
- 'Number of packets out of window': 'n_out',
- 'Number of packets out of rqueue': 'n_rqo'
- }
-
- ret = dict()
-
- path = self._get_path_for_frct_flow_info(process, fd)
-
- if not os.path.exists(path):
- return dict()
-
- ret['fd'] = fd
-
- with open(path) as f:
- for line in f.readlines():
- split_line = line.split(':')
- phrase = split_line[0]
- if phrase not in str_to_metric.keys():
- continue
- metric = str_to_metric[phrase]
- value = self._get_trailing_number(split_line[1])
- ret[metric] = value
-
- return ret
-
- def get_flow_allocator_flow_info_for_ipcp(self,
- ipcp_name: str) -> list[dict]:
- """
- Get the flow intformation for all N+1 flows in a certain IPCP
- :param ipcp_name: name of the IPCP
- :return: dict with flow information
- """
- flow_info = []
-
- flow_descriptors = self._get_n_plus_1_flows_for_ipcp(ipcp_name)
- for flow in flow_descriptors:
- info = self._get_flow_info_for_n_plus_1_flow(ipcp_name, flow)
- flow_info += [info]
-
- return flow_info
-
- def _get_flow_info_for_n_minus_1_flow(self,
- ipcp_name: str,
- fd: str) -> dict:
-
- ret = dict()
-
- path = self._get_path_for_ipcp_flow_n_minus_1_info(ipcp_name, fd)
-
- str_to_qos_metric = {
- #'Flow established at': None,
- ' sent (packets)': 'sent_packets_total',
- ' sent (bytes)': 'sent_bytes_total',
- ' rcvd (packets)': 'recv_packets_total',
- ' rcvd (bytes)': 'recv_bytes_total',
- ' local sent (packets)': 'local_sent_packets_total',
- ' local sent (bytes)': 'local_sent_bytes_total',
- ' local rcvd (packets)': 'local_recv_packets_total',
- ' local rcvd (bytes)': 'local_recv_bytes_total',
- ' dropped ttl (packets)': 'ttl_packets_dropped_total',
- ' dropped ttl (bytes)': 'ttl_bytes_dropped_total',
- ' failed writes (packets)': 'write_packets_dropped_total',
- ' failed writes (bytes)': 'write_bytes_dropped_total',
- ' failed nhop (packets)': 'nhop_packets_dropped_total',
- ' failed nhop (bytes)': 'nhop_bytes_dropped_total'
- }
-
- if not os.path.exists(path):
- return dict()
-
- with open(path) as f:
- _current_cube = ''
- ret['fd'] = fd
- for line in [_line for _line in f.readlines() if _line != '\n']:
- if 'Endpoint address' in line:
- ret['endpoint'] = line.split(':')[-1].replace(' ', '')[:-1]
- elif 'Queued packets (rx)' in line:
- ret['queued_packets_rx'] = self._get_trailing_number(line)
- elif 'Queued packets (tx)' in line:
- ret['queued_packets_tx'] = self._get_trailing_number(line)
- elif 'Qos cube' in line:
- _cube = self._get_trailing_number(line[:-2])
- _current_cube = 'QoS cube ' + str(_cube)
- ret[_current_cube] = dict()
- else:
- split_line = line.split(':')
- if split_line[0] not in str_to_qos_metric.keys():
- continue
- metric = str_to_qos_metric[split_line[0]]
- value = self._get_trailing_number(split_line[1])
- ret[_current_cube][metric] = value
-
- return ret
-
- def get_data_transfer_flow_info_for_ipcp(self,
- ipcp_name: str) -> list[dict]:
- """
- Get the flow information for all Data Transfer (N-1) flows in a certain IPCP
- :param ipcp_name: name of the IPCP
- :return: flow information for the data transfer flows
- """
-
- flow_info = []
-
- flow_descriptors = self._get_n_minus_1_flows_for_ipcp(ipcp_name)
- for flow in flow_descriptors:
- info = self._get_flow_info_for_n_minus_1_flow(ipcp_name, flow)
- flow_info += [info]
-
- return flow_info
-
- def get_frct_info_for_process(self,
- process_name: str) -> list[dict]:
- """
- Get the frct information for all flows for a certain process
- :param process_name: name of the process
- :return: flow information for the N-1 flows
- """
-
- frct_info = []
-
- flow_descriptors = self._get_flows_for_process(process_name)
-
- for flow in flow_descriptors:
- info = self._get_frct_info_for_process_flow(process_name, flow)
- frct_info += [info]
-
- return frct_info
-
- # pylint: disable-msg=too-many-arguments
- def get_ipcp_list(self,
- names_only: bool = False, # return name and layer name
- types: bool = True,
- states: bool = True,
- layers: bool = True,
- flows: bool = True) -> list[dict]:
- """
- Get a list of all IPCPs
- :param names_only: only return IPCP names and layer names
- :param types: return IPCP type
- :param states: return IPCP state
- :param layers: return layer in which the IPCP is enrolled
- :param flows: return the number of allocated (N+1) flows for this IPCP
- :return: list of dicts containing IPCP info
- """
-
- ipcp_list = []
-
- if not os.path.exists(self.rib_path):
- return []
-
- for ipcp_dir in [f.path for f in os.scandir(self.rib_path)
- if f.is_dir() and not f.name.startswith('proc.')]:
- ipcp_name = os.path.split(ipcp_dir)[-1]
- ipcp_type = None
- ipcp_state = None
- ipcp_layer = self._get_layer_name_for_ipcp(ipcp_name) if layers else None
- ipcp_flows = None
- if not names_only:
- ipcp_type = self._get_ipcp_type_for_ipcp(ipcp_name) if types else None
- ipcp_state = self._get_ipcp_state_for_ipcp(ipcp_name) if states else None
- ipcp_flows = self._get_n_plus_1_flows_for_ipcp(ipcp_name) if flows else None
-
- ipcp_list += [{'name': ipcp_name,
- 'type': ipcp_type,
- 'state': ipcp_state,
- 'layer': ipcp_layer,
- 'flows': len(ipcp_flows) if ipcp_flows else None}]
- return ipcp_list
- # pylint: enable-msg=too-many-arguments
-
- def get_process_list(self) -> list[str]:
- """
- Get a list of all the Ouroboros applications that may be exposing frct stats
- :return: list of process names ("proc.<pid>")
- """
- proc_list = []
-
- if not os.path.exists(self.rib_path):
- return []
-
- for proc in [f.name for f in os.scandir(self.rib_path)
- if f.is_dir() and f.name.startswith('proc.')]:
- proc_list += [proc]
-
- return proc_list
-
-
-class OuroborosExporter:
- """
- Export Ouroboros metrics to InfluxDB
- """
-
- def __init__(self,
- bucket='ouroboros-metrics',
- config='./config.ini',
- rib_path='/tmp/ouroboros/'):
-
- point_settings = PointSettings()
- point_settings.add_default_tag('system', socket.gethostname())
-
- write_options = WriteOptions(batch_size=500,
- flush_interval=10_000,
- jitter_interval=1_000,
- retry_interval=1_000,
- max_retries=5,
- max_retry_delay=30_000,
- exponential_base=2)
-
- self.bucket = bucket
- self.client = InfluxDBClient.from_config_file(config)
- self.write_api = self.client.write_api(write_options=write_options,
- point_settings=point_settings).write
- self.query_api = self.client.query_api()
- self.ribreader = OuroborosRIBReader(rib_path=rib_path)
-
- def __exit__(self, _type, value, traceback):
- self.client.close()
-
- def _write_ouroboros_ipcps_total(self,
- now,
- ipcp_type,
- n_ipcps):
-
- point = {
- 'measurement': 'ouroboros_{}_ipcps_total'.format(ipcp_type),
- 'tags': {
- 'type': ipcp_type,
- },
- 'fields': {
- 'ipcps': n_ipcps,
- 'time': str(now)
- }
- }
-
- self.write_api(bucket=self.bucket,
- record=Point.from_dict(point))
-
- def _write_ouroboros_flow_allocator_flows_total(self,
- now,
- ipcp,
- layer,
- n_flows):
- point = {
- 'measurement': 'ouroboros_flow_allocator_flows_total',
- 'tags': {
- 'ipcp': ipcp,
- 'layer': layer
- },
- 'fields': {
- 'flows': n_flows,
- 'time': str(now)
- }
- }
-
- self.write_api(bucket=self.bucket,
- record=Point.from_dict(point))
-
- # pylint: disable-msg=too-many-arguments
- def _write_ouroboros_fa_congestion_metric(self,
- metric: str,
- now: str,
- ipcp_name: str,
- eid: str,
- layer,
- value):
-
- point = {
- 'measurement': 'ouroboros_flow_allocator_' + metric,
- 'tags': {
- 'ipcp': ipcp_name,
- 'layer': layer,
- 'flow_id': eid
- },
- 'fields': {
- metric: value,
- 'time': now
- }
- }
-
- try:
- self.write_api(bucket=self.bucket,
- record=Point.from_dict(point))
- except ApiException as e:
- print(e, point)
-
- def _write_ouroboros_lsdb_node_metric(self,
- metric: str,
- now: str,
- ipcp_name: str,
- layer: str,
- value):
-
- point = {
- 'measurement': 'ouroboros_lsdb_' + metric + '_total',
- 'tags': {
- 'ipcp': ipcp_name,
- 'layer': layer
- },
- 'fields': {
- metric: value,
- 'time': now
- }
- }
-
- try:
- self.write_api(bucket=self.bucket,
- record=Point.from_dict(point))
- except ApiException as e:
- print(e, point)
-
- def _write_ouroboros_dht_node_metric(self,
- metric: str,
- now: str,
- ipcp_name: str,
- layer: str,
- value):
-
- point = {
- 'measurement': 'ouroboros_dht_' + metric + '_total',
- 'tags': {
- 'ipcp': ipcp_name,
- 'layer': layer
- },
- 'fields': {
- metric: value,
- 'time': now
- }
- }
-
- try:
- self.write_api(bucket=self.bucket,
- record=Point.from_dict(point))
- except ApiException as e:
- print(e, point)
-
- def _write_ouroboros_data_transfer_metric(self,
- metric: str,
- now: str,
- qos_cube: str,
- fd: str,
- endpoint: str,
- ipcp_name: str,
- layer,
- value):
-
- point = {
- 'measurement': 'ouroboros_data_transfer_' + metric,
- 'tags': {
- 'ipcp': ipcp_name,
- 'layer': layer,
- 'flow_descriptor': fd,
- 'qos_cube': qos_cube,
- 'endpoint': endpoint
- },
- 'fields': {
- metric: value,
- 'time': now
- }
- }
-
- try:
- self.write_api(bucket=self.bucket,
- record=Point.from_dict(point))
- except ApiException as e:
- print(e, point)
-
- def _write_ouroboros_data_transfer_queued(self,
- now,
- fd,
- ipcp_name,
- layer,
- metrics) -> None:
- point = dict()
- for metric in metrics:
- point = {
- 'measurement': 'ouroboros_data_transfer_' + metric,
- 'tags': {
- 'ipcp': ipcp_name,
- 'layer': layer,
- 'flow_descriptor': fd,
- },
- 'fields': {
- metric: metrics[metric],
- 'time': now
- }
- }
-
- try:
- self.write_api(bucket=self.bucket,
- record=Point.from_dict(point))
- except ApiException as e:
- print(e, point)
-
- def _write_ouroboros_process_frct_metric(self,
- now,
- metric,
- fd,
- process,
- value):
- point = {
- 'measurement': 'ouroboros_process_frct_' + metric,
- 'tags': {
- 'process': process,
- 'flow_descriptor': fd,
- },
- 'fields': {
- metric: value,
- 'time': now
- }
- }
-
- try:
- self.write_api(bucket=self.bucket,
- record=Point.from_dict(point))
- except ApiException as e:
- print(e, point)
- # pylint: enable-msg=too-many-arguments
-
- @staticmethod
- def _filter_ipcp_list(ipcp_list: list[dict],
- ipcp_type: str = None,
- ipcp_state: str = None,
- layer: str = None) -> list[dict]:
-
- if ipcp_type not in IPCP_TYPES:
- print("Unknown IPCP TYPE: %s" % ipcp_type)
- return []
-
- if ipcp_type:
- ipcp_list = [ipcp for ipcp in ipcp_list if ipcp['type'] == ipcp_type]
-
- if ipcp_state:
- ipcp_list = [ipcp for ipcp in ipcp_list if ipcp['state'] == ipcp_state]
-
- if layer:
- ipcp_list = [ipcp for ipcp in ipcp_list if ipcp['layer'] == layer]
-
- return ipcp_list
-
- def _export_ouroboros_ipcps_total(self):
-
- ipcps = self.ribreader.get_ipcp_list()
-
- ipcps_total = dict()
-
- for _type in IPCP_TYPES:
- ipcps_total[_type] = len(self._filter_ipcp_list(ipcps, ipcp_type=_type))
-
- now = datetime.now(utc)
-
- for _type, n_ipcps in ipcps_total.items():
- self._write_ouroboros_ipcps_total(now, _type, n_ipcps)
-
- def _export_ouroboros_flow_allocator_flows_total(self):
-
- ipcps = self.ribreader.get_ipcp_list()
-
- now = datetime.now(utc)
-
- for ipcp in [ipcp for ipcp in ipcps if ipcp['flows'] is not None]:
- self._write_ouroboros_flow_allocator_flows_total(now, ipcp['name'], ipcp['layer'], ipcp['flows'])
-
- def _export_ouroboros_fa_congestion_metrics(self):
-
- ipcps = self.ribreader.get_ipcp_list(names_only=True)
-
- now = datetime.now(utc)
-
- for ipcp in ipcps:
- flows = self.ribreader.get_flow_allocator_flow_info_for_ipcp(ipcp['name'])
- for flow in flows:
- for metric in flow:
- if metric == 'endpoint_id':
- continue
-
- self._write_ouroboros_fa_congestion_metric(metric,
- str(now),
- ipcp['name'],
- flow['endpoint_id'],
- ipcp['layer'],
- flow[metric])
-
- def _export_ouroboros_lsdb_metrics(self):
-
- ipcps = self.ribreader.get_ipcp_list(names_only=True)
-
- now = datetime.now(utc)
-
- for ipcp in ipcps:
- metrics = self.ribreader.get_lsdb_stats_for_ipcp(ipcp['name'])
- for metric, value in metrics.items():
- self._write_ouroboros_lsdb_node_metric(metric,
- str(now),
- ipcp['name'],
- ipcp['layer'],
- value)
-
- def _expoert_ouroboros_dht_metrics(self):
-
- ipcps = self.ribreader.get_ipcp_list(names_only=True)
-
- now = datetime.now(utc)
-
- for ipcp in ipcps:
- metrics = self.ribreader.get_dht_stats_for_ipcp(ipcp['name'])
- for metric, value in metrics.items():
- self._write_ouroboros_dht_node_metric(metric,
- str(now),
- ipcp['name'],
- ipcp['layer'],
- value)
-
- def _export_ouroboros_data_transfer_metrics(self):
- ipcps = self.ribreader.get_ipcp_list(names_only=True)
-
- now = datetime.now(utc)
-
- for ipcp in ipcps:
- info = self.ribreader.get_data_transfer_flow_info_for_ipcp(ipcp['name'])
- for flow in info:
- qoscubes = [_field for _field in flow if str(_field).startswith('QoS cube')]
- for qoscube in qoscubes:
- for metric in flow[qoscube]:
- self._write_ouroboros_data_transfer_metric(metric,
- str(now),
- qoscube,
- flow['fd'],
- flow['endpoint'],
- ipcp['name'],
- ipcp['layer'],
- flow[qoscube][metric])
- self._write_ouroboros_data_transfer_queued(str(now),
- flow['fd'],
- ipcp['name'],
- ipcp['layer'],
- {'queued_packets_rx': flow['queued_packets_rx'],
- 'queued_packets_tx': flow['queued_packets_tx']})
-
- def _export_ouroboros_process_frct_metrics(self):
- processes = self.ribreader.get_process_list()
-
- now = datetime.now(utc)
-
- for process in processes:
- for frct_info in self.ribreader.get_frct_info_for_process(process):
- for metric in frct_info:
- self._write_ouroboros_process_frct_metric(str(now),
- metric,
- frct_info['fd'],
- process,
- frct_info[metric])
-
- def export(self):
- """
- Export all available metrics
- :return:
- """
-
- self._export_ouroboros_ipcps_total()
- self._export_ouroboros_flow_allocator_flows_total()
- self._export_ouroboros_fa_congestion_metrics()
- self._export_ouroboros_lsdb_metrics()
- self._expoert_ouroboros_dht_metrics()
- self._export_ouroboros_data_transfer_metrics()
- self._export_ouroboros_process_frct_metrics()
-
- def run(self,
- interval_ms: float = 1000):
- """
- Run the ouroboros exporter
-
- :param interval_ms: read interval in milliseconds
- :return: None
- """
-
- while True:
- time.sleep(interval_ms / 1000.0)
- self.export()
-
-
-if __name__ == '__main__':
- argparser = argparse.ArgumentParser(description="Ouroboros InfluxDB metrics exporter")
- argparser.add_argument('-i', '--interval', type=int, default='1000',
- help="Interval at which to collect metrics (milliseconds)")
- argparser.add_argument('-b', '--bucket', type=str, default='ouroboros-metrics',
- help="InfluxDB bucket to write to")
- args = argparser.parse_args()
- interval_ms = args.interval
- exporter = OuroborosExporter(bucket=args.bucket)
- exporter.run(interval_ms=interval_ms)