aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNick Aerts <aerts.nick@gmail.com>2018-03-23 23:36:34 +0100
committerNick Aerts <nick.aerts@ugent.be>2018-03-27 18:16:45 +0200
commitdfd400c2fb1383bcd6e3862d6199fffad4e78524 (patch)
tree22628b20331328e4d57657494d0321d07f89badb
parenteef25fb48735a5db9613bf00b47f6cf4703b815d (diff)
downloadrumba-dfd400c2fb1383bcd6e3862d6199fffad4e78524.tar.gz
rumba-dfd400c2fb1383bcd6e3862d6199fffad4e78524.zip
linkquality: added link_quality to add delay, loss and rate limit to link
This adds the ability to assign delay and loss to links. 4 new object types are introduced: - LinkQuality - Delay - Loss - Rate All attributes are read-only, one attribute link_quality is added to the ShimEthDIF with a callback to the LinkQualityManager which will automatically apply a new link_quality profile when this attribute is written.
-rw-r--r--rumba/model.py212
-rw-r--r--rumba/testbeds/dockertb.py5
-rw-r--r--rumba/testbeds/emulab.py5
-rw-r--r--rumba/testbeds/jfed.py4
-rw-r--r--rumba/testbeds/local.py4
-rw-r--r--rumba/testbeds/qemu.py17
6 files changed, 211 insertions, 36 deletions
diff --git a/rumba/model.py b/rumba/model.py
index d46cff9..b6cb15e 100644
--- a/rumba/model.py
+++ b/rumba/model.py
@@ -8,6 +8,7 @@
# Dimitri Staessens <dimitri.staessens@ugent.be>
# Vincenzo Maffione <v.maffione@nextworks.it>
# Marco Capitani <m.capitani@nextworks.it>
+# Nick Aerts <nick.aerts@ugent.be>
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
@@ -29,6 +30,7 @@ import os
import stat
import time
import shutil
+from enum import Enum
import rumba.log as log
@@ -85,16 +87,24 @@ class Testbed(object):
else:
self.system_logs = system_logs
- @abc.abstractmethod
def swap_in(self, experiment):
for node in experiment.nodes:
node.executor = self.executor
+ self._swap_in(experiment)
+
+ for dif in experiment.dif_ordering:
+ if isinstance(dif, ShimEthDIF):
+ dif.link_quality.apply(dif)
+
+ @abc.abstractmethod
+ def _swap_in(self, experiment):
+ logger.info("_swap_in(): nothing to do")
+
@abc.abstractmethod
def swap_out(self, experiment):
logger.info("swap_out(): nothing to do")
-
# Base class for DIFs
#
# @name [string] DIF name
@@ -149,11 +159,9 @@ class ShimEthDIF(DIF):
def get_e_id(self):
return "ShimEthDIF." + self.name
- def __init__(self, name, members=None, link_speed=0):
+ def __init__(self, name, members=None, link_quality=None):
DIF.__init__(self, name, members)
- self.link_speed = int(link_speed)
- if self.link_speed < 0:
- raise ValueError("link_speed must be a non-negative number")
+ self._link_quality = link_quality
def get_ipcp_class(self):
return ShimEthIPCP
@@ -163,6 +171,24 @@ class ShimEthDIF(DIF):
if len(self.members) > 2:
raise Exception("More than 2 members in %s!" % self.name)
+ @property
+ def link_quality(self):
+ return self._link_quality
+
+ @link_quality.setter
+ def link_quality(self, _link_quality):
+ if not _link_quality:
+ raise ValueError("Cannot set link_quality to None, use del "
+ "link_quality to reset")
+
+ self._link_quality = _link_quality
+
+ _link_quality.apply(self)
+
+ @link_quality.deleter
+ def link_quality(self):
+ self._link_quality.deactivate(self)
+
# Normal DIF
#
@@ -193,6 +219,180 @@ class NormalDIF(DIF):
return s
+class Distribution(Enum):
+ NORMAL = 1
+ PARETO = 2
+ PARETONORMAL = 3
+
+
+class Delay(object):
+ def __init__(self, delay=0, jitter=None, correlation=None,
+ distribution=None):
+ """
+ Configure link delay
+ :param delay: average delay in ms
+ :param jitter: jitter in ms
+ :param correlation: correlation in %
+ :param distribution: delay distribution, defaults to a Normal
+ distribution
+ """
+
+ if delay < 0:
+ raise ValueError("Delay needs to be at least 0")
+
+ if jitter and not jitter > 0:
+ raise ValueError("Jitter needs to be higher than 0")
+
+ if (not jitter) and correlation:
+ raise ValueError("Correlation requires a value for jitter")
+
+ if correlation and (correlation < 0 or correlation > 100):
+ raise ValueError("Correlation needs to be between 0 and 100")
+
+ self._delay = delay
+ self._jitter = jitter
+ self._correlation = correlation
+ self._distribution = distribution
+
+ @property
+ def delay(self):
+ return self._delay
+
+ @property
+ def jitter(self):
+ return self._jitter
+
+ @property
+ def correlation(self):
+ return self._correlation
+
+ @property
+ def distribution(self):
+ return self._distribution
+
+ def build_command(self):
+ opts = ["delay %ims" % self.delay]
+
+ if self.jitter:
+ opts.append("%ims" % self.jitter)
+
+ if self.correlation:
+ opts.append("%f%%" % self.correlation)
+
+ if self.distribution:
+ opts.append("distribution %s" % self.distribution.name.lower())
+
+ return " ".join(opts)
+
+
+class Loss(object):
+ def __init__(self, loss, correlation=None):
+ """
+ Configure link loss
+ :param loss: loss in percentage
+ :param correlation: correlation in percentage
+ """
+ if loss and (loss < 0 or loss > 100):
+ raise ValueError("Loss needs to be between 0 and 100")
+
+ if correlation and (correlation < 0 or correlation > 100):
+ raise ValueError("Correlation needs to be between 0 and 100")
+
+ self._loss = loss
+ self._correlation = correlation
+
+ @property
+ def loss(self):
+ return self._loss
+
+ @property
+ def correlation(self):
+ return self._correlation
+
+ def build_command(self):
+ opts = ["loss %f%%" % self.loss]
+
+ if self.correlation:
+ opts.append("%f%%" % self.correlation)
+
+ return " ".join(opts)
+
+
+class LinkQuality(object):
+ _active = set()
+
+ def __init__(self, delay=None, loss=None, rate=None):
+ """
+ Link quality configuration
+ :param delay: Delay object holding delay configuration
+ :param loss: Loss object holding delay configuration
+ :param rate: The rate of the link in mbit
+ """
+
+ if rate and not rate > 0:
+ raise ValueError("Rate needs to be higher than 0")
+
+ self._delay = delay
+ self._loss = loss
+ self._rate = rate
+
+ @property
+ def delay(self):
+ return self._delay
+
+ @property
+ def loss(self):
+ return self._loss
+
+ @property
+ def rate(self):
+ return self._rate
+
+ def build_command(self, ipcp):
+ cmd = []
+
+ if ipcp in LinkQuality._active:
+ cmd.append("tc qdisc change dev %s root netem" % ipcp.ifname)
+ else:
+ cmd.append("tc qdisc add dev %s root netem" % ipcp.ifname)
+
+ if self.delay:
+ cmd.append(self.delay.build_command())
+
+ if self.loss:
+ cmd.append(self.loss.build_command())
+
+ if self.rate:
+ cmd.append("rate %imbit" % self.rate)
+
+ return " ".join(cmd)
+
+ def apply(self, shim):
+ if not (self.delay or self.loss or self.rate):
+ self.deactivate(shim)
+ else:
+ for ipcp in shim.ipcps:
+ if not ipcp.ifname:
+ logger.error("Could not apply LinkQuality to IPCP because "
+ "the interface name is None")
+ continue
+
+ ipcp.node.execute_command(self.build_command(ipcp),
+ as_root=True)
+ LinkQuality._active.add(ipcp)
+
+ def deactivate(self, shim):
+ for ipcp in shim.ipcps:
+ if not ipcp.ifname:
+ logger.error("Could not remove LinkQuality from IPCP because "
+ "the interface name is None")
+ continue
+
+ ipcp.node.execute_command("tc qdisc del dev %s root "
+ "netem" % ipcp.ifname, as_root=True)
+ LinkQuality._active.remove(ipcp)
+
+
# SSH Configuration
#
class SSHConfig(object):
diff --git a/rumba/testbeds/dockertb.py b/rumba/testbeds/dockertb.py
index 3895a7e..d018f8c 100644
--- a/rumba/testbeds/dockertb.py
+++ b/rumba/testbeds/dockertb.py
@@ -33,7 +33,6 @@ import rumba.model as mod
import rumba.log as log
from rumba.executors.docker import DockerExecutor
-
logger = log.get_logger(__name__)
class Testbed(mod.Testbed):
@@ -56,11 +55,9 @@ class Testbed(mod.Testbed):
self.docker_client = docker.from_env()
self.executor = DockerExecutor(self)
- def swap_in(self, experiment):
+ def _swap_in(self, experiment):
docker_client = self.docker_client
- mod.Testbed.swap_in(self, experiment)
-
# Pull image
if self.pull_image:
docker_client.images.pull(self.base_image_repo,
diff --git a/rumba/testbeds/emulab.py b/rumba/testbeds/emulab.py
index 06e0e5c..0d18892 100644
--- a/rumba/testbeds/emulab.py
+++ b/rumba/testbeds/emulab.py
@@ -271,16 +271,13 @@ class Testbed(mod.Testbed):
if self.ip[ipcp] == item[1]:
ipcp.ifname = item[0]
- def swap_in(self, experiment):
- mod.Testbed.swap_in(self, experiment)
-
+ def _swap_in(self, experiment):
self._create_experiment(experiment)
wait = self.swap_exp_in()
if wait:
self.wait_until_nodes_up()
self.complete_experiment_graph(experiment)
-
def swap_out(self, experiment):
"""
Swaps experiment out
diff --git a/rumba/testbeds/jfed.py b/rumba/testbeds/jfed.py
index db9dd14..a1ceded 100644
--- a/rumba/testbeds/jfed.py
+++ b/rumba/testbeds/jfed.py
@@ -206,9 +206,7 @@ class Testbed(mod.Testbed):
logger.error("jFed returned with error " + str(e.returncode))
raise
- def swap_in(self, experiment):
- mod.Testbed.swap_in(self, experiment)
-
+ def _swap_in(self, experiment):
for node in experiment.nodes:
node.ssh_config.set_http_proxy(self.http_proxy)
self.create_rspec(experiment)
diff --git a/rumba/testbeds/local.py b/rumba/testbeds/local.py
index 77aed82..7c1aab1 100644
--- a/rumba/testbeds/local.py
+++ b/rumba/testbeds/local.py
@@ -40,9 +40,7 @@ class Testbed(mod.Testbed):
self.executor = LocalExecutor(self)
- def swap_in(self, experiment):
- mod.Testbed.swap_in(self, experiment)
-
+ def _swap_in(self, experiment):
logger.info("Experiment swapped in")
def swap_out(self, experiment):
diff --git a/rumba/testbeds/qemu.py b/rumba/testbeds/qemu.py
index df80b7e..5f76684 100644
--- a/rumba/testbeds/qemu.py
+++ b/rumba/testbeds/qemu.py
@@ -125,14 +125,12 @@ class Testbed(mod.Testbed):
"ip link set dev %(ifname)s up"
% {'ifname': ipcp.ifname})
- def swap_in(self, experiment):
+ def _swap_in(self, experiment):
"""
:type experiment mod.Experiment
:param experiment: The experiment running
"""
- mod.Testbed.swap_in(self, experiment)
-
if os.geteuid() != 0:
try:
subprocess.check_call(["sudo", "-v"])
@@ -195,19 +193,6 @@ class Testbed(mod.Testbed):
'brctl addif %(br)s %(tap)s'
% {'tap': tap_id, 'br': shim.name}
).split('\n')
- if shim.link_speed > 0:
- speed = '%dmbit' % shim.link_speed
-
- # Rate limit the traffic transmitted on the TAP interface
- command_list += (
- 'tc qdisc add dev %(tap)s handle 1: root '
- 'htb default 11\n'
- 'tc class add dev %(tap)s parent 1: classid '
- '1:1 htb rate 10gbit\n'
- 'tc class add dev %(tap)s parent 1:1 classid '
- '1:11 htb rate %(speed)s'
- % {'tap': tap_id, 'speed': speed}
- ).split('\n')
# While we're at it, build vm ports table and ipcp table
vm['ports'].append({'tap_id': tap_id,