From dfd400c2fb1383bcd6e3862d6199fffad4e78524 Mon Sep 17 00:00:00 2001 From: Nick Aerts Date: Fri, 23 Mar 2018 23:36:34 +0100 Subject: linkquality: added link_quality to add delay, loss and rate limit to link This adds the ability to assign delay and loss to links. 4 new object types are introduced: - LinkQuality - Delay - Loss - Rate All attributes are read-only, one attribute link_quality is added to the ShimEthDIF with a callback to the LinkQualityManager which will automatically apply a new link_quality profile when this attribute is written. --- rumba/model.py | 212 +++++++++++++++++++++++++++++++++++++++++++-- rumba/testbeds/dockertb.py | 5 +- rumba/testbeds/emulab.py | 5 +- rumba/testbeds/jfed.py | 4 +- rumba/testbeds/local.py | 4 +- rumba/testbeds/qemu.py | 17 +--- 6 files changed, 211 insertions(+), 36 deletions(-) diff --git a/rumba/model.py b/rumba/model.py index d46cff9..b6cb15e 100644 --- a/rumba/model.py +++ b/rumba/model.py @@ -8,6 +8,7 @@ # Dimitri Staessens # Vincenzo Maffione # Marco Capitani +# Nick Aerts # # This library is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public @@ -29,6 +30,7 @@ import os import stat import time import shutil +from enum import Enum import rumba.log as log @@ -85,16 +87,24 @@ class Testbed(object): else: self.system_logs = system_logs - @abc.abstractmethod def swap_in(self, experiment): for node in experiment.nodes: node.executor = self.executor + self._swap_in(experiment) + + for dif in experiment.dif_ordering: + if isinstance(dif, ShimEthDIF): + dif.link_quality.apply(dif) + + @abc.abstractmethod + def _swap_in(self, experiment): + logger.info("_swap_in(): nothing to do") + @abc.abstractmethod def swap_out(self, experiment): logger.info("swap_out(): nothing to do") - # Base class for DIFs # # @name [string] DIF name @@ -149,11 +159,9 @@ class ShimEthDIF(DIF): def get_e_id(self): return "ShimEthDIF." + self.name - def __init__(self, name, members=None, link_speed=0): + def __init__(self, name, members=None, link_quality=None): DIF.__init__(self, name, members) - self.link_speed = int(link_speed) - if self.link_speed < 0: - raise ValueError("link_speed must be a non-negative number") + self._link_quality = link_quality def get_ipcp_class(self): return ShimEthIPCP @@ -163,6 +171,24 @@ class ShimEthDIF(DIF): if len(self.members) > 2: raise Exception("More than 2 members in %s!" % self.name) + @property + def link_quality(self): + return self._link_quality + + @link_quality.setter + def link_quality(self, _link_quality): + if not _link_quality: + raise ValueError("Cannot set link_quality to None, use del " + "link_quality to reset") + + self._link_quality = _link_quality + + _link_quality.apply(self) + + @link_quality.deleter + def link_quality(self): + self._link_quality.deactivate(self) + # Normal DIF # @@ -193,6 +219,180 @@ class NormalDIF(DIF): return s +class Distribution(Enum): + NORMAL = 1 + PARETO = 2 + PARETONORMAL = 3 + + +class Delay(object): + def __init__(self, delay=0, jitter=None, correlation=None, + distribution=None): + """ + Configure link delay + :param delay: average delay in ms + :param jitter: jitter in ms + :param correlation: correlation in % + :param distribution: delay distribution, defaults to a Normal + distribution + """ + + if delay < 0: + raise ValueError("Delay needs to be at least 0") + + if jitter and not jitter > 0: + raise ValueError("Jitter needs to be higher than 0") + + if (not jitter) and correlation: + raise ValueError("Correlation requires a value for jitter") + + if correlation and (correlation < 0 or correlation > 100): + raise ValueError("Correlation needs to be between 0 and 100") + + self._delay = delay + self._jitter = jitter + self._correlation = correlation + self._distribution = distribution + + @property + def delay(self): + return self._delay + + @property + def jitter(self): + return self._jitter + + @property + def correlation(self): + return self._correlation + + @property + def distribution(self): + return self._distribution + + def build_command(self): + opts = ["delay %ims" % self.delay] + + if self.jitter: + opts.append("%ims" % self.jitter) + + if self.correlation: + opts.append("%f%%" % self.correlation) + + if self.distribution: + opts.append("distribution %s" % self.distribution.name.lower()) + + return " ".join(opts) + + +class Loss(object): + def __init__(self, loss, correlation=None): + """ + Configure link loss + :param loss: loss in percentage + :param correlation: correlation in percentage + """ + if loss and (loss < 0 or loss > 100): + raise ValueError("Loss needs to be between 0 and 100") + + if correlation and (correlation < 0 or correlation > 100): + raise ValueError("Correlation needs to be between 0 and 100") + + self._loss = loss + self._correlation = correlation + + @property + def loss(self): + return self._loss + + @property + def correlation(self): + return self._correlation + + def build_command(self): + opts = ["loss %f%%" % self.loss] + + if self.correlation: + opts.append("%f%%" % self.correlation) + + return " ".join(opts) + + +class LinkQuality(object): + _active = set() + + def __init__(self, delay=None, loss=None, rate=None): + """ + Link quality configuration + :param delay: Delay object holding delay configuration + :param loss: Loss object holding delay configuration + :param rate: The rate of the link in mbit + """ + + if rate and not rate > 0: + raise ValueError("Rate needs to be higher than 0") + + self._delay = delay + self._loss = loss + self._rate = rate + + @property + def delay(self): + return self._delay + + @property + def loss(self): + return self._loss + + @property + def rate(self): + return self._rate + + def build_command(self, ipcp): + cmd = [] + + if ipcp in LinkQuality._active: + cmd.append("tc qdisc change dev %s root netem" % ipcp.ifname) + else: + cmd.append("tc qdisc add dev %s root netem" % ipcp.ifname) + + if self.delay: + cmd.append(self.delay.build_command()) + + if self.loss: + cmd.append(self.loss.build_command()) + + if self.rate: + cmd.append("rate %imbit" % self.rate) + + return " ".join(cmd) + + def apply(self, shim): + if not (self.delay or self.loss or self.rate): + self.deactivate(shim) + else: + for ipcp in shim.ipcps: + if not ipcp.ifname: + logger.error("Could not apply LinkQuality to IPCP because " + "the interface name is None") + continue + + ipcp.node.execute_command(self.build_command(ipcp), + as_root=True) + LinkQuality._active.add(ipcp) + + def deactivate(self, shim): + for ipcp in shim.ipcps: + if not ipcp.ifname: + logger.error("Could not remove LinkQuality from IPCP because " + "the interface name is None") + continue + + ipcp.node.execute_command("tc qdisc del dev %s root " + "netem" % ipcp.ifname, as_root=True) + LinkQuality._active.remove(ipcp) + + # SSH Configuration # class SSHConfig(object): diff --git a/rumba/testbeds/dockertb.py b/rumba/testbeds/dockertb.py index 3895a7e..d018f8c 100644 --- a/rumba/testbeds/dockertb.py +++ b/rumba/testbeds/dockertb.py @@ -33,7 +33,6 @@ import rumba.model as mod import rumba.log as log from rumba.executors.docker import DockerExecutor - logger = log.get_logger(__name__) class Testbed(mod.Testbed): @@ -56,11 +55,9 @@ class Testbed(mod.Testbed): self.docker_client = docker.from_env() self.executor = DockerExecutor(self) - def swap_in(self, experiment): + def _swap_in(self, experiment): docker_client = self.docker_client - mod.Testbed.swap_in(self, experiment) - # Pull image if self.pull_image: docker_client.images.pull(self.base_image_repo, diff --git a/rumba/testbeds/emulab.py b/rumba/testbeds/emulab.py index 06e0e5c..0d18892 100644 --- a/rumba/testbeds/emulab.py +++ b/rumba/testbeds/emulab.py @@ -271,16 +271,13 @@ class Testbed(mod.Testbed): if self.ip[ipcp] == item[1]: ipcp.ifname = item[0] - def swap_in(self, experiment): - mod.Testbed.swap_in(self, experiment) - + def _swap_in(self, experiment): self._create_experiment(experiment) wait = self.swap_exp_in() if wait: self.wait_until_nodes_up() self.complete_experiment_graph(experiment) - def swap_out(self, experiment): """ Swaps experiment out diff --git a/rumba/testbeds/jfed.py b/rumba/testbeds/jfed.py index db9dd14..a1ceded 100644 --- a/rumba/testbeds/jfed.py +++ b/rumba/testbeds/jfed.py @@ -206,9 +206,7 @@ class Testbed(mod.Testbed): logger.error("jFed returned with error " + str(e.returncode)) raise - def swap_in(self, experiment): - mod.Testbed.swap_in(self, experiment) - + def _swap_in(self, experiment): for node in experiment.nodes: node.ssh_config.set_http_proxy(self.http_proxy) self.create_rspec(experiment) diff --git a/rumba/testbeds/local.py b/rumba/testbeds/local.py index 77aed82..7c1aab1 100644 --- a/rumba/testbeds/local.py +++ b/rumba/testbeds/local.py @@ -40,9 +40,7 @@ class Testbed(mod.Testbed): self.executor = LocalExecutor(self) - def swap_in(self, experiment): - mod.Testbed.swap_in(self, experiment) - + def _swap_in(self, experiment): logger.info("Experiment swapped in") def swap_out(self, experiment): diff --git a/rumba/testbeds/qemu.py b/rumba/testbeds/qemu.py index df80b7e..5f76684 100644 --- a/rumba/testbeds/qemu.py +++ b/rumba/testbeds/qemu.py @@ -125,14 +125,12 @@ class Testbed(mod.Testbed): "ip link set dev %(ifname)s up" % {'ifname': ipcp.ifname}) - def swap_in(self, experiment): + def _swap_in(self, experiment): """ :type experiment mod.Experiment :param experiment: The experiment running """ - mod.Testbed.swap_in(self, experiment) - if os.geteuid() != 0: try: subprocess.check_call(["sudo", "-v"]) @@ -195,19 +193,6 @@ class Testbed(mod.Testbed): 'brctl addif %(br)s %(tap)s' % {'tap': tap_id, 'br': shim.name} ).split('\n') - if shim.link_speed > 0: - speed = '%dmbit' % shim.link_speed - - # Rate limit the traffic transmitted on the TAP interface - command_list += ( - 'tc qdisc add dev %(tap)s handle 1: root ' - 'htb default 11\n' - 'tc class add dev %(tap)s parent 1: classid ' - '1:1 htb rate 10gbit\n' - 'tc class add dev %(tap)s parent 1:1 classid ' - '1:11 htb rate %(speed)s' - % {'tap': tap_id, 'speed': speed} - ).split('\n') # While we're at it, build vm ports table and ipcp table vm['ports'].append({'tap_id': tap_id, -- cgit v1.2.3