diff options
author | Dimitri Staessens <dimitri@ouroboros.rocks> | 2021-07-03 13:12:35 +0200 |
---|---|---|
committer | Dimitri Staessens <dimitri@ouroboros.rocks> | 2021-07-03 13:12:35 +0200 |
commit | 24d9921de51f29dbef1ccabe330bb03774add9c3 (patch) | |
tree | a61a3d14f07d3c6056024f58486f37d3caa6ec70 /exporters-influxdb | |
parent | 45bb5bd91489f9c4cd604aa80e06a64239e5ea0d (diff) | |
download | ouroboros-metrics-24d9921de51f29dbef1ccabe330bb03774add9c3.tar.gz ouroboros-metrics-24d9921de51f29dbef1ccabe330bb03774add9c3.zip |
exporters: Add InfluxDB exporter
Adds a simple exporter for InfluxDB written in Python.
Diffstat (limited to 'exporters-influxdb')
-rw-r--r-- | exporters-influxdb/pyExporter/config.ini | 6 | ||||
-rw-r--r-- | exporters-influxdb/pyExporter/config.ini.example | 6 | ||||
-rw-r--r-- | exporters-influxdb/pyExporter/oexport.py | 884 |
3 files changed, 896 insertions, 0 deletions
diff --git a/exporters-influxdb/pyExporter/config.ini b/exporters-influxdb/pyExporter/config.ini new file mode 100644 index 0000000..a3675fe --- /dev/null +++ b/exporters-influxdb/pyExporter/config.ini @@ -0,0 +1,6 @@ +[influx2] +url=http://localhost:8086 +org=Ouroboros +token=bqQjzwAOxyig4hgmoR0d8Z0602vPojRt7Ne3VgQeXLugOtn_SvUdfcqxf9A2s2M3Czc77LDKPQpesrZkxhNozg== +timeout=6000 +verify_ssl=False
\ No newline at end of file diff --git a/exporters-influxdb/pyExporter/config.ini.example b/exporters-influxdb/pyExporter/config.ini.example new file mode 100644 index 0000000..f2656f7 --- /dev/null +++ b/exporters-influxdb/pyExporter/config.ini.example @@ -0,0 +1,6 @@ +[influx2] +url=http://localhost:8086 +org=<your-org> +token=<your-token> +timeout=6000 +verify_ssl=False
\ No newline at end of file diff --git a/exporters-influxdb/pyExporter/oexport.py b/exporters-influxdb/pyExporter/oexport.py new file mode 100644 index 0000000..03d1ba3 --- /dev/null +++ b/exporters-influxdb/pyExporter/oexport.py @@ -0,0 +1,884 @@ +""" +Ouroboros InfluxDB metrics exporter + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions +are met: + +1. Redistributions of source code must retain the above copyright +notice, this list of conditions and the following disclaimer. + +2. Redistributions in binary form must reproduce the above +copyright notice, this list of conditions and the following +disclaimer in the documentation and/or other materials provided +with the distribution. + +3. Neither the name of the copyright holder nor the names of its +contributors may be used to endorse or promote products derived +from this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS +FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE +COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, +INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES +(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) +HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, +STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED +OF THE POSSIBILITY OF SUCH DAMAGE. +""" + +import os +import re +import socket +import time +from datetime import datetime +from typing import Optional + +from influxdb_client import InfluxDBClient, Point +from influxdb_client.client.write_api import WriteOptions, PointSettings +from influxdb_client.rest import ApiException +from pytz import utc + +IPCP_TYPE_UNICAST = 'unicast' +IPCP_TYPE_BROADCAST = 'broadcast' +IPCP_TYPE_UDP = 'udp' +IPCP_TYPE_ETH_DIX = 'eth-dix' +IPCP_TYPE_ETH_LLC = 'eth-llc' +IPCP_TYPE_LOCAL = 'local' +IPCP_TYPE_UNKNOWN = 'unknown' + +IPCP_TYPES = [IPCP_TYPE_UNICAST, + IPCP_TYPE_BROADCAST, + IPCP_TYPE_UDP, + IPCP_TYPE_ETH_DIX, + IPCP_TYPE_ETH_LLC, + IPCP_TYPE_LOCAL, + IPCP_TYPE_UNKNOWN] + +IPCP_STATE_NULL = 'null' +IPCP_STATE_INIT = 'init' +IPCP_STATE_OPERATIONAL = 'operational' +IPCP_STATE_SHUTDOWN = 'shutdown' + +IPCP_STATES = [IPCP_STATE_NULL, + IPCP_STATE_INIT, + IPCP_STATE_OPERATIONAL, + IPCP_STATE_SHUTDOWN] + + +class OuroborosRIBReader: + """ + Class for reading stuff from the Ouroboros system + Resource Information Base (RIB) + """ + def __init__(self, + rib_path: str): + + self.rib_path = rib_path + + def _get_dir_for_ipcp(self, + ipcp_name: str) -> str: + + return os.path.join(self.rib_path, ipcp_name) + + def _get_dir_for_process(self, + process_name: str) -> str: + + return os.path.join(self.rib_path, process_name) + + def _get_dt_dir_for_ipcp(self, + ipcp_name: str) -> Optional[str]: + + path = self._get_dir_for_ipcp(ipcp_name) + try: + _subdirs = [f.name for f in os.scandir(path)] + except IOError as _: + return None + + for _dir in _subdirs: + if len(_dir) > 3 and _dir[:3] == 'dt.': + return os.path.join(path, _dir) + + return None + + def _get_path_for_ipcp_flow_n_plus_1_info(self, + ipcp_name: str, + fd: str): + + return os.path.join(self.rib_path, ipcp_name, 'flow-allocator', fd) + + def _get_path_for_ipcp_flow_n_minus_1_info(self, + ipcp_name: str, + fd: str) -> str: + + dt_dir = self._get_dt_dir_for_ipcp(ipcp_name) + return os.path.join(dt_dir, fd) + + def _get_path_for_frct_flow_info(self, + process: str, + fd: str) -> str: + + process_dir = self._get_dir_for_process(process) + return os.path.join(process_dir, str(fd), 'frct') + + def _get_ipcp_type_for_ipcp(self, + ipcp_name: str) -> str: + + _dir = self._get_dir_for_ipcp(ipcp_name) + path = '{}/info/_type'.format(_dir) + if not os.path.exists(path): + return IPCP_TYPE_UNKNOWN + + try: + with open(path) as f: + return f.readline()[:-1] + except IOError as _: + return IPCP_TYPE_UNKNOWN + + def _get_layer_name_for_ipcp(self, + ipcp_name: str) -> str: + + _dir = self._get_dir_for_ipcp(ipcp_name) + path = '{}/info/_layer'.format(_dir) + if not os.path.exists(path): + return '(error)' + + try: + with open(path) as f: + return f.readline()[:-1] + except IOError as _: + return '(error)' + + def _get_ipcp_state_for_ipcp(self, + ipcp_name: str) -> str: + + _dir = self._get_dir_for_ipcp(ipcp_name) + path = '{}/info/_state'.format(_dir) + if not os.path.exists(path): + return IPCP_TYPE_UNKNOWN + + try: + with open(path) as f: + return f.readline()[:-1] + except IOError as e: + print(e) + return IPCP_TYPE_UNKNOWN + + def _get_n_plus_1_flows_for_ipcp(self, + ipcp_name: str) -> list[str]: + + path = os.path.join(self._get_dir_for_ipcp(ipcp_name), 'flow-allocator') + + if not os.path.exists(path): + return [] + + try: + return [f.name for f in os.scandir(path)] + except IOError as e: + print(e) + + return [] + + def _get_n_minus_1_flows_for_ipcp(self, + ipcp_name: str) -> list[str]: + + path = self._get_dt_dir_for_ipcp(ipcp_name) + if path is None: + return [] + + if not os.path.exists(path): + return [] + + try: + return [f.name for f in os.scandir(path)] + except IOError as e: + print(e) + return [] + + def _get_address_for_ipcp(self, + ipcp_name): + + _dir = self._get_dt_dir_for_ipcp(ipcp_name) + if _dir and len(_dir) > 3: + return _dir[3:] + + return None + + def get_lsdb_stats_for_ipcp(self, + ipcp_name: str) -> dict: + """ + Get statistics for the link state database of an IPCP + :param ipcp_name: name of the IPCP + :return: statistics in a dict + """ + + address = self._get_address_for_ipcp(ipcp_name) + if address is None: + return {} + + path = os.path.join(self._get_dir_for_ipcp(ipcp_name), 'lsdb/') + if not os.path.exists(path): + return {} + + nodes = [] + neighbors = 0 + links = 0 + + lsdb_entries = [] + try: + lsdb_entries = [f.path for f in os.scandir(path)] + except IOError as _: + pass + + for lsdb_entry in lsdb_entries: + try: + with open(lsdb_entry) as e: + for line in e.readlines(): + if 'src' in line: + src = line.split()[-1] + if src not in nodes: + nodes += [src] + if src == address: + neighbors += 1 + if 'dst' in line: + dst = line.split()[-1] + if dst not in nodes: + nodes += [dst] + links += 1 + except IOError as _: + continue + + stats = {'neighbors': neighbors, + 'nodes': len(nodes), + 'links': links} + + return stats + + def _get_flows_for_process(self, + process_name: str) -> list[str]: + path = self._get_dir_for_process(process_name) + + if not os.path.exists(path): + return [] + + try: + return [f.name for f in os.scandir(path) if f.is_dir()] + except IOError as e: + print(e) + + return [] + + @staticmethod + def _get_trailing_number(s: str) -> int: + m = re.search(r'\d+$', s) + return int(m.group()) if m else None + + def _get_flow_info_for_n_plus_1_flow(self, + ipcp_name: str, + fd: str) -> dict: + + str_to_metric = { + 'Flow established at': None, + 'Remote address': None, + 'Local endpoint ID': 'endpoint_id', + 'Remote endpoint ID': None, + 'Sent (packets)': 'sent_pkts_total', + 'Sent (bytes)': 'sent_bytes_total', + 'Send failed (packets)': 'send_failed_packets_total', + 'Send failed (bytes)': 'send_failed_bytes_total', + 'Received (packets)': 'recv_pkts_total', + 'Received (bytes)': 'recv_bytes_total', + 'Receive failed (packets)': 'recv_failed_pkts_total', + 'Receive failed (bytes)': 'recv_failed_bytes_total', + 'Congestion avoidance algorithm': None, + 'Upstream congestion level': 'up_cong_lvl', + 'Downstream congestion level': 'down_cong_lvl', + 'Upstream packet counter': 'up_pkt_ctr', + 'Downstream packet counter': 'down_pkt_ctr', + 'Congestion window size (ns)': 'cong_wnd_width_ns', + 'Packets in this window': 'cong_wnd_current_pkts', + 'Bytes in this window': 'cong_wnd_current_bytes', + 'Max bytes in this window': 'cong_wnd_size_bytes', + 'Current congestion regime': None + } + + ret = dict() + + path = self._get_path_for_ipcp_flow_n_plus_1_info(ipcp_name, fd) + + if not os.path.exists(path): + return dict() + + with open(path) as f: + for line in f.readlines(): + split_line = line.split(':') + phrase = split_line[0] + metric = str_to_metric[phrase] + if metric is not None: + value = self._get_trailing_number(split_line[1]) + ret[metric] = value + + return ret + + def _get_frct_info_for_process_flow(self, + process: str, + fd: str) -> dict: + + str_to_metric = { + 'Maximum packet lifetime (ns)': 'mpl_timer_ns', + 'Max time to Ack (ns)': 'a_timer_ns', + 'Max time to Retransmit (ns)': 'r_timer_ns', + 'Smoothed rtt (ns)': 'srtt_ns', + 'RTT standard deviation (ns)': 'mdev_ns', + 'Retransmit timeout RTO (ns)': 'rto_ns', + 'Sender left window edge': 'snd_lwe', + 'Sender right window edge': 'snd_rwe', + 'Sender inactive (ns)': 'snd_inact', + 'Sender current sequence number': 'snd_seqno', + 'Receiver left window edge': 'rcv_lwe', + 'Receiver right window edge': 'rcv_rwe', + 'Receiver inactive (ns)': 'rcv_inact', + 'Receiver last ack': 'rcv_seqno', + } + + ret = dict() + + path = self._get_path_for_frct_flow_info(process, fd) + + if not os.path.exists(path): + return dict() + + ret['fd'] = fd + + with open(path) as f: + for line in f.readlines(): + split_line = line.split(':') + phrase = split_line[0] + metric = str_to_metric[phrase] + if metric is not None: + value = self._get_trailing_number(split_line[1]) + ret[metric] = value + + return ret + + def get_flow_allocator_flow_info_for_ipcp(self, + ipcp_name: str) -> list[dict]: + """ + Get the flow intformation for all N+1 flows in a certain IPCP + :param ipcp_name: name of the IPCP + :return: dict with flow information + """ + flow_info = [] + + flow_descriptors = self._get_n_plus_1_flows_for_ipcp(ipcp_name) + for flow in flow_descriptors: + info = self._get_flow_info_for_n_plus_1_flow(ipcp_name, flow) + flow_info += [info] + + return flow_info + + def _get_flow_info_for_n_minus_1_flow(self, + ipcp_name: str, + fd: str) -> dict: + + ret = dict() + + path = self._get_path_for_ipcp_flow_n_minus_1_info(ipcp_name, fd) + + str_to_qos_metric = { + 'Flow established at': None, + ' sent (packets)': 'sent_packets_total', + ' sent (bytes)': 'sent_bytes_total', + ' rcvd (packets)': 'recv_packets_total', + ' rcvd (bytes)': 'recv_bytes_total', + ' local sent (packets)': 'local_sent_packets_total', + ' local sent (bytes)': 'local_sent_bytes_total', + ' local rcvd (packets)': 'local_recv_packets_total', + ' local rcvd (bytes)': 'local_recv_bytes_total', + ' dropped ttl (packets)': 'ttl_packets_dropped_total', + ' dropped ttl (bytes)': 'ttl_bytes_dropped_total', + ' failed writes (packets)': 'write_packets_dropped_total', + ' failed writes (bytes)': 'write_bytes_dropped_total', + ' failed nhop (packets)': 'nhop_packets_dropped_total', + ' failed nhop (bytes)': 'nhop_bytes_dropped_total' + } + + if not os.path.exists(path): + return dict() + + with open(path) as f: + _current_cube = '' + ret['fd'] = fd + for line in [_line for _line in f.readlines() if _line != '\n']: + if 'Endpoint address' in line: + ret['endpoint'] = line.split(':')[-1].replace(' ', '')[:-1] + elif 'Queued packets (rx)' in line: + ret['queued_packets_rx'] = self._get_trailing_number(line) + elif 'Queued packets (tx)' in line: + ret['queued_packets_tx'] = self._get_trailing_number(line) + elif 'Qos cube' in line: + _cube = self._get_trailing_number(line[:-2]) + _current_cube = 'QoS cube ' + str(_cube) + ret[_current_cube] = dict() + else: + split_line = line.split(':') + metric = str_to_qos_metric[split_line[0]] + if metric is not None: + value = self._get_trailing_number(split_line[1]) + ret[_current_cube][metric] = value + + return ret + + def get_data_transfer_flow_info_for_ipcp(self, + ipcp_name: str) -> list[dict]: + """ + Get the flow information for all Data Transfer (N-1) flows in a certain IPCP + :param ipcp_name: name of the IPCP + :return: flow information for the data transfer flows + """ + + flow_info = [] + + flow_descriptors = self._get_n_minus_1_flows_for_ipcp(ipcp_name) + for flow in flow_descriptors: + info = self._get_flow_info_for_n_minus_1_flow(ipcp_name, flow) + flow_info += [info] + + return flow_info + + def get_frct_info_for_process(self, + process_name: str) -> list[dict]: + """ + Get the frct information for all flows for a certain process + :param process_name: name of the process + :return: flow information for the N-1 flows + """ + + frct_info = [] + + flow_descriptors = self._get_flows_for_process(process_name) + + for flow in flow_descriptors: + info = self._get_frct_info_for_process_flow(process_name, flow) + frct_info += [info] + + return frct_info + + # pylint: disable-msg=too-many-arguments + def get_ipcp_list(self, + names_only: bool = False, # return name and layer name + types: bool = True, + states: bool = True, + layers: bool = True, + flows: bool = True) -> list[dict]: + """ + Get a list of all IPCPs + :param names_only: only return IPCP names and layer names + :param types: return IPCP type + :param states: return IPCP state + :param layers: return layer in which the IPCP is enrolled + :param flows: return the number of allocated (N+1) flows for this IPCP + :return: list of dicts containing IPCP info + """ + + ipcp_list = [] + + if not os.path.exists(self.rib_path): + return [] + + for ipcp_dir in [f.path for f in os.scandir(self.rib_path) + if f.is_dir() and not f.name.startswith('proc.')]: + ipcp_name = os.path.split(ipcp_dir)[-1] + ipcp_type = None + ipcp_state = None + ipcp_layer = self._get_layer_name_for_ipcp(ipcp_name) if layers else None + ipcp_flows = None + if not names_only: + ipcp_type = self._get_ipcp_type_for_ipcp(ipcp_name) if types else None + ipcp_state = self._get_ipcp_state_for_ipcp(ipcp_name) if states else None + ipcp_flows = self._get_n_plus_1_flows_for_ipcp(ipcp_name) if flows else None + + ipcp_list += [{'name': ipcp_name, + 'type': ipcp_type, + 'state': ipcp_state, + 'layer': ipcp_layer, + 'flows': len(ipcp_flows) if ipcp_flows else None}] + return ipcp_list + # pylint: enable-msg=too-many-arguments + + def get_process_list(self) -> list[str]: + """ + Get a list of all the Ouroboros applications that may be exposing frct stats + :return: list of process names ("proc.<pid>") + """ + proc_list = [] + + if not os.path.exists(self.rib_path): + return [] + + for proc in [f.name for f in os.scandir(self.rib_path) + if f.is_dir() and f.name.startswith('proc.')]: + proc_list += [proc] + + return proc_list + + +class OuroborosExporter: + """ + Export Ouroboros metrics to InfluxDB + """ + + def __init__(self, + bucket='ouroboros-metrics', + config='/home/dstaesse/git/ouroboros_influx_exporter/config.ini', + rib_path='/tmp/ouroboros/'): + + point_settings = PointSettings() + point_settings.add_default_tag('system', socket.gethostname()) + + write_options = WriteOptions(batch_size=500, + flush_interval=10_000, + jitter_interval=1_000, + retry_interval=1_000, + max_retries=5, + max_retry_delay=30_000, + exponential_base=2) + + self.bucket = bucket + self.client = InfluxDBClient.from_config_file(config) + self.write_api = self.client.write_api(write_options=write_options, + point_settings=point_settings).write + self.query_api = self.client.query_api() + self.ribreader = OuroborosRIBReader(rib_path=rib_path) + + def __exit__(self, _type, value, traceback): + self.client.close() + + def _write_ouroboros_ipcps_total(self, + now, + ipcp_type, + n_ipcps): + + point = { + 'measurement': 'ouroboros_{}_ipcps_total'.format(ipcp_type), + 'tags': { + 'type': ipcp_type, + }, + 'fields': { + 'ipcps': n_ipcps, + 'time': str(now) + } + } + + self.write_api(bucket=self.bucket, + record=Point.from_dict(point)) + + def _write_ouroboros_flow_allocator_flows_total(self, + now, + ipcp, + layer, + n_flows): + point = { + 'measurement': 'ouroboros_flow_allocator_flows_total', + 'tags': { + 'ipcp': ipcp, + 'layer': layer + }, + 'fields': { + 'flows': n_flows, + 'time': str(now) + } + } + + self.write_api(bucket=self.bucket, + record=Point.from_dict(point)) + + # pylint: disable-msg=too-many-arguments + def _write_ouroboros_fa_congestion_metric(self, + metric: str, + now: str, + ipcp_name: str, + eid: str, + layer, + value): + + point = { + 'measurement': 'ouroboros_flow_allocator_' + metric, + 'tags': { + 'ipcp': ipcp_name, + 'layer': layer, + 'flow_id': eid + }, + 'fields': { + metric: value, + 'time': now + } + } + + try: + self.write_api(bucket=self.bucket, + record=Point.from_dict(point)) + except ApiException as e: + print(e, point) + + def _write_ouroboros_lsdb_node_metric(self, + metric: str, + now: str, + ipcp_name: str, + layer: str, + value): + + point = { + 'measurement': 'ouroboros_lsdb_' + metric + '_total', + 'tags': { + 'ipcp': ipcp_name, + 'layer': layer + }, + 'fields': { + metric: value, + 'time': now + } + } + + try: + self.write_api(bucket=self.bucket, + record=Point.from_dict(point)) + except ApiException as e: + print(e, point) + + def _write_ouroboros_data_transfer_metric(self, + metric: str, + now: str, + qos_cube: str, + fd: str, + endpoint: str, + ipcp_name: str, + layer, + value): + + point = { + 'measurement': 'ouroboros_data_transfer_' + metric, + 'tags': { + 'ipcp': ipcp_name, + 'layer': layer, + 'flow_descriptor': fd, + 'qos_cube': qos_cube, + 'endpoint': endpoint + }, + 'fields': { + metric: value, + 'time': now + } + } + + try: + self.write_api(bucket=self.bucket, + record=Point.from_dict(point)) + except ApiException as e: + print(e, point) + + def _write_ouroboros_data_transfer_queued(self, + now, + fd, + ipcp_name, + layer, + metrics) -> None: + point = dict() + for metric in metrics: + point = { + 'measurement': 'ouroboros_data_transfer_' + metric, + 'tags': { + 'ipcp': ipcp_name, + 'layer': layer, + 'flow_descriptor': fd, + }, + 'fields': { + metric: metrics[metric], + 'time': now + } + } + + try: + self.write_api(bucket=self.bucket, + record=Point.from_dict(point)) + except ApiException as e: + print(e, point) + + def _write_ouroboros_process_frct_metric(self, + now, + metric, + fd, + process, + value): + point = { + 'measurement': 'ouroboros_process_frct_' + metric, + 'tags': { + 'process': process, + 'flow_descriptor': fd, + }, + 'fields': { + metric: value, + 'time': now + } + } + + try: + self.write_api(bucket=self.bucket, + record=Point.from_dict(point)) + except ApiException as e: + print(e, point) + # pylint: enable-msg=too-many-arguments + + @staticmethod + def _filter_ipcp_list(ipcp_list: list[dict], + ipcp_type: str = None, + ipcp_state: str = None, + layer: str = None) -> list[dict]: + + if ipcp_type not in IPCP_TYPES: + return [] + + if ipcp_type: + ipcp_list = [ipcp for ipcp in ipcp_list if ipcp['type'] == ipcp_type] + + if ipcp_state: + ipcp_list = [ipcp for ipcp in ipcp_list if ipcp['state'] == ipcp_state] + + if layer: + ipcp_list = [ipcp for ipcp in ipcp_list if ipcp['layer'] == layer] + + return ipcp_list + + def _export_ouroboros_ipcps_total(self): + + ipcps = self.ribreader.get_ipcp_list() + + ipcps_total = dict() + + for _type in IPCP_TYPES: + ipcps_total[_type] = len(self._filter_ipcp_list(ipcps, ipcp_type=_type)) + + now = datetime.now(utc) + + for _type, n_ipcps in ipcps_total.items(): + self._write_ouroboros_ipcps_total(now, _type, n_ipcps) + + def _export_ouroboros_flow_allocator_flows_total(self): + + ipcps = self.ribreader.get_ipcp_list() + + now = datetime.now(utc) + + for ipcp in [ipcp for ipcp in ipcps if ipcp['flows'] is not None]: + self._write_ouroboros_flow_allocator_flows_total(now, ipcp['name'], ipcp['layer'], ipcp['flows']) + + def _export_ouroboros_fa_congestion_metrics(self): + + ipcps = self.ribreader.get_ipcp_list(names_only=True) + + now = datetime.now(utc) + + for ipcp in ipcps: + flows = self.ribreader.get_flow_allocator_flow_info_for_ipcp(ipcp['name']) + for flow in flows: + for metric in flow: + if metric == 'endpoint_id': + continue + + self._write_ouroboros_fa_congestion_metric(metric, + str(now), + ipcp['name'], + flow['endpoint_id'], + ipcp['layer'], + flow[metric]) + + def _export_ouroboros_lsdb_metrics(self): + + ipcps = self.ribreader.get_ipcp_list(names_only=True) + + now = datetime.now(utc) + + for ipcp in ipcps: + metrics = self.ribreader.get_lsdb_stats_for_ipcp(ipcp['name']) + for metric, value in metrics.items(): + self._write_ouroboros_lsdb_node_metric(metric, + str(now), + ipcp['name'], + ipcp['layer'], + value) + + def _export_ouroboros_data_transfer_metrics(self): + ipcps = self.ribreader.get_ipcp_list(names_only=True) + + now = datetime.now(utc) + + for ipcp in ipcps: + info = self.ribreader.get_data_transfer_flow_info_for_ipcp(ipcp['name']) + for flow in info: + qoscubes = [_field for _field in flow if str(_field).startswith('QoS cube')] + for qoscube in qoscubes: + for metric in flow[qoscube]: + self._write_ouroboros_data_transfer_metric(metric, + str(now), + qoscube, + flow['fd'], + flow['endpoint'], + ipcp['name'], + ipcp['layer'], + flow[qoscube][metric]) + self._write_ouroboros_data_transfer_queued(str(now), + flow['fd'], + ipcp['name'], + ipcp['layer'], + {'queued_packets_rx': flow['queued_packets_rx'], + 'queued_packets_tx': flow['queued_packets_tx']}) + + def _export_ouroboros_process_frct_metrics(self): + processes = self.ribreader.get_process_list() + + now = datetime.now(utc) + + for process in processes: + for frct_info in self.ribreader.get_frct_info_for_process(process): + for metric in frct_info: + self._write_ouroboros_process_frct_metric(str(now), + metric, + frct_info['fd'], + process, + frct_info[metric]) + + def export(self): + """ + Export all available metrics + :return: + """ + + self._export_ouroboros_ipcps_total() + self._export_ouroboros_flow_allocator_flows_total() + self._export_ouroboros_fa_congestion_metrics() + self._export_ouroboros_lsdb_metrics() + self._export_ouroboros_data_transfer_metrics() + self._export_ouroboros_process_frct_metrics() + + def run(self, + interval_ms: float = 1000): + """ + Run the ouroboros exporter + + :param interval_ms: read interval in milliseconds + :return: None + """ + + while True: + time.sleep(interval_ms / 1000.0) + self.export() + + +if __name__ == '__main__': + + exporter = OuroborosExporter() + exporter.run(interval_ms=1000) |