From 97b16ada2b710cfe88c3bc6bf9e0dc42a943fca5 Mon Sep 17 00:00:00 2001 From: Dimitri Staessens Date: Sun, 18 Jul 2021 21:13:43 +0200 Subject: exp: Add metrics and prototype update commands This adds a couple of commands to start/stop metrics exporter, set link rate on virtual wall, and updating the prototype from git, which is useful in interactive mode. --- rumba/elements/experimentation.py | 88 ++++++++++++++++++++-- rumba/prototypes/ouroboros.py | 152 ++++++++++++++++++++------------------ rumba/storyboard.py | 14 ++++ rumba/testbeds/jfed.py | 7 +- 4 files changed, 177 insertions(+), 84 deletions(-) diff --git a/rumba/elements/experimentation.py b/rumba/elements/experimentation.py index bbb3310..03d90bb 100644 --- a/rumba/elements/experimentation.py +++ b/rumba/elements/experimentation.py @@ -112,7 +112,7 @@ class Experiment(object): git_repo=None, git_branch=None, build_options=None, - add_packages=[], + add_packages=None, log_dir=None, prototype_logs=None, enrollment_strategy='minimal', @@ -128,13 +128,16 @@ class Experiment(object): :param log_dir: Where to log output of the experiment. :param prototype_logs: Where the prototype logs its output. :param enrollment_strategy: Can be 'full-mesh', 'minimal' or 'manual'. - :param dt_strategy: For data flows, 'full-mesh', 'minimal' or 'manual'. + :param flows_strategy: For data flows, 'full-mesh', 'minimal' or 'manual'. :param server_decorator: a decorator function which will be applied to storyboard.server instances when using this prototype """ if nodes is None: nodes = list() + if add_packages is None: + add_packages = list() + self.nodes = nodes if server_decorator is None: def server_decorator(server): @@ -157,10 +160,10 @@ class Experiment(object): if self.enrollment_strategy not in ['full-mesh', 'minimal', 'manual']: raise Exception('Unknown enrollment strategy "%s"' - % self.enrollment_strategy) + % self.enrollment_strategy) if self.flows_strategy not in ['full-mesh', 'minimal', 'manual']: raise Exception('Unknown flows strategy "%s"' - % self.flows_strategy) + % self.flows_strategy) # Determine log directory if log_dir is None: @@ -476,6 +479,35 @@ class Experiment(object): end = time.time() logger.info("Bootstrap took %.2f seconds", end - start) + def update_prototype(self, branch=None): + """ + Updates from branch and reinstalls and rebootstraps the prototype + """ + start = time.time() + self._terminate_prototype(force=True) + self._update_prototype(branch) + self._bootstrap_prototype() + end = time.time() + logger.info("Prototype update took %.2f seconds", end - start) + + def start_metrics_exporter(self, nodes=None, interval=100): + """ + Start the metrics exporter for the prototype on nodes exporting data every interval ms + """ + start = time.time() + self._start_metrics_exporter(nodes, interval) + end = time.time() + logger.info("Starting metrics exporters took %.2f seconds", end - start) + + def stop_metrics_exporter(self, nodes=None): + """ + Stop the metrics exporters for the prototype on nodes exporting data every interval ms + """ + start = time.time() + self._stop_metrics_exporter(nodes) + end = time.time() + logger.info("Stopping metrics exporters took %.2f seconds", end - start) + @abc.abstractmethod def destroy_dif(self, dif): raise Exception('destroy_dif() method not implemented') @@ -488,14 +520,57 @@ class Experiment(object): def _bootstrap_prototype(self): raise Exception('bootstrap_prototype() method not implemented') + @abc.abstractmethod + def _update_prototype(self, branch): + raise Exception('update_prototype() method not implemented') + @abc.abstractmethod def prototype_name(self): raise Exception('prototype_name() method not implemented') @abc.abstractmethod - def _terminate_prototype(self, force=False): + def _terminate_prototype(self, force): raise Exception('terminate_prototype() method not implemented') + @abc.abstractmethod + def _start_metrics_exporter(self, nodes, interval): + raise Exception('start_metrics_exporter() method not implemented') + + @abc.abstractmethod + def _stop_metrics_exporter(self, nodes): + raise Exception('start_metrics_exporter() method not implemented') + + def set_eth_link_rate_to(self, src, dst, megabit): + """ + Sets the link rate on src NIC towards dst with ethtool + Fixme: This should be moved to the testbed class + Fixme: Test if testbed is a physical Linux server + """ + + if megabit not in [10, 100, 1000]: + return + + self.run_command(src, "route | grep $(ping server -c 1 | head -n1 | cut -f3 -d' ' | head -c -3 | tail -c +2) | " + "grep -o '[^ ]*$' > iface ".format(dst)) + self.run_command(src, "sudo ethtool -s $(cat iface) speed {} duplex full autoneg off".format(megabit)) + self.run_command(src, "sudo ip link set $(cat iface) down") + self.run_command(src, "sudo ip link set $(cat iface) up && sleep 10") + self.run_command(src, "sudo ethtool $(cat iface) | grep '{}Mb/s'".format(megabit)) + self.run_command(src, "rm iface") + self.run_command(src, "while ! ping -c 1 {}; do sleep 1; done".format(dst)) + + def set_eth_link_rate_between(self, src, dst, megabit): + """ + Sets the link rate with ethtool + Fixme: This should be moved to the testbed class + Fixme: Test if testbed is a physical Linux server + """ + if megabit not in [10, 100, 1000]: + return + + self.set_eth_link_rate_to(src, dst, megabit) + self.set_eth_link_rate_to(dst, src, megabit) + def swap_in(self): """ Swap the experiment in on the testbed. @@ -525,7 +600,7 @@ class Experiment(object): """ Terminate the prototype in the experiment. """ - self._terminate_prototype() + self._terminate_prototype(force=force) def reboot_nodes(self): """ @@ -673,6 +748,7 @@ class Experiment(object): logger.error('Warning: pydot module not installed, ' 'cannot produce DIF graph images') + class Executor: __metaclass__ = abc.ABCMeta diff --git a/rumba/prototypes/ouroboros.py b/rumba/prototypes/ouroboros.py index aa030ba..1558acd 100644 --- a/rumba/prototypes/ouroboros.py +++ b/rumba/prototypes/ouroboros.py @@ -94,7 +94,7 @@ class Experiment(mod.Experiment): git_repo='https://ouroboros.rocks/git/ouroboros', git_branch='master', build_options='', - add_packages=[], + add_packages=None, enrollment_strategy='minimal', flows_strategy='full-mesh', influxdb=None): @@ -138,6 +138,7 @@ class Experiment(mod.Experiment): self.r_ipcps = dict() self.irmd = None self.influxdb = influxdb + self.prototype_location = '/tmp/prototype/' self.set_startup_command("irmd") self.metrics_python_version = "python3.9" @@ -176,6 +177,22 @@ class Experiment(mod.Experiment): for node in self.nodes: node.execute_command("sudo nohup irmd > /dev/null &", time_out=None) + time.sleep(1) + + def _install_packages_and_execute_cmds(self, packages, cmds, nodes=None): + names = [] + executors = [] + args = [] + + if nodes is None: + nodes = self.nodes + + for node in nodes: + executor = self.make_executor(node, packages, self.testbed) + names.append(node.name) + executors.append(executor) + args.append(cmds) + m_processing.call_in_parallel(names, args, executors) def install_ouroboros(self): if isinstance(self.testbed, local.Testbed): @@ -184,27 +201,28 @@ class Experiment(mod.Experiment): packages = ["cmake", "protobuf-c-compiler", "git", "libfuse-dev", "libgcrypt20-dev", "libssl-dev"] + self.add_packages - fs_loc = '/tmp/prototype' - cmds = ["sudo DEBIAN_FRONTEND=noninteractive apt-get install libprotobuf-c-dev --yes || true", - "sudo rm -r " + fs_loc + " || true", - "git clone -b " + self.git_branch + " " + self.git_repo + \ - " " + fs_loc, - "cd " + fs_loc + " && mkdir build && cd build && " + + "sudo rm -r " + self.prototype_location + " || true", + "git clone -b " + self.git_branch + " " + self.git_repo + " " + self.prototype_location, + "cd " + self.prototype_location + " && mkdir build && cd build && " + "cmake " + self.build_options + " .. && " + "sudo make install -j$(nproc) && " + "sudo sed -i 's/#user_allow_other/user_allow_other/g' /etc/fuse.conf"] - names = [] - executors = [] - args = [] + self._install_packages_and_execute_cmds(packages, cmds) - for node in self.nodes: - executor = self.make_executor(node, packages, self.testbed) - names.append(node.name) - executors.append(executor) - args.append(cmds) - m_processing.call_in_parallel(names, args, executors) + def update_ouroboros(self, branch): + if isinstance(self.testbed, local.Testbed): + return + + if branch is not None: + self.git_branch = branch + + cmds = ["cd " + self.prototype_location + " && " + + "git checkout " + self.git_branch + " && " + + "git pull && cd build && sudo make install -j$(nproc)"] + + self._install_packages_and_execute_cmds([], cmds) def _install_python_ppa(self): packages = ["software-properties-common"] @@ -212,17 +230,7 @@ class Experiment(mod.Experiment): cmds = ["sudo add-apt-repository {} -y".format(python_ppa), "sudo apt-get update"] - names = [] - args = [] - executors = [] - - for node in self.nodes: - executor = self.make_executor(node, packages, self.testbed) - names.append(node.name) - executors.append(executor) - args.append(cmds) - - m_processing.call_in_parallel(names, args, executors) + self._install_packages_and_execute_cmds(packages, cmds) def _install_python_version(self, python_version): python_packages = ["distutils", "venv"] @@ -235,19 +243,7 @@ class Experiment(mod.Experiment): for lib in python_libs: packages += ["lib" + python_version + "-" + lib] - cmds = [] - - names = [] - args = [] - executors = [] - - for node in self.nodes: - executor = self.make_executor(node, packages, self.testbed) - names.append(node.name) - executors.append(executor) - args.append(cmds) - - m_processing.call_in_parallel(names, args, executors) + self._install_packages_and_execute_cmds(packages, []) def install_ouroboros_python_exporter(self): if isinstance(self.testbed, local.Testbed): @@ -284,17 +280,18 @@ class Experiment(mod.Experiment): "echo 'timeout=6000' >> config.ini", 'echo "proxy=$http_proxy" >> config.ini', "echo 'verify_ssl=False' >> config.ini"] - names = [] - executors = [] - args = [] - for node in self.nodes: - executor = self.make_executor(node, [], self.testbed) - names.append(node.name) - args.append(cmds) - executors.append(executor) + self._install_packages_and_execute_cmds([], cmds) - m_processing.call_in_parallel(names, args, executors) + def start_ouroboros_metrics_exporter(self, nodes, interval=100): + cmds = ["nohup ~/venv/bin/python3.9 ~/oexport.py --interval {} > /dev/null 2>&1 &".format(interval)] + + self._install_packages_and_execute_cmds([], cmds, nodes) + + def stop_ouroboros_metrics_exporter(self, nodes): + cmds = ["kill $(ps ax | grep oexport.py | grep -v grep| cut -f1 -d' ') || true"] + + self._install_packages_and_execute_cmds([], cmds, nodes) def create_ipcps(self): for node in self.nodes: @@ -334,9 +331,8 @@ class Experiment(mod.Experiment): cmd2 += " ipcp " + ipcp_b.name cmds2.append(cmd2) elif isinstance(ipcp.dif, mod.ShimUDPDIF): - # FIXME: Will fail, since we don't keep IPs yet - cmd += " type udp" - cmd += " layer " + ipcp.dif.name + logger.error("UDP IPCP not supported yet") + continue else: logger.error("Unsupported IPCP type") continue @@ -422,32 +418,36 @@ class Experiment(mod.Experiment): logger.info("All done, have fun!") + def _update_prototype(self, branch): + logger.info("Updating Ouroboros") + self.update_ouroboros(branch) + logger.info("Update done") + def _terminate_prototype(self, force=False): cmds = list() if force is True: kill = 'killall -9 ' - cmds.append(kill + 'irmd') - cmds.append(kill + 'ipcpd-unicast') - cmds.append(kill + 'ipcpd-broadcast') - cmds.append(kill + 'ipcpd-eth-llc') - cmds.append(kill + 'ipcpd-eth-dix') - cmds.append(kill + 'ipcpd-udp') - cmds.append(kill + 'ipcpd-local') - cmds.append(kill + 'ocbr') - cmds.append(kill + 'oping') - cmds.append(kill + 'operf') - cmds.append(kill + 'oecho') - cmds.append(kill + 'ioquake3.x86_64') - cmds.append(kill + 'ioq3ded.x86_64') - cmds.append(kill + 'ouroborosio') - cmds.append('rm -rf /dev/shm/ouroboros.*') - cmds.append('for i in /tmp/ouroboros/*; do fusermount -u $i; rmdir $i; done') - cmds.append('rm -rf /tmp/ouroboros') - cmds.append('kill -9 $(ps axjf | grep \'sudo irmd\' ' - '| grep -v grep | cut -f4 -d " "') + cmds.append(kill + 'irmd || true') + cmds.append(kill + 'ipcpd-unicast || true') + cmds.append(kill + 'ipcpd-broadcast || true') + cmds.append(kill + 'ipcpd-eth-llc || true') + cmds.append(kill + 'ipcpd-eth-dix || true') + cmds.append(kill + 'ipcpd-udp || true') + cmds.append(kill + 'ipcpd-local || true') + cmds.append(kill + 'ocbr || true') + cmds.append(kill + 'oping || true') + cmds.append(kill + 'operf || true') + cmds.append(kill + 'oecho || true') + cmds.append(kill + 'ioquake3.x86_64 || true') + cmds.append(kill + 'ioq3ded.x86_64 || true') + cmds.append(kill + 'ouroborosio || true') + cmds.append('rm -rf /dev/shm/ouroboros.* || true') + cmds.append('bash -c "for i in /tmp/ouroboros/*; do fusermount -u $i; rmdir $i; done || true"') + cmds.append('rm -rf /tmp/ouroboros || true') + cmds.append('kill -9 $(ps axjf | grep \'sudo irmd\' | grep -v grep | cut -f4 -d " ") || true') else: - cmds.append('killall -15 irmd') + cmds.append('killall -15 irmd || true') logger.info("Killing Ouroboros...") if isinstance(self.testbed, local.Testbed): @@ -458,6 +458,12 @@ class Experiment(mod.Experiment): for node in self.nodes: node.execute_commands(cmds, time_out=None, as_root=True) + def _start_metrics_exporter(self, nodes, interval): + self.start_ouroboros_metrics_exporter(nodes, interval) + + def _stop_metrics_exporter(self, nodes): + self.stop_ouroboros_metrics_exporter(nodes) + def destroy_dif(self, dif): for ipcp in dif.ipcps: ipcp.node.execute_command('irm i d n ' + ipcp.name) @@ -493,7 +499,7 @@ class Experiment(mod.Experiment): # Get IPCP address if not hasattr(ipcp, 'address'): - path = '/tmp/ouroboros/'+ ipcp.name + '/dt*' + path = '/tmp/ouroboros/' + ipcp.name + '/dt*' dt_path = node.execute_command('ls -d %s' % path) dts = dt_path.split('.') ipcp.address = int(dts[-1]) diff --git a/rumba/storyboard.py b/rumba/storyboard.py index 6d9739c..59fbf6f 100644 --- a/rumba/storyboard.py +++ b/rumba/storyboard.py @@ -754,6 +754,20 @@ class StoryBoard(_SBEntity): command = [command] _node.execute_commands(command) + def start_metrics_exporter(self, nodes=None, interval=100): + """ + Storyboard wrapper to start experiment metrics + """ + + self.experiment.start_metrics_exporter(nodes, interval=interval) + + def stop_metrics_exporter(self, nodes=None): + """ + storyboard wrapper to stop experiment metrics + """ + + self.experiment.stop_metrics_exporter(nodes=nodes) + def add_event(self, event): """ Adds an event to this script. diff --git a/rumba/testbeds/jfed.py b/rumba/testbeds/jfed.py index cf8f556..828a74e 100644 --- a/rumba/testbeds/jfed.py +++ b/rumba/testbeds/jfed.py @@ -381,8 +381,5 @@ class Testbed(mod.Testbed): else: logger.debug("Node %s interface %s has name %s." % (node_n.name, mac, ifname)) - # comp_id = intf.getAttribute("component_id") - # comp_arr = comp_id.split(":") - # ipcp.ifname = comp_arr[-1] - # xml_ip = intf.getElementsByTagName("ip") - # interface.ip = xml_ip[0].getAttribute("address") + + -- cgit v1.2.3