From 0baf4da9e699170fc64a9436f7fb2498c0599081 Mon Sep 17 00:00:00 2001 From: Sander Vrijders Date: Wed, 30 May 2018 16:22:56 +0200 Subject: storyboard: Add function to export flows stats This adds a function to export the bytes sent by flows in a certain DIF. It is implemented within the storyboard, which calls an abstract function in the Experiment class so that every prototype can implement it in its own specific way. This commit adds the implementation for Ouroboros. It exports it to a csv, with the first item the source IPCP, the second the destination IPCP and then the bytes sent on the flow. --- rumba/elements/experimentation.py | 4 +++ rumba/prototypes/ouroboros.py | 67 +++++++++++++++++++++++++++++++++++++++ rumba/storyboard.py | 18 +++++++++++ 3 files changed, 89 insertions(+) diff --git a/rumba/elements/experimentation.py b/rumba/elements/experimentation.py index 1ae50f7..8f3f40a 100644 --- a/rumba/elements/experimentation.py +++ b/rumba/elements/experimentation.py @@ -525,6 +525,10 @@ class Experiment(object): end = time.time() logger.info("Swap-out took %.2f seconds", end - start) + @abc.abstractmethod + def export_dif_bandwidth(self, filename, dif): + raise Exception('Export DIF bandwidth method not implemented') + def to_dms_yaml(self, filename): """ Generate a YAML file of the experiment which can be fed to the diff --git a/rumba/prototypes/ouroboros.py b/rumba/prototypes/ouroboros.py index a7a7fc9..653f564 100644 --- a/rumba/prototypes/ouroboros.py +++ b/rumba/prototypes/ouroboros.py @@ -26,6 +26,7 @@ import time import subprocess +import re import rumba.ssh_support as ssh import rumba.model as mod @@ -250,3 +251,69 @@ class Experiment(mod.Experiment): def destroy_dif(self, dif): for ipcp in dif.ipcps: ipcp.node.execute_command('irm i d n ' + ipcp.name) + + def parse_stats(self, lines, spaces=0): + d = {} + + while len(lines): + line = lines[0] + + if not re.match(" {%i}.*" % spaces, line): + return d + + lines.pop(0) + + line = line.strip() + + if re.match(".*:.*", line): + head, tail = line.split(":", 1) + + if len(tail) == 0: + d[head] = self.parse_stats(lines, spaces+1) + else: + d[head] = tail.strip() + + return d + + + def export_dif_bandwidth(self, filename, dif): + f = open(filename, 'w') + + for node in dif.members: + ipcp = node.get_ipcp_by_dif(dif) + + # Get IPCP address + if not hasattr(ipcp, 'address'): + path = '/tmp/ouroboros/'+ ipcp.name + '/dt*' + dt_path = node.execute_command('ls -d %s' % path) + dts = dt_path.split('.') + ipcp.address = int(dts[-1]) + logger.info('IPCP %s has dt component ' + 'with address %d' % (ipcp.name, ipcp.address)) + + for node in dif.members: + ipcp = node.get_ipcp_by_dif(dif) + + dt_path = '/tmp/ouroboros/' + ipcp.name + '/dt.' + \ + str(ipcp.address) + '/' + + # Get flows to other endpoints + fd = node.execute_command('ls --ignore=[01] %s' % dt_path) + fds = fd.split('\n') + for fd in fds: + fd_path = dt_path + fd + fd_file = node.execute_command('cat %s' % fd_path) + + d = self.parse_stats(fd_file.splitlines()) + remote = d["Endpoint address"] + ipcp2_name = '' + for ipcp2 in dif.ipcps: + if ipcp2.address == int(remote): + ipcp2_name = ipcp2.name + + nr = d["Qos cube 0"]["sent (bytes)"] + + f.write('%s;%s;%s\n' % (ipcp.name, ipcp2_name, nr)) + + f.close() + logger.info('Wrote stats to %s', filename) diff --git a/rumba/storyboard.py b/rumba/storyboard.py index 56be7e0..beadf91 100644 --- a/rumba/storyboard.py +++ b/rumba/storyboard.py @@ -1035,6 +1035,24 @@ class StoryBoard(_SBEntity): logger.debug('Log list is:\n%s', logs_list) node.fetch_files(logs_list, dst_dir) + def schedule_export_dif_bandwidth(self, t, filename, dif): + """ + Schedules the generation of a csv file of the bandwidth used by + flows in a certain DIF at a certain time. + + :param filename: The output csv filename. + :param dif: The DIF to export + """ + if self.experiment is None: + raise ValueError("An experiment is needed to schedule commands.") + + if self._script is None: + self._script = _Script(self) + + action = functools.partial(self.experiment.export_dif_bandwidth, + filename, dif) + self._script.add_event(Event(action, ev_time=t)) + def schedule_link_state(self, t, dif, state): """ Schedules a link's (`rumba.model.ShimEthDIF`) state to go -- cgit v1.2.3