aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMarco Capitani <m.capitani@nextworks.it>2018-03-29 15:56:37 +0200
committerMarco Capitani <m.capitani@nextworks.it>2018-03-29 16:11:35 +0200
commitc1ad42adc854604e0023a01633976cb3d26b9244 (patch)
tree9767cb882eccb72d61a077c5b75e894ce1198ac4
parentea00b982b55b868f4d32a5f237d47166233d8d58 (diff)
downloadrumba-c1ad42adc854604e0023a01633976cb3d26b9244.tar.gz
rumba-c1ad42adc854604e0023a01633976cb3d26b9244.zip
model: split model into two files
Changed `rumba.model` into a namespace, moved business logic inside the rumba.elements package
-rw-r--r--rumba/elements/__init__.py0
-rw-r--r--rumba/elements/experimentation.py539
-rw-r--r--rumba/elements/topology.py869
-rw-r--r--rumba/model.py1394
-rwxr-xr-xsetup.py8
5 files changed, 1468 insertions, 1342 deletions
diff --git a/rumba/elements/__init__.py b/rumba/elements/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/rumba/elements/__init__.py
diff --git a/rumba/elements/experimentation.py b/rumba/elements/experimentation.py
new file mode 100644
index 0000000..2626a0a
--- /dev/null
+++ b/rumba/elements/experimentation.py
@@ -0,0 +1,539 @@
+#
+# A library to manage ARCFIRE experiments
+#
+# Copyright (C) 2017 Nextworks S.r.l.
+# Copyright (C) 2017 imec
+#
+# Sander Vrijders <sander.vrijders@ugent.be>
+# Dimitri Staessens <dimitri.staessens@ugent.be>
+# Vincenzo Maffione <v.maffione@nextworks.it>
+# Marco Capitani <m.capitani@nextworks.it>
+# Nick Aerts <nick.aerts@ugent.be>
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., http://www.fsf.org/about/contact/.
+#
+
+
+import abc
+import os
+import shutil
+import time
+
+import rumba.log as log
+import rumba.elements.topology as topology
+
+logger = log.get_logger(__name__)
+
+
+tmp_dir = '/tmp/rumba'
+
+
+class Testbed(object):
+ """
+ Base class for every testbed plugin.
+ """
+ def __init__(self,
+ exp_name,
+ username,
+ password,
+ proj_name,
+ http_proxy=None,
+ system_logs=None):
+ """
+ :param exp_name: The experiment name.
+ :param username: The username.
+ :param password: The password.
+ :param proj_name: The project name.
+ :param http_proxy: HTTP proxy used by the testbed.
+ :param system_logs: Location of the system logs of
+ images of the testbed.
+ """
+ self.username = username
+ self.password = password
+ self.proj_name = proj_name
+ self.exp_name = exp_name
+ self.http_proxy = http_proxy
+ self.flags = {'no_vlan_offload': False}
+ self.executor = None
+ if system_logs is None:
+ self.system_logs = ['/var/log/syslog']
+ elif isinstance(system_logs, str):
+ self.system_logs = [system_logs]
+ else:
+ self.system_logs = system_logs
+
+ def swap_in(self, experiment):
+ """
+ Swaps experiment in on the testbed.
+
+ :param experiment: The experiment.
+ """
+ for node in experiment.nodes:
+ node.executor = self.executor
+
+ self._swap_in(experiment)
+
+ for dif in experiment.dif_ordering:
+ if isinstance(dif, topology.ShimEthDIF):
+ dif.link_quality.apply(dif)
+
+ @abc.abstractmethod
+ def _swap_in(self, experiment):
+ logger.info("_swap_in(): nothing to do")
+
+ def swap_out(self, experiment):
+ """
+ Swaps experiment out of the testbed.
+
+ :param experiment: The experiment.
+ """
+ self._swap_out(experiment)
+
+ @abc.abstractmethod
+ def _swap_out(self, experiment):
+ logger.info("swap_out(): nothing to do")
+
+
+class Experiment(object):
+ """
+ Base class for experiments.
+ """
+ __metaclass__ = abc.ABCMeta
+
+ def __init__(self, testbed,
+ nodes=None,
+ git_repo=None,
+ git_branch=None,
+ log_dir=None,
+ prototype_logs=None):
+ """
+ :param testbed: The testbed of the experiment.
+ :param nodes: The list of nodes in the experiment.
+ :param git_repo: The git repository of the prototype.
+ :param git_branch: The git branch of the repository.
+ :param log_dir: Where to log output of the experiment.
+ :param prototype_logs: Where the prototype logs its output.
+ """
+ if nodes is None:
+ nodes = list()
+ self.nodes = nodes
+ self.git_repo = git_repo
+ self.git_branch = git_branch
+ self.testbed = testbed
+ # the strategy employed for completing the enrollment phase in
+ # the different DIFs
+ self.enrollment_strategy = 'minimal' # 'full-mesh', 'manual'
+ # the strategy employed for setting up the data transfer
+ # networks in the DIFs after enrollment
+ self.dt_strategy = 'full-mesh' # 'minimal', 'manual'
+ self.dif_ordering = []
+ self.enrollments = [] # a list of per-DIF lists of enrollments
+ self.dt_flows = [] # a list of per-DIF lists of data transfer flows
+ self.mgmt_flows = [] # a list of per-DIF lists of management flows
+
+ # Determine log directory
+ if log_dir is None:
+ # If it is None, use /tmp/rumba/{project}
+ # Wipe it and make it again
+ exp_name = self.testbed.exp_name.replace('/', '_') # Just in case
+ log_dir = os.path.join(tmp_dir, exp_name)
+ shutil.rmtree(log_dir, ignore_errors=True)
+ os.mkdir(log_dir)
+ self.log_dir = log_dir
+ if not os.path.isdir(self.log_dir):
+ raise Exception('Destination "%s" is not a directory. '
+ 'Cannot fetch logs.'
+ % self.log_dir)
+ self.prototype_logs = prototype_logs \
+ if prototype_logs is not None else []
+
+ # Generate missing information
+ self.generate()
+
+ def __repr__(self):
+ s = ""
+ for n in self.nodes:
+ s += "\n" + str(n)
+
+ return s
+
+ def add_node(self, node):
+ """
+ Adds a node to the experiment.
+
+ :param node: A node.
+ """
+ self.nodes.append(node)
+ self.generate()
+
+ def del_node(self, node):
+ """
+ Deletes a node from the experiment.
+
+ :param node: A node.
+ """
+ self.nodes.remove(node)
+ self.generate()
+
+ # Compute registration/enrollment order for DIFs
+ def compute_dif_ordering(self):
+ # Compute DIFs dependency graph, as both adjacency and incidence list.
+ difsdeps_adj = dict()
+ difsdeps_inc = dict()
+
+ for node in self.nodes:
+ for dif in node.difs:
+ if dif not in difsdeps_adj:
+ difsdeps_adj[dif] = set()
+
+ for upper in node.dif_registrations:
+ for lower in node.dif_registrations[upper]:
+ if upper not in difsdeps_inc:
+ difsdeps_inc[upper] = set()
+ if lower not in difsdeps_inc:
+ difsdeps_inc[lower] = set()
+ if upper not in difsdeps_adj:
+ difsdeps_adj[upper] = set()
+ if lower not in difsdeps_adj:
+ difsdeps_adj[lower] = set()
+ difsdeps_inc[upper].add(lower)
+ difsdeps_adj[lower].add(upper)
+
+ # Kahn's algorithm below only needs per-node count of
+ # incident edges, so we compute these counts from the
+ # incidence list and drop the latter.
+ difsdeps_inc_cnt = dict()
+ for dif in difsdeps_inc:
+ difsdeps_inc_cnt[dif] = len(difsdeps_inc[dif])
+ del difsdeps_inc
+
+ # Init difsdeps_inc_cnt for those DIFs that do not
+ # act as lower IPCPs nor upper IPCPs for registration
+ # operations
+ for node in self.nodes:
+ for dif in node.difs:
+ if dif not in difsdeps_inc_cnt:
+ difsdeps_inc_cnt[dif] = 0
+
+ # Run Kahn's algorithm to compute topological
+ # ordering on the DIFs graph.
+ frontier = set()
+ self.dif_ordering = []
+ for dif in difsdeps_inc_cnt:
+ if difsdeps_inc_cnt[dif] == 0:
+ frontier.add(dif)
+
+ while len(frontier):
+ cur = frontier.pop()
+ self.dif_ordering.append(cur)
+ for nxt in difsdeps_adj[cur]:
+ difsdeps_inc_cnt[nxt] -= 1
+ if difsdeps_inc_cnt[nxt] == 0:
+ frontier.add(nxt)
+ difsdeps_adj[cur] = set()
+
+ circular_set = [dif for dif in difsdeps_inc_cnt
+ if difsdeps_inc_cnt[dif] != 0]
+ if len(circular_set):
+ raise Exception("Fatal error: The specified DIFs topology"
+ "has one or more"
+ "circular dependencies, involving the following"
+ " DIFs: %s" % circular_set)
+
+ logger.debug("DIF topological ordering: %s", self.dif_ordering)
+
+ # Compute all the enrollments, to be called after compute_dif_ordering()
+ def compute_enrollments(self):
+ dif_graphs = dict()
+ self.enrollments = []
+ self.mgmt_flows = []
+ self.dt_flows = []
+
+ for dif in self.dif_ordering:
+ neighsets = dict()
+ dif_graphs[dif] = dict()
+ first = None
+
+ # For each N-1-DIF supporting this DIF, compute the set of nodes
+ # that share such N-1-DIF. This set will be called the 'neighset' of
+ # the N-1-DIF for the current DIF.
+
+ for node in self.nodes:
+ if dif in node.dif_registrations:
+ dif_graphs[dif][node] = [] # init for later use
+ if first is None: # pick any node for later use
+ first = node
+ for lower_dif in node.dif_registrations[dif]:
+ if lower_dif not in neighsets:
+ neighsets[lower_dif] = []
+ neighsets[lower_dif].append(node)
+
+ # Build the graph, represented as adjacency list
+ for lower_dif in neighsets:
+ # Each neighset corresponds to a complete (sub)graph.
+ for node1 in neighsets[lower_dif]:
+ for node2 in neighsets[lower_dif]:
+ if node1 != node2:
+ dif_graphs[dif][node1].append((node2, lower_dif))
+
+ self.enrollments.append([])
+ self.dt_flows.append([])
+ self.mgmt_flows.append([])
+
+ if first is None:
+ # This is a shim DIF, nothing to do
+ continue
+
+ er = []
+ for node in dif_graphs[dif]:
+ for edge in dif_graphs[dif][node]:
+ er.append("%s --[%s]--> %s" % (node.name,
+ edge[1].name,
+ edge[0].name))
+ logger.debug("DIF graph for %s: %s", dif, ', '.join(er))
+
+ # To generate the list of mgmt flows, minimal enrollments
+ # and minimal dt flows, we simulate it, using
+ # breadth-first traversal.
+ enrolled = {first}
+ frontier = {first}
+ while len(frontier):
+ cur = frontier.pop()
+ for edge in dif_graphs[dif][cur]:
+ if edge[0] not in enrolled:
+ enrolled.add(edge[0])
+ enrollee = edge[0].get_ipcp_by_dif(dif)
+ assert(enrollee is not None)
+ enroller = cur.get_ipcp_by_dif(dif)
+ assert(enroller is not None)
+ if self.enrollment_strategy == 'minimal':
+ self.enrollments[-1].append({'dif': dif,
+ 'enrollee': enrollee,
+ 'enroller': enroller,
+ 'lower_dif': edge[1]})
+ self.mgmt_flows[-1].append({'src': enrollee,
+ 'dst': enroller})
+ if self.dt_strategy == 'minimal':
+ self.dt_flows[-1].append({'src': enrollee,
+ 'dst': enroller})
+ frontier.add(edge[0])
+ if len(dif.members) != len(enrolled):
+ raise Exception("Disconnected DIF found: %s" % (dif,))
+
+ # In case of a full mesh enrollment or dt flows
+ for cur in dif_graphs[dif]:
+ for edge in dif_graphs[dif][cur]:
+ if cur.name < edge[0].name:
+ enrollee = cur.get_ipcp_by_dif(dif)
+ assert(enrollee is not None)
+ enroller = edge[0].get_ipcp_by_dif(dif)
+ assert(enroller is not None)
+ if self.enrollment_strategy == 'full-mesh':
+ self.enrollments[-1].append({'dif': dif,
+ 'enrollee': enrollee,
+ 'enroller': enroller,
+ 'lower_dif': edge[1]})
+ if self.dt_strategy == 'full-mesh':
+ self.dt_flows[-1].append({'src': enrollee,
+ 'dst': enroller})
+
+ if not (self.dt_strategy == 'minimal'
+ or self.dt_strategy == 'full-mesh') \
+ or not (self.enrollment_strategy == 'full-mesh'
+ or self.enrollment_strategy == 'minimal'):
+ # This is a bug
+ assert False
+
+ log_string = "Enrollments:\n"
+ for el in self.enrollments:
+ for e in el:
+ log_string += (" [%s] %s --> %s through N-1-DIF %s\n"
+ % (e['dif'],
+ e['enrollee'].name,
+ e['enroller'].name,
+ e['lower_dif']))
+ logger.debug(log_string)
+
+ log_string = "Mgmt flows:\n"
+ for el in self.mgmt_flows:
+ for e in el:
+ log_string += (" %s --> %s \n"
+ % (e['src'].name,
+ e['dst'].name))
+ logger.debug(log_string)
+
+ log_string = "Dt flows:\n"
+ for el in self.dt_flows:
+ for e in el:
+ log_string += (" %s --> %s \n"
+ % (e['src'].name,
+ e['dst'].name))
+ logger.debug(log_string)
+
+ def compute_ipcps(self):
+ # For each node, compute the required IPCP instances, and associated
+ # registrations
+ for node in self.nodes:
+ node.ipcps = []
+ # We want also the node.ipcps list to be generated in
+ # topological ordering
+ for dif in self.dif_ordering:
+ if dif not in node.difs:
+ continue
+
+ # Create an instance of the required IPCP class
+ ipcp = dif.get_ipcp_class()(
+ name='%s.%s' % (dif.name, node.name),
+ node=node, dif=dif)
+
+ if dif in node.dif_registrations:
+ for lower in node.dif_registrations[dif]:
+ ipcp.registrations.append(lower)
+
+ node.ipcps.append(ipcp)
+ dif.ipcps.append(ipcp)
+
+ def compute_bootstrappers(self):
+ for node in self.nodes:
+ for ipcp in node.ipcps:
+ ipcp.dif_bootstrapper = True
+ for el in self.enrollments:
+ for e in el:
+ if e['dif'] != ipcp.dif:
+ # Skip this DIF
+ break
+ if e['enrollee'] == ipcp:
+ ipcp.dif_bootstrapper = False
+ # Exit the loops
+ break
+ if not ipcp.dif_bootstrapper:
+ break
+
+ def dump_ssh_info(self):
+ f = open(os.path.join(tmp_dir, 'ssh_info'), 'w')
+ for node in self.nodes:
+ f.write("%s;%s;%s;%s;%s\n" % (node.name,
+ self.testbed.username,
+ node.ssh_config.hostname,
+ node.ssh_config.port,
+ node.ssh_config.proxy_server))
+ f.close()
+
+ # Examine the nodes and DIFs, compute the registration and enrollment
+ # order, the list of IPCPs to create, registrations, ...
+ def generate(self):
+ start = time.time()
+ self.compute_dif_ordering()
+ self.compute_ipcps()
+ self.compute_enrollments()
+ self.compute_bootstrappers()
+ for node in self.nodes:
+ logger.info("IPCPs for node %s: %s", node.name, node.ipcps)
+ end = time.time()
+ logger.info("Layer ordering computation took %.2f seconds", end - start)
+
+ def install_prototype(self):
+ """
+ Installs the prototype on the nodes.
+ """
+ start = time.time()
+ self._install_prototype()
+ end = time.time()
+ logger.info("Install took %.2f seconds", end - start)
+
+ def set_startup_command(self, command):
+ for node in self.nodes:
+ node.startup_command = command
+
+ def bootstrap_prototype(self):
+ """
+ Bootstraps the prototype on the nodes.
+ """
+ start = time.time()
+ self._bootstrap_prototype()
+ end = time.time()
+ logger.info("Bootstrap took %.2f seconds", end - start)
+
+ @abc.abstractmethod
+ def _install_prototype(self):
+ raise Exception('install_prototype() method not implemented')
+
+ @abc.abstractmethod
+ def _bootstrap_prototype(self):
+ raise Exception('bootstrap_prototype() method not implemented')
+
+ @abc.abstractmethod
+ def prototype_name(self):
+ raise Exception('prototype_name() method not implemented')
+
+ @abc.abstractmethod
+ def _terminate_prototype(self):
+ raise Exception('terminate_prototype() method not implemented')
+
+ def swap_in(self):
+ """
+ Swap the experiment in on the testbed.
+ """
+ start = time.time()
+ self.testbed.swap_in(self)
+ self.dump_ssh_info()
+ end = time.time()
+ logger.info("Swap-in took %.2f seconds", end - start)
+
+ def swap_out(self):
+ """
+ Swap the experiment out of the testbed.
+ """
+ start = time.time()
+ # Terminate prototype gracefully
+ self._terminate_prototype()
+ for node in self.nodes:
+ if node.ssh_config.client is not None:
+ node.ssh_config.client.close()
+ if node.ssh_config.proxy_client is not None:
+ node.ssh_config.proxy_client.close()
+ # Undo the testbed (testbed-specific)
+ self.testbed.swap_out(self)
+ end = time.time()
+ logger.info("Swap-out took %.2f seconds", end - start)
+
+
+class Executor:
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def execute_command(self, node, command, as_root=False, time_out=3):
+ # Execute command on a node
+ return
+
+ def execute_commands(self, node, commands, as_root=False, time_out=3):
+ for command in commands:
+ self.execute_command(node, command, as_root, time_out)
+
+ @abc.abstractmethod
+ def copy_file(self, node, path, destination):
+ return
+
+ def copy_files(self, node, paths, destination):
+ for path in paths:
+ self.copy_file(node, path, destination)
+
+ @abc.abstractmethod
+ def fetch_file(self, node, path, destination, sudo=False):
+ return
+
+ def fetch_files(self, node, paths, destination, sudo=False):
+ for path in paths:
+ self.fetch_file(node, path, destination, sudo)
diff --git a/rumba/elements/topology.py b/rumba/elements/topology.py
new file mode 100644
index 0000000..24dcfa2
--- /dev/null
+++ b/rumba/elements/topology.py
@@ -0,0 +1,869 @@
+#
+# A library to manage ARCFIRE experiments
+#
+# Copyright (C) 2017 Nextworks S.r.l.
+# Copyright (C) 2017 imec
+#
+# Sander Vrijders <sander.vrijders@ugent.be>
+# Dimitri Staessens <dimitri.staessens@ugent.be>
+# Vincenzo Maffione <v.maffione@nextworks.it>
+# Marco Capitani <m.capitani@nextworks.it>
+# Nick Aerts <nick.aerts@ugent.be>
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., http://www.fsf.org/about/contact/.
+#
+
+
+from enum import Enum
+
+import rumba.log as log
+
+logger = log.get_logger(__name__)
+
+
+class DIF(object):
+ """
+ Base class for DIFs.
+ """
+ def __init__(self, name, members=None):
+ """
+ :param name: Name of the DIF.
+ :param members: List of nodes that are members of the DIF.
+ """
+ self.name = name
+ if members is None:
+ members = list()
+ self.members = members
+ self.ipcps = list()
+
+ def __repr__(self):
+ s = "DIF %s" % self.name
+ return s
+
+ def __hash__(self):
+ return hash(self.name)
+
+ def __eq__(self, other):
+ return other is not None and self.name == other.name
+
+ def __neq__(self, other):
+ return not self == other
+
+ def add_member(self, node):
+ self.members.append(node)
+
+ def del_member(self, node):
+ self.members.remove(node)
+
+ def get_ipcp_class(self):
+ return IPCP
+
+
+# Shim over UDP
+#
+class ShimUDPDIF(DIF):
+ """
+ Shim over UDP.
+ """
+ def __init__(self, name, members=None):
+ """
+ :param name: Name of the DIF.
+ :param members: List of members of the DIF.
+ """
+ DIF.__init__(self, name, members)
+
+ def get_ipcp_class(self):
+ return ShimUDPIPCP
+
+
+# Shim over Ethernet
+#
+# @link_speed [int] Speed of the Ethernet network, in Mbps
+#
+class ShimEthDIF(DIF):
+ """
+ Shim over Ethernet.
+ """
+ def get_e_id(self):
+ return "ShimEthDIF." + self.name
+
+ def __init__(self, name, members=None, link_quality=None):
+ """
+ :param name: Name of the DIF.
+ :param members: List of members of the DIF.
+ :param link_quality: Quality of the link.
+ """
+ DIF.__init__(self, name, members)
+ self._link_quality = link_quality if link_quality is not None else LinkQuality()
+
+ def get_ipcp_class(self):
+ return ShimEthIPCP
+
+ def add_member(self, node):
+ super(ShimEthDIF, self).add_member(node)
+ if len(self.members) > 2:
+ raise Exception("More than 2 members in %s!" % self.name)
+
+ @property
+ def link_quality(self):
+ return self._link_quality
+
+ @link_quality.setter
+ def link_quality(self, _link_quality):
+ if not _link_quality:
+ raise ValueError("Cannot set link_quality to None, use del "
+ "link_quality to reset")
+
+ self._link_quality = _link_quality
+
+ _link_quality.apply(self)
+
+ @link_quality.deleter
+ def link_quality(self):
+ self._link_quality.deactivate(self)
+
+ def set_delay(self, delay=0,
+ jitter=None,
+ correlation=None,
+ distribution=None):
+ """
+ Set the delay parameters of the underlying link.
+ Parameters as in :py:class:`.Delay`
+
+ :param delay: average delay in ms
+ :type delay: :py:class:`int`
+ :param jitter: jitter in ms
+ :type jitter: :py:class:`int`
+ :param correlation: correlation in %
+ :type correlation: :py:class:`int`
+ :param distribution: delay distribution, defaults to a Normal
+ distribution
+ :type distribution: :py:class:`.Distribution`
+ """
+ new_delay = Delay(delay, jitter, correlation, distribution)
+ new_quality = LinkQuality.clone(self.link_quality, delay=new_delay)
+ self.link_quality = new_quality
+
+ def set_loss(self,
+ loss=0,
+ correlation=None):
+ """
+ Set the loss parameter of the underlying link.
+ Parameters as in :py:class:`.Loss`
+
+ :param loss: loss in percentage
+ :type loss: :py:class:`int` or :py:class:`float`
+ :param correlation: correlation in percentage
+ :type correlation: :py:class:`int` or :py:class:`float`
+ """
+ new_loss = Loss(loss, correlation)
+ new_quality = LinkQuality.clone(self.link_quality, loss=new_loss)
+ self.link_quality = new_quality
+
+ def set_rate(self, rate=None):
+ """
+ Set the rate parameter of the underlying link.
+
+ :param rate: The desired rate in mbps
+ :type rate: :py:class:`int`
+ """
+ new_quality = LinkQuality.clone(self.link_quality, rate=rate)
+ self.link_quality = new_quality
+
+ def set_quality(self, delay, loss, rate):
+ """
+ Configure the basic quality parameters of the
+ underlying link.
+
+ :param delay: the link delay, in ms
+ :type delay: :py:class:`int`
+ :param loss: the link loss, as a percentage
+ :type loss: :py:class:`float` or :py:class:`int`
+ :param rate: the link rate in mbps
+ :type rate: :py:class:`int`
+ """
+ new_quality = LinkQuality(delay, loss, rate)
+ self.link_quality = new_quality
+
+
+class NormalDIF(DIF):
+ """
+ Normal DIF.
+ """
+ def __init__(self, name, members=None, policy=None):
+ """
+ :param name: The name of the DIF.
+ :param members: The list of members.
+ :param policy: Policies of the normal DIF.
+ """
+ DIF.__init__(self, name, members)
+ if policy is None:
+ policy = Policy(self)
+ self.policy = policy
+
+ def add_policy(self, comp, pol, **params):
+ """
+ Adds a policy to the DIF.
+
+ :param comp: Component name.
+ :param pol: Policy name
+ :param params: Parameters of the policy.
+ """
+ self.policy.add_policy(comp, pol, **params)
+
+ def del_policy(self, comp=None, pol=None):
+ """
+ Deletes a policy from the DIF.
+
+ :param comp: Component name.
+ :param pol: Policy name
+ """
+ self.policy.del_policy(comp, pol)
+
+ def show(self):
+ """
+ :return: A string representing the policies in the DIF.
+ """
+ s = DIF.__repr__(self)
+ for comp, pol_dict in self.policy.get_policies().items():
+ for pol, params in pol_dict.items():
+ s += "\n Component %s has policy %s with params %s" \
+ % (comp, pol, params)
+ return s
+
+
+class Distribution(Enum):
+ """
+ An enum holding different statistical distributions.
+
+ **Values:**
+
+ `NORMAL = 1`
+
+ `PARETO = 2`
+
+ `PARETONORMAL = 3`
+ """
+ NORMAL = 1
+ PARETO = 2
+ PARETONORMAL = 3
+
+
+class Delay(object):
+ """
+ A class representing delay of a link.
+ """
+ def __init__(self, delay=0, jitter=None, correlation=None,
+ distribution=None):
+ """
+ Configure link delay.
+
+ :param delay: average delay in ms
+ :type delay: :py:class:`int`
+ :param jitter: jitter in ms
+ :type jitter: :py:class:`int`
+ :param correlation: correlation in %
+ :type correlation: :py:class:`int`
+ :param distribution: delay distribution, defaults to a Normal
+ distribution
+ :type distribution: :py:class:`.Distribution`
+ """
+
+ if delay < 0:
+ raise ValueError("Delay needs to be at least 0")
+
+ if jitter and not jitter > 0:
+ raise ValueError("Jitter needs to be higher than 0")
+
+ if (not jitter) and correlation:
+ raise ValueError("Correlation requires a value for jitter")
+
+ if correlation and (correlation < 0 or correlation > 100):
+ raise ValueError("Correlation needs to be between 0 and 100")
+
+ self._delay = delay
+ self._jitter = jitter
+ self._correlation = correlation
+ self._distribution = distribution
+
+ @property
+ def delay(self):
+ return self._delay
+
+ @property
+ def jitter(self):
+ return self._jitter
+
+ @property
+ def correlation(self):
+ return self._correlation
+
+ @property
+ def distribution(self):
+ return self._distribution
+
+ def build_command(self):
+ opts = ["delay %ims" % self.delay]
+
+ if self.jitter:
+ opts.append("%ims" % self.jitter)
+
+ if self.correlation:
+ opts.append("%f%%" % self.correlation)
+
+ if self.distribution:
+ opts.append("distribution %s" % self.distribution.name.lower())
+
+ return " ".join(opts)
+
+
+class Loss(object):
+ """
+ A class representing loss on a link.
+ """
+ def __init__(self, loss, correlation=None):
+ """
+ Configure link loss.
+
+ :param loss: loss in percentage
+ :type loss: :py:class:`int` or :py:class:`float`
+ :param correlation: correlation in percentage
+ :type correlation: :py:class:`int` or :py:class:`float`
+ """
+ if loss and (loss < 0 or loss > 100):
+ raise ValueError("Loss needs to be between 0 and 100")
+
+ if correlation and (correlation < 0 or correlation > 100):
+ raise ValueError("Correlation needs to be between 0 and 100")
+
+ self._loss = loss
+ self._correlation = correlation
+
+ @property
+ def loss(self):
+ return self._loss
+
+ @property
+ def correlation(self):
+ return self._correlation
+
+ def build_command(self):
+ opts = ["loss %f%%" % self.loss]
+
+ if self.correlation:
+ opts.append("%f%%" % self.correlation)
+
+ return " ".join(opts)
+
+
+class LinkQuality(object):
+ """
+ A class representing the link quality.
+ """
+ _active = set()
+
+ @classmethod
+ def clone(cls, old_quality, delay=None, loss=None, rate=None):
+ """
+ Clone old_quality, updating it with the provided parameters
+ if present.
+
+ :param old_quality: A :py:class:`.LinkQuality` instance to
+ use as a base
+ :type old_quality: :py:class:`.LinkQuality`
+ :param delay: Delay object holding delay configuration
+ or number corresponding to delay in ms
+ :type delay: :py:class:`.Delay` or :py:class:`int`
+ :param loss: Loss object holding delay configuration or
+ number corresponding to loss percentage
+ :type loss: :py:class:`.Loss` or :py:class:`float`
+ :param rate: The rate of the link in mbit
+ :type rate: :py:class:`int`
+ :return: a new :py:class:`.LinkQuality` instance.
+ :rtype: :py:class:`.LinkQuality`
+ """
+ if delay is None:
+ delay = old_quality.delay
+ if loss is None:
+ loss = old_quality.loss
+ if rate is None:
+ rate = old_quality.rate
+ return LinkQuality(delay, loss, rate)
+
+ def __init__(self, delay=None, loss=None, rate=None):
+ """
+ Link quality configuration.
+
+ :param delay: Delay object holding delay configuration
+ or number corresponding to delay in ms
+ :type delay: :py:class:`.Delay` or :py:class:`int`
+ :param loss: Loss object holding delay configuration or
+ number corresponding to loss percentage
+ :type loss: :py:class:`.Loss` or :py:class:`float`
+ :param rate: The rate of the link in mbit
+ :type rate: :py:class:`int`
+ """
+
+ if rate and not rate > 0:
+ raise ValueError("Rate needs to be higher than 0")
+
+ if isinstance(delay, int):
+ delay = Delay(delay)
+ if isinstance(loss, int) or isinstance(loss, float):
+ loss = Loss(loss)
+ self._delay = delay
+ self._loss = loss
+ self._rate = rate
+
+ @property
+ def delay(self):
+ return self._delay
+
+ @property
+ def loss(self):
+ return self._loss
+
+ @property
+ def rate(self):
+ return self._rate
+
+ def build_command(self, ipcp):
+ cmd = []
+
+ if ipcp in LinkQuality._active:
+ cmd.append("tc qdisc change dev %s root netem" % ipcp.ifname)
+ else:
+ cmd.append("tc qdisc add dev %s root netem" % ipcp.ifname)
+
+ if self.delay:
+ cmd.append(self.delay.build_command())
+
+ if self.loss:
+ cmd.append(self.loss.build_command())
+
+ if self.rate:
+ cmd.append("rate %imbit" % self.rate)
+
+ return " ".join(cmd)
+
+ def apply(self, shim):
+ if not (self.delay or self.loss or self.rate):
+ self.deactivate(shim)
+ else:
+ for ipcp in shim.ipcps:
+ if not ipcp.ifname:
+ logger.error("Could not apply LinkQuality to IPCP because "
+ "the interface name is None")
+ continue
+
+ ipcp.node.execute_command(self.build_command(ipcp),
+ as_root=True)
+ LinkQuality._active.add(ipcp)
+
+ def deactivate(self, shim):
+ for ipcp in shim.ipcps:
+ if ipcp not in LinkQuality._active:
+ continue
+ if not ipcp.ifname:
+ logger.error("Could not remove LinkQuality from IPCP because "
+ "the interface name is None")
+ continue
+
+ ipcp.node.execute_command("tc qdisc del dev %s root "
+ "netem" % ipcp.ifname, as_root=True)
+ LinkQuality._active.remove(ipcp)
+
+
+class SSHConfig(object):
+ def __init__(self, hostname, port=22, proxy_server=None):
+ self.username = None
+ self.password = None
+ self.hostname = hostname
+ self.port = port
+ self.proxy_server = proxy_server
+ self.client = None
+ self.proxy_client = None
+ self.http_proxy = None
+
+ def set_username(self, username):
+ self.username = username
+
+ def set_password(self, password):
+ self.password = password
+
+ def set_http_proxy(self, proxy):
+ self.http_proxy = proxy
+
+
+class Node(object):
+ """
+ A node in the experiment.
+ """
+ def get_e_id(self):
+ return "Node." + self.name
+
+ def __init__(self, name, difs=None, dif_registrations=None,
+ policies=None, machine_type=None):
+ """
+ :param name: Name of the node.
+ :param difs: A list of DIFs the node is in.
+ :param dif_registrations: How the DIFs are stacked.
+ :param policies: Policies of a DIF specific to the node.
+ :param machine_type: Type of machine to use, physical or virtual.
+ """
+ self.name = name
+ if difs is None:
+ difs = list()
+ self.difs = difs
+ for dif in self.difs:
+ dif.add_member(self)
+ if dif_registrations is None:
+ dif_registrations = dict()
+ self.dif_registrations = dif_registrations
+ self.machine_type = machine_type
+ self.ssh_config = SSHConfig(name)
+ self.ipcps = []
+ self.policies = dict()
+ self.has_tcpdump = False
+ if policies is None:
+ policies = dict()
+ for dif in self.difs:
+ if hasattr(dif, 'policy'): # check if the dif supports policies
+ self.policies[dif] = policies.get(dif, Policy(dif, self))
+
+ self.executor = None # will be set by testbed on swap_in
+ self.startup_command = None # will be set by prototype
+
+ self._validate()
+
+ def get_ipcp_by_dif(self, dif):
+ """
+ :param dif: The DIF to get the IPCP of.
+ :return: The IPCP of the node that is in the DIF.
+ """
+ for ipcp in self.ipcps:
+ if ipcp.dif == dif:
+ return ipcp
+
+ def _undeclared_dif(self, dif):
+ if dif not in self.difs:
+ raise Exception("Invalid registration: node %s is not declared "
+ "to be part of DIF %s" % (self.name, dif.name))
+
+ def _validate(self):
+ # Check that DIFs referenced in self.dif_registrations
+ # are part of self.difs
+ for upper in self.dif_registrations:
+ self._undeclared_dif(upper)
+ for lower in self.dif_registrations[upper]:
+ self._undeclared_dif(lower)
+
+ def __repr__(self):
+ s = "Node " + self.name + ":\n"
+
+ s += " DIFs: [ "
+ s += " ".join([d.name for d in self.difs])
+ s += " ]\n"
+
+ s += " DIF registrations: [ "
+ rl = []
+ for upper in self.dif_registrations:
+ difs = self.dif_registrations[upper]
+ x = "%s => [" % upper.name
+ x += " ".join([lower.name for lower in difs])
+ x += "]"
+ rl.append(x)
+ s += ", ".join(rl)
+ s += " ]\n"
+
+ return s
+
+ def __hash__(self):
+ return hash(self.name)
+
+ def __eq__(self, other):
+ return other is not None and self.name == other.name
+
+ def __neq__(self, other):
+ return not self == other
+
+ def add_dif(self, dif):
+ """
+ Adds a DIF to the list.
+
+ :param dif: Name of the DIF to add.
+ """
+ self.difs.append(dif)
+ dif.add_member(self)
+ if hasattr(dif, 'policy'):
+ self.policies[dif] = Policy(dif, self)
+ self._validate()
+
+ def del_dif(self, dif):
+ """
+ Adds a DIF to the list.
+
+ :param dif: Name of the DIF to add.
+ """
+ self.difs.remove(dif)
+ dif.del_member(self)
+ try:
+ del self.policies[dif]
+ except KeyError:
+ # It was not in there, so nothing to do
+ pass
+ self._validate()
+
+ def add_dif_registration(self, upper, lower):
+ """
+ Adds a DIF registration.
+
+ :param upper: Name of the DIF that is requesting IPC.
+ :param lower: Name of the DIF providing IPC.
+ """
+ self.dif_registrations[upper].append(lower)
+ self._validate()
+
+ def del_dif_registration(self, upper, lower):
+ """
+ Removes a DIF registration.
+
+ :param upper: Name of the DIF that is requesting IPC.
+ :param lower: Name of the DIF providing IPC.
+ """
+ self.dif_registrations[upper].remove(lower)
+ self._validate()
+
+ def add_policy(self, dif, component_name, policy_name, **parameters):
+ """
+ Adds a policy.
+
+ :param dif: The name of the DIF.
+ :param component_name: Name of the component.
+ :param policy_name: Name of the policy.
+ :param parameters: Parameters of the policy.
+ """
+ self.policies[dif].add_policy(component_name, policy_name, **parameters)
+
+ def del_policy(self, dif, component_name=None, policy_name=None):
+ """
+ Removes a policy.
+
+ :param dif: the dif to which the policy should be applied
+ :param component_name: Name of the component.
+ :param policy_name: Name of the policy.
+ """
+ self.policies[dif].del_policy(component_name, policy_name)
+
+ def get_policy(self, dif):
+ """
+ :param dif: The DIF to get the policy of.
+ :return: Returns the policy.
+ """
+ return self.policies[dif]
+
+ def execute_commands(self, commands, as_root=False, time_out=3,
+ use_proxy=False):
+ """
+ Execute a list of a commands on the node.
+
+ :param commands: A list of commands.
+ :param as_root: Execute as root?
+ :param time_out: Seconds before timing out.
+ :param use_proxy: Use a proxy to execute the commands?
+ """
+ return self.executor.execute_commands(self,
+ commands,
+ as_root,
+ time_out)
+
+ def execute_command(self, command, as_root=False, time_out=3,
+ use_proxy=False):
+ """
+ Execute a single command on a node.
+
+ :param command: A command.
+ :param as_root: Execute as root?
+ :param time_out: Seconds before timing out.
+ :param use_proxy: Use a proxy to execute the commands?
+ :return: The stdout of the command.
+ """
+ return self.executor.execute_command(self,
+ command,
+ as_root,
+ time_out)
+
+ def copy_file(self, path, destination):
+ """
+ Copy file to node.
+
+ :param path: Local location of the file.
+ :param destination: Destination location of the file.
+ """
+ self.executor.copy_file(self, path, destination)
+
+ def copy_files(self, paths, destination):
+ """
+ Copy files to node.
+
+ :param paths: Local location of the files.
+ :param destination: Destination location of the files.
+ """
+ self.executor.copy_files(self, paths, destination)
+
+ def fetch_file(self, path, destination, sudo=False):
+ """
+ Fetch file from the node.
+
+ :param path: Location of the files on the node.
+ :param destination: Destination location of the files.
+ :param sudo: The file is owned by root on the node?
+ """
+ self.executor.fetch_file(self, path, destination, sudo)
+
+ def fetch_files(self, paths, destination, sudo=False):
+ """
+ Fetch files from the node.
+
+ :param paths: Location of the files on the node.
+ :param destination: Destination location of the files.
+ :param sudo: The file is owned by root on the node?
+ """
+ self.executor.fetch_files(self, paths, destination, sudo)
+
+ def set_link_state(self, dif, state):
+ """
+ Change the state of a link on the node.
+
+ :param dif: The name of the shim Ethernet DIF.
+ :param state: Up or down.
+ """
+ ipcp = self.get_ipcp_by_dif(dif)
+ self.execute_command('ip link set dev ' + ipcp.ifname + ' ' + state,
+ as_root=True)
+
+
+class IPCP(object):
+ def __init__(self, name, node, dif):
+ self.name = name
+ self.node = node
+ self.dif = dif
+ self.registrations = []
+
+ # Is this IPCP the first in its DIF, so that it does not need
+ # to enroll to anyone ?
+ self.dif_bootstrapper = False
+
+ def __repr__(self):
+ return "{IPCP=%s,DIF=%s,N-1-DIFs=(%s)%s}" % \
+ (self.name, self.dif.name,
+ ' '.join([dif.name for dif in self.registrations]),
+ ',bootstrapper' if self.dif_bootstrapper else ''
+ )
+
+ def __hash__(self):
+ return hash((self.name, self.dif.name))
+
+ def __eq__(self, other):
+ return other is not None and self.name == other.name \
+ and self.dif == other.dif
+
+ def __neq__(self, other):
+ return not self == other
+
+
+class ShimEthIPCP(IPCP):
+ def __init__(self, name, node, dif, ifname=None):
+ IPCP.__init__(self, name, node, dif)
+ self.ifname = ifname
+
+
+class ShimUDPIPCP(IPCP):
+ def __init__(self, name, node, dif):
+ IPCP.__init__(self, name, node, dif)
+ # TODO: add IP and port
+
+
+class Policy(object):
+ def __init__(self, dif, node=None, policies=None):
+ self.dif = dif # type: NormalDIF
+ self.node = node
+ if policies is None:
+ self._dict = dict()
+ else:
+ self._dict = policies
+
+ def add_policy(self, component_name, policy_name, **parameters):
+ self._dict.setdefault(component_name, dict())[policy_name] = parameters
+
+ #
+ # Fetches effective policy info
+ #
+ def get_policies(self, component_name=None, policy_name=None):
+ policy = self._superimpose()
+ if component_name is None:
+ return policy._dict
+ elif policy_name is None:
+ return policy._dict[component_name]
+ else:
+ return policy._dict[component_name][policy_name]
+
+ def del_policy(self, component_name=None, policy_name=None):
+ if component_name is None:
+ self._dict = dict()
+ elif policy_name is None:
+ del self._dict[component_name]
+ else:
+ del self._dict[component_name][policy_name]
+
+ #
+ # Merges this policy into that of its dif, obtaining
+ # the effective policy acting on self.node.
+ #
+ def _superimpose(self):
+ if self.node is None:
+ return self
+ other = self.dif.policy
+ base = dict(other._dict)
+ base.update(self._dict)
+ return Policy(self.dif, self.node, base)
+
+ def __eq__(self, other):
+ if not isinstance(other, Policy):
+ return False
+ else:
+ return other.dif == self.dif \
+ and other.node == self.node \
+ and other._dict == self._dict
+
+ def __str__(self):
+ node_str = (" Node: " + self.node) if self.node is not None else ""
+ return "Policy[Dif: %(dif)s,%(node_str)s Dict: %(dict)s]" \
+ % {"dif": self.dif, "node_str": node_str, "dict": self._dict}
+
+ def __repr__(self):
+ node_str = (" Node: " + self.node) if self.node is not None else ""
+ s = "Policy[ Dif: %(dif)s,%(node_str)s" \
+ % {"dif": self.dif, "node_str": node_str}
+ comps = []
+ for component in self._dict:
+ for policy in self._dict[component]:
+ comps.append("\n Component %s has policy %s with params %s"
+ % (component,
+ policy,
+ self._dict[component][policy]))
+ s += ",".join(comps)
+ s += "\n]\n"
+ return s
diff --git a/rumba/model.py b/rumba/model.py
index a6925b3..4c151e9 100644
--- a/rumba/model.py
+++ b/rumba/model.py
@@ -25,19 +25,65 @@
# Foundation, Inc., http://www.fsf.org/about/contact/.
#
-import abc
+
import os
import stat
-import time
-import shutil
-from enum import Enum
-import rumba.log as log
+from rumba.elements.topology import (
+ DIF,
+ ShimEthDIF,
+ ShimUDPDIF,
+ NormalDIF,
+
+ IPCP,
+ ShimEthIPCP,
+ ShimUDPIPCP,
+
+ Node,
+ SSHConfig,
+
+ LinkQuality,
+ Delay,
+ Loss
+)
+
+from rumba.elements.experimentation import (
+ Experiment,
+ Testbed,
+ Executor,
+ tmp_dir
+)
+
+
+__all__ = [
+ # Topology
+ "DIF",
+ "ShimEthDIF",
+ "ShimUDPDIF",
+ "NormalDIF",
+
+ "IPCP",
+ "ShimEthIPCP",
+ "ShimUDPIPCP",
-logger = log.get_logger(__name__)
+ "Node",
+ "SSHConfig",
+
+ "LinkQuality",
+ "Delay",
+ "Loss",
+
+ # Experimentation
+ "Experiment",
+ "Testbed",
+ "Executor",
+ "tmp_dir",
+
+ # Other
+ "cache_dir"
+]
-tmp_dir = '/tmp/rumba'
try:
os.mkdir(tmp_dir)
os.chmod(tmp_dir, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)
@@ -57,1337 +103,3 @@ try:
except OSError:
# Already there, nothing to do
pass
-
-
-class Testbed(object):
- """
- Base class for every testbed plugin.
- """
- def __init__(self,
- exp_name,
- username,
- password,
- proj_name,
- http_proxy=None,
- system_logs=None):
- """
- :param exp_name: The experiment name.
- :param username: The username.
- :param password: The password.
- :param proj_name: The project name.
- :param http_proxy: HTTP proxy used by the testbed.
- :param system_logs: Location of the system logs of
- images of the testbed.
- """
- self.username = username
- self.password = password
- self.proj_name = proj_name
- self.exp_name = exp_name
- self.http_proxy = http_proxy
- self.flags = {'no_vlan_offload': False}
- if system_logs is None:
- self.system_logs = ['/var/log/syslog']
- elif isinstance(system_logs, str):
- self.system_logs = [system_logs]
- else:
- self.system_logs = system_logs
-
- def swap_in(self, experiment):
- """
- Swaps experiment in on the testbed.
-
- :param experiment: The experiment.
- """
- for node in experiment.nodes:
- node.executor = self.executor
-
- self._swap_in(experiment)
-
- for dif in experiment.dif_ordering:
- if isinstance(dif, ShimEthDIF):
- dif.link_quality.apply(dif)
-
- @abc.abstractmethod
- def _swap_in(self, experiment):
- logger.info("_swap_in(): nothing to do")
-
- def swap_out(self, experiment):
- """
- Swaps experiment out of the testbed.
-
- :param experiment: The experiment.
- """
- self._swap_out(experiment)
-
- @abc.abstractmethod
- def _swap_out(self, experiment):
- logger.info("swap_out(): nothing to do")
-
-
-class DIF(object):
- """
- Base class for DIFs.
- """
- def __init__(self, name, members=None):
- """
- :param name: Name of the DIF.
- :param members: List of nodes that are members of the DIF.
- """
- self.name = name
- if members is None:
- members = list()
- self.members = members
- self.ipcps = list()
-
- def __repr__(self):
- s = "DIF %s" % self.name
- return s
-
- def __hash__(self):
- return hash(self.name)
-
- def __eq__(self, other):
- return other is not None and self.name == other.name
-
- def __neq__(self, other):
- return not self == other
-
- def add_member(self, node):
- self.members.append(node)
-
- def del_member(self, node):
- self.members.remove(node)
-
- def get_ipcp_class(self):
- return IPCP
-
-
-# Shim over UDP
-#
-class ShimUDPDIF(DIF):
- """
- Shim over UDP.
- """
- def __init__(self, name, members=None):
- """
- :param name: Name of the DIF.
- :param members: List of members of the DIF.
- """
- DIF.__init__(self, name, members)
-
- def get_ipcp_class(self):
- return ShimUDPIPCP
-
-
-# Shim over Ethernet
-#
-# @link_speed [int] Speed of the Ethernet network, in Mbps
-#
-class ShimEthDIF(DIF):
- """
- Shim over Ethernet.
- """
- def get_e_id(self):
- return "ShimEthDIF." + self.name
-
- def __init__(self, name, members=None, link_quality=None):
- """
- :param name: Name of the DIF.
- :param members: List of members of the DIF.
- :param link_quality: Quality of the link.
- """
- DIF.__init__(self, name, members)
- self._link_quality = link_quality if link_quality is not None else LinkQuality()
-
- def get_ipcp_class(self):
- return ShimEthIPCP
-
- def add_member(self, node):
- super(ShimEthDIF, self).add_member(node)
- if len(self.members) > 2:
- raise Exception("More than 2 members in %s!" % self.name)
-
- @property
- def link_quality(self):
- return self._link_quality
-
- @link_quality.setter
- def link_quality(self, _link_quality):
- if not _link_quality:
- raise ValueError("Cannot set link_quality to None, use del "
- "link_quality to reset")
-
- self._link_quality = _link_quality
-
- _link_quality.apply(self)
-
- @link_quality.deleter
- def link_quality(self):
- self._link_quality.deactivate(self)
-
- def set_delay(self, delay=0,
- jitter=None,
- correlation=None,
- distribution=None):
- """
- Set the delay parameters of the underlying link.
- Parameters as in :py:class:`.Delay`
-
- :param delay: average delay in ms
- :type delay: :py:class:`int`
- :param jitter: jitter in ms
- :type jitter: :py:class:`int`
- :param correlation: correlation in %
- :type correlation: :py:class:`int`
- :param distribution: delay distribution, defaults to a Normal
- distribution
- :type distribution: :py:class:`.Distribution`
- """
- new_delay = Delay(delay, jitter, correlation, distribution)
- new_quality = LinkQuality.clone(self.link_quality, delay=new_delay)
- self.link_quality = new_quality
-
- def set_loss(self,
- loss=0,
- correlation=None):
- """
- Set the loss parameter of the underlying link.
- Parameters as in :py:class:`.Loss`
-
- :param loss: loss in percentage
- :type loss: :py:class:`int` or :py:class:`float`
- :param correlation: correlation in percentage
- :type correlation: :py:class:`int` or :py:class:`float`
- """
- new_loss = Loss(loss, correlation)
- new_quality = LinkQuality.clone(self.link_quality, loss=new_loss)
- self.link_quality = new_quality
-
- def set_rate(self, rate=None):
- """
- Set the rate parameter of the underlying link.
-
- :param rate: The desired rate in mbps
- :type rate: :py:class:`int`
- """
- new_quality = LinkQuality.clone(self.link_quality, rate=rate)
- self.link_quality = new_quality
-
- def set_quality(self, delay, loss, rate):
- """
- Configure the basic quality parameters of the
- underlying link.
-
- :param delay: the link delay, in ms
- :type delay: :py:class:`int`
- :param loss: the link loss, as a percentage
- :type loss: :py:class:`float` or :py:class:`int`
- :param rate: the link rate in mbps
- :type rate: :py:class:`int`
- """
- new_quality = LinkQuality(delay, loss, rate)
- self.link_quality = new_quality
-
-
-class NormalDIF(DIF):
- """
- Normal DIF.
- """
- def __init__(self, name, members=None, policy=None):
- """
- :param name: The name of the DIF.
- :param members: The list of members.
- :param policy: Policies of the normal DIF.
- """
- DIF.__init__(self, name, members)
- if policy is None:
- policy = Policy(self)
- self.policy = policy
-
- def add_policy(self, comp, pol, **params):
- """
- Adds a policy to the DIF.
-
- :param comp: Component name.
- :param pol: Policy name
- :param params: Parameters of the policy.
- """
- self.policy.add_policy(comp, pol, **params)
-
- def del_policy(self, comp=None, pol=None):
- """
- Deletes a policy from the DIF.
-
- :param comp: Component name.
- :param pol: Policy name
- """
- self.policy.del_policy(comp, pol)
-
- def show(self):
- """
- :return: A string representing the policies in the DIF.
- """
- s = DIF.__repr__(self)
- for comp, pol_dict in self.policy.get_policies().items():
- for pol, params in pol_dict.items():
- s += "\n Component %s has policy %s with params %s" \
- % (comp, pol, params)
- return s
-
-
-class Distribution(Enum):
- """
- An enum holding different statistical distributions.
-
- **Values:**
-
- `NORMAL = 1`
-
- `PARETO = 2`
-
- `PARETONORMAL = 3`
- """
- NORMAL = 1
- PARETO = 2
- PARETONORMAL = 3
-
-
-class Delay(object):
- """
- A class representing delay of a link.
- """
- def __init__(self, delay=0, jitter=None, correlation=None,
- distribution=None):
- """
- Configure link delay.
-
- :param delay: average delay in ms
- :type delay: :py:class:`int`
- :param jitter: jitter in ms
- :type jitter: :py:class:`int`
- :param correlation: correlation in %
- :type correlation: :py:class:`int`
- :param distribution: delay distribution, defaults to a Normal
- distribution
- :type distribution: :py:class:`.Distribution`
- """
-
- if delay < 0:
- raise ValueError("Delay needs to be at least 0")
-
- if jitter and not jitter > 0:
- raise ValueError("Jitter needs to be higher than 0")
-
- if (not jitter) and correlation:
- raise ValueError("Correlation requires a value for jitter")
-
- if correlation and (correlation < 0 or correlation > 100):
- raise ValueError("Correlation needs to be between 0 and 100")
-
- self._delay = delay
- self._jitter = jitter
- self._correlation = correlation
- self._distribution = distribution
-
- @property
- def delay(self):
- return self._delay
-
- @property
- def jitter(self):
- return self._jitter
-
- @property
- def correlation(self):
- return self._correlation
-
- @property
- def distribution(self):
- return self._distribution
-
- def build_command(self):
- opts = ["delay %ims" % self.delay]
-
- if self.jitter:
- opts.append("%ims" % self.jitter)
-
- if self.correlation:
- opts.append("%f%%" % self.correlation)
-
- if self.distribution:
- opts.append("distribution %s" % self.distribution.name.lower())
-
- return " ".join(opts)
-
-
-class Loss(object):
- """
- A class representing loss on a link.
- """
- def __init__(self, loss, correlation=None):
- """
- Configure link loss.
-
- :param loss: loss in percentage
- :type loss: :py:class:`int` or :py:class:`float`
- :param correlation: correlation in percentage
- :type correlation: :py:class:`int` or :py:class:`float`
- """
- if loss and (loss < 0 or loss > 100):
- raise ValueError("Loss needs to be between 0 and 100")
-
- if correlation and (correlation < 0 or correlation > 100):
- raise ValueError("Correlation needs to be between 0 and 100")
-
- self._loss = loss
- self._correlation = correlation
-
- @property
- def loss(self):
- return self._loss
-
- @property
- def correlation(self):
- return self._correlation
-
- def build_command(self):
- opts = ["loss %f%%" % self.loss]
-
- if self.correlation:
- opts.append("%f%%" % self.correlation)
-
- return " ".join(opts)
-
-
-class LinkQuality(object):
- """
- A class representing the link quality.
- """
- _active = set()
-
- @classmethod
- def clone(cls, old_quality, delay=None, loss=None, rate=None):
- """
- Clone old_quality, updating it with the provided parameters
- if present.
-
- :param old_quality: A :py:class:`.LinkQuality` instance to
- use as a base
- :type old_quality: :py:class:`.LinkQuality`
- :param delay: Delay object holding delay configuration
- or number corresponding to delay in ms
- :type delay: :py:class:`.Delay` or :py:class:`int`
- :param loss: Loss object holding delay configuration or
- number corresponding to loss percentage
- :type loss: :py:class:`.Loss` or :py:class:`float`
- :param rate: The rate of the link in mbit
- :type rate: :py:class:`int`
- :return: a new :py:class:`.LinkQuality` instance.
- :rtype: :py:class:`.LinkQuality`
- """
- if delay is None:
- delay = old_quality.delay
- if loss is None:
- loss = old_quality.loss
- if rate is None:
- rate = old_quality.rate
- return LinkQuality(delay, loss, rate)
-
- def __init__(self, delay=None, loss=None, rate=None):
- """
- Link quality configuration.
-
- :param delay: Delay object holding delay configuration
- or number corresponding to delay in ms
- :type delay: :py:class:`.Delay` or :py:class:`int`
- :param loss: Loss object holding delay configuration or
- number corresponding to loss percentage
- :type loss: :py:class:`.Loss` or :py:class:`float`
- :param rate: The rate of the link in mbit
- :type rate: :py:class:`int`
- """
-
- if rate and not rate > 0:
- raise ValueError("Rate needs to be higher than 0")
-
- if isinstance(delay, int):
- delay = Delay(delay)
- if isinstance(loss, int) or isinstance(loss, float):
- loss = Loss(loss)
- self._delay = delay
- self._loss = loss
- self._rate = rate
-
- @property
- def delay(self):
- return self._delay
-
- @property
- def loss(self):
- return self._loss
-
- @property
- def rate(self):
- return self._rate
-
- def build_command(self, ipcp):
- cmd = []
-
- if ipcp in LinkQuality._active:
- cmd.append("tc qdisc change dev %s root netem" % ipcp.ifname)
- else:
- cmd.append("tc qdisc add dev %s root netem" % ipcp.ifname)
-
- if self.delay:
- cmd.append(self.delay.build_command())
-
- if self.loss:
- cmd.append(self.loss.build_command())
-
- if self.rate:
- cmd.append("rate %imbit" % self.rate)
-
- return " ".join(cmd)
-
- def apply(self, shim):
- if not (self.delay or self.loss or self.rate):
- self.deactivate(shim)
- else:
- for ipcp in shim.ipcps:
- if not ipcp.ifname:
- logger.error("Could not apply LinkQuality to IPCP because "
- "the interface name is None")
- continue
-
- ipcp.node.execute_command(self.build_command(ipcp),
- as_root=True)
- LinkQuality._active.add(ipcp)
-
- def deactivate(self, shim):
- for ipcp in shim.ipcps:
- if ipcp not in LinkQuality._active:
- continue
- if not ipcp.ifname:
- logger.error("Could not remove LinkQuality from IPCP because "
- "the interface name is None")
- continue
-
- ipcp.node.execute_command("tc qdisc del dev %s root "
- "netem" % ipcp.ifname, as_root=True)
- LinkQuality._active.remove(ipcp)
-
-
-class SSHConfig(object):
- def __init__(self, hostname, port=22, proxy_server=None):
- self.username = None
- self.password = None
- self.hostname = hostname
- self.port = port
- self.proxy_server = proxy_server
- self.client = None
- self.proxy_client = None
- self.http_proxy = None
-
- def set_username(self, username):
- self.username = username
-
- def set_password(self, password):
- self.password = password
-
- def set_http_proxy(self, proxy):
- self.http_proxy = proxy
-
-
-class Node(object):
- """
- A node in the experiment.
- """
- def get_e_id(self):
- return "Node." + self.name
-
- def __init__(self, name, difs=None, dif_registrations=None,
- policies=None, machine_type=None):
- """
- :param name: Name of the node.
- :param difs: A list of DIFs the node is in.
- :param dif_registrations: How the DIFs are stacked.
- :param policies: Policies of a DIF specific to the node.
- :param machine_type: Type of machine to use, physical or virtual.
- """
- self.name = name
- if difs is None:
- difs = list()
- self.difs = difs
- for dif in self.difs:
- dif.add_member(self)
- if dif_registrations is None:
- dif_registrations = dict()
- self.dif_registrations = dif_registrations
- self.machine_type = machine_type
- self.ssh_config = SSHConfig(name)
- self.ipcps = []
- self.policies = dict()
- self.has_tcpdump = False
- if policies is None:
- policies = dict()
- for dif in self.difs:
- if hasattr(dif, 'policy'): # check if the dif supports policies
- self.policies[dif] = policies.get(dif, Policy(dif, self))
-
- self.executor = None # will be set by testbed on swap_in
- self.startup_command = None # will be set by prototype
-
- self._validate()
-
- def get_ipcp_by_dif(self, dif):
- """
- :param dif: The DIF to get the IPCP of.
- :return: The IPCP of the node that is in the DIF.
- """
- for ipcp in self.ipcps:
- if ipcp.dif == dif:
- return ipcp
-
- def _undeclared_dif(self, dif):
- if dif not in self.difs:
- raise Exception("Invalid registration: node %s is not declared "
- "to be part of DIF %s" % (self.name, dif.name))
-
- def _validate(self):
- # Check that DIFs referenced in self.dif_registrations
- # are part of self.difs
- for upper in self.dif_registrations:
- self._undeclared_dif(upper)
- for lower in self.dif_registrations[upper]:
- self._undeclared_dif(lower)
-
- def __repr__(self):
- s = "Node " + self.name + ":\n"
-
- s += " DIFs: [ "
- s += " ".join([d.name for d in self.difs])
- s += " ]\n"
-
- s += " DIF registrations: [ "
- rl = []
- for upper in self.dif_registrations:
- difs = self.dif_registrations[upper]
- x = "%s => [" % upper.name
- x += " ".join([lower.name for lower in difs])
- x += "]"
- rl.append(x)
- s += ", ".join(rl)
- s += " ]\n"
-
- return s
-
- def __hash__(self):
- return hash(self.name)
-
- def __eq__(self, other):
- return other is not None and self.name == other.name
-
- def __neq__(self, other):
- return not self == other
-
- def add_dif(self, dif):
- """
- Adds a DIF to the list.
-
- :param dif: Name of the DIF to add.
- """
- self.difs.append(dif)
- dif.add_member(self)
- if hasattr(dif, 'policy'):
- self.policies[dif] = Policy(dif, self)
- self._validate()
-
- def del_dif(self, dif):
- """
- Adds a DIF to the list.
-
- :param dif: Name of the DIF to add.
- """
- self.difs.remove(dif)
- dif.del_member(self)
- try:
- del self.policies[dif]
- except KeyError:
- # It was not in there, so nothing to do
- pass
- self._validate()
-
- def add_dif_registration(self, upper, lower):
- """
- Adds a DIF registration.
-
- :param upper: Name of the DIF that is requesting IPC.
- :param lower: Name of the DIF providing IPC.
- """
- self.dif_registrations[upper].append(lower)
- self._validate()
-
- def del_dif_registration(self, upper, lower):
- """
- Removes a DIF registration.
-
- :param upper: Name of the DIF that is requesting IPC.
- :param lower: Name of the DIF providing IPC.
- """
- self.dif_registrations[upper].remove(lower)
- self._validate()
-
- def add_policy(self, dif, component_name, policy_name, **parameters):
- """
- Adds a policy.
-
- :param dif: The name of the DIF.
- :param component_name: Name of the component.
- :param policy_name: Name of the policy.
- :param parameters: Parameters of the policy.
- """
- self.policies[dif].add_policy(component_name, policy_name, **parameters)
-
- def del_policy(self, dif, component_name=None, policy_name=None):
- """
- Removes a policy.
-
- :param dif: the dif to which the policy should be applied
- :param component_name: Name of the component.
- :param policy_name: Name of the policy.
- """
- self.policies[dif].del_policy(component_name, policy_name)
-
- def get_policy(self, dif):
- """
- :param dif: The DIF to get the policy of.
- :return: Returns the policy.
- """
- return self.policies[dif]
-
- def execute_commands(self, commands, as_root=False, time_out=3,
- use_proxy=False):
- """
- Execute a list of a commands on the node.
-
- :param commands: A list of commands.
- :param as_root: Execute as root?
- :param time_out: Seconds before timing out.
- :param use_proxy: Use a proxy to execute the commands?
- """
- return self.executor.execute_commands(self,
- commands,
- as_root,
- time_out)
-
- def execute_command(self, command, as_root=False, time_out=3,
- use_proxy=False):
- """
- Execute a single command on a node.
-
- :param command: A command.
- :param as_root: Execute as root?
- :param time_out: Seconds before timing out.
- :param use_proxy: Use a proxy to execute the commands?
- :return: The stdout of the command.
- """
- return self.executor.execute_command(self,
- command,
- as_root,
- time_out)
-
- def copy_file(self, path, destination):
- """
- Copy file to node.
-
- :param path: Local location of the file.
- :param destination: Destination location of the file.
- """
- self.executor.copy_file(self, path, destination)
-
- def copy_files(self, paths, destination):
- """
- Copy files to node.
-
- :param paths: Local location of the files.
- :param destination: Destination location of the files.
- """
- self.executor.copy_files(self, paths, destination)
-
- def fetch_file(self, path, destination, sudo=False):
- """
- Fetch file from the node.
-
- :param path: Location of the files on the node.
- :param destination: Destination location of the files.
- :param sudo: The file is owned by root on the node?
- """
- self.executor.fetch_file(self, path, destination, sudo)
-
- def fetch_files(self, paths, destination, sudo=False):
- """
- Fetch files from the node.
-
- :param paths: Location of the files on the node.
- :param destination: Destination location of the files.
- :param sudo: The file is owned by root on the node?
- """
- self.executor.fetch_files(self, paths, destination, sudo)
-
- def set_link_state(self, dif, state):
- """
- Change the state of a link on the node.
-
- :param dif: The name of the shim Ethernet DIF.
- :param state: Up or down.
- """
- ipcp = self.get_ipcp_by_dif(dif)
- self.execute_command('ip link set dev ' + ipcp.ifname + ' ' + state,
- as_root=True)
-
-
-class IPCP(object):
- def __init__(self, name, node, dif):
- self.name = name
- self.node = node
- self.dif = dif
- self.registrations = []
-
- # Is this IPCP the first in its DIF, so that it does not need
- # to enroll to anyone ?
- self.dif_bootstrapper = False
-
- def __repr__(self):
- return "{IPCP=%s,DIF=%s,N-1-DIFs=(%s)%s}" % \
- (self.name, self.dif.name,
- ' '.join([dif.name for dif in self.registrations]),
- ',bootstrapper' if self.dif_bootstrapper else ''
- )
-
- def __hash__(self):
- return hash((self.name, self.dif.name))
-
- def __eq__(self, other):
- return other is not None and self.name == other.name \
- and self.dif == other.dif
-
- def __neq__(self, other):
- return not self == other
-
-
-class ShimEthIPCP(IPCP):
- def __init__(self, name, node, dif, ifname=None):
- IPCP.__init__(self, name, node, dif)
- self.ifname = ifname
-
-
-class ShimUDPIPCP(IPCP):
- def __init__(self, name, node, dif):
- IPCP.__init__(self, name, node, dif)
- # TODO: add IP and port
-
-
-class Policy(object):
- def __init__(self, dif, node=None, policies=None):
- self.dif = dif # type: NormalDIF
- self.node = node
- if policies is None:
- self._dict = dict()
- else:
- self._dict = policies
-
- def add_policy(self, component_name, policy_name, **parameters):
- self._dict.setdefault(component_name, dict())[policy_name] = parameters
-
- #
- # Fetches effective policy info
- #
- def get_policies(self, component_name=None, policy_name=None):
- policy = self._superimpose()
- if component_name is None:
- return policy._dict
- elif policy_name is None:
- return policy._dict[component_name]
- else:
- return policy._dict[component_name][policy_name]
-
- def del_policy(self, component_name=None, policy_name=None):
- if component_name is None:
- self._dict = dict()
- elif policy_name is None:
- del self._dict[component_name]
- else:
- del self._dict[component_name][policy_name]
-
- #
- # Merges this policy into that of its dif, obtaining
- # the effective policy acting on self.node.
- #
- def _superimpose(self):
- if self.node is None:
- return self
- other = self.dif.policy
- base = dict(other._dict)
- base.update(self._dict)
- return Policy(self.dif, self.node, base)
-
- def __eq__(self, other):
- if not isinstance(other, Policy):
- return False
- else:
- return other.dif == self.dif \
- and other.node == self.node \
- and other._dict == self._dict
-
- def __str__(self):
- node_str = (" Node: " + self.node) if self.node is not None else ""
- return "Policy[Dif: %(dif)s,%(node_str)s Dict: %(dict)s]" \
- % {"dif": self.dif, "node_str": node_str, "dict": self._dict}
-
- def __repr__(self):
- node_str = (" Node: " + self.node) if self.node is not None else ""
- s = "Policy[ Dif: %(dif)s,%(node_str)s" \
- % {"dif": self.dif, "node_str": node_str}
- comps = []
- for component in self._dict:
- for policy in self._dict[component]:
- comps.append("\n Component %s has policy %s with params %s"
- % (component,
- policy,
- self._dict[component][policy]))
- s += ",".join(comps)
- s += "\n]\n"
- return s
-
-
-class Experiment(object):
- """
- Base class for experiments.
- """
- __metaclass__ = abc.ABCMeta
-
- def __init__(self, testbed,
- nodes=None,
- git_repo=None,
- git_branch=None,
- log_dir=None,
- prototype_logs=None):
- """
- :param testbed: The testbed of the experiment.
- :param nodes: The list of nodes in the experiment.
- :param git_repo: The git repository of the prototype.
- :param git_branch: The git branch of the repository.
- :param log_dir: Where to log output of the experiment.
- :param prototype_logs: Where the prototype logs its output.
- """
- if nodes is None:
- nodes = list()
- self.nodes = nodes
- self.git_repo = git_repo
- self.git_branch = git_branch
- self.testbed = testbed
- # the strategy employed for completing the enrollment phase in
- # the different DIFs
- self.enrollment_strategy = 'minimal' # 'full-mesh', 'manual'
- # the strategy employed for setting up the data transfer
- # networks in the DIFs after enrollment
- self.dt_strategy = 'full-mesh' # 'minimal', 'manual'
- self.dif_ordering = []
- self.enrollments = [] # a list of per-DIF lists of enrollments
- self.dt_flows = [] # a list of per-DIF lists of data transfer flows
- self.mgmt_flows = [] # a list of per-DIF lists of management flows
-
- # Determine log directory
- if log_dir is None:
- # If it is None, use /tmp/rumba/{project}
- # Wipe it and make it again
- exp_name = self.testbed.exp_name.replace('/', '_') # Just in case
- log_dir = os.path.join(tmp_dir, exp_name)
- shutil.rmtree(log_dir, ignore_errors=True)
- os.mkdir(log_dir)
- self.log_dir = log_dir
- if not os.path.isdir(self.log_dir):
- raise Exception('Destination "%s" is not a directory. '
- 'Cannot fetch logs.'
- % self.log_dir)
- self.prototype_logs = prototype_logs \
- if prototype_logs is not None else []
-
- # Generate missing information
- self.generate()
-
- def __repr__(self):
- s = ""
- for n in self.nodes:
- s += "\n" + str(n)
-
- return s
-
- def add_node(self, node):
- """
- Adds a node to the experiment.
-
- :param node: A node.
- """
- self.nodes.append(node)
- self.generate()
-
- def del_node(self, node):
- """
- Deletes a node from the experiment.
-
- :param node: A node.
- """
- self.nodes.remove(node)
- self.generate()
-
- # Compute registration/enrollment order for DIFs
- def compute_dif_ordering(self):
- # Compute DIFs dependency graph, as both adjacency and incidence list.
- difsdeps_adj = dict()
- difsdeps_inc = dict()
-
- for node in self.nodes:
- for dif in node.difs:
- if dif not in difsdeps_adj:
- difsdeps_adj[dif] = set()
-
- for upper in node.dif_registrations:
- for lower in node.dif_registrations[upper]:
- if upper not in difsdeps_inc:
- difsdeps_inc[upper] = set()
- if lower not in difsdeps_inc:
- difsdeps_inc[lower] = set()
- if upper not in difsdeps_adj:
- difsdeps_adj[upper] = set()
- if lower not in difsdeps_adj:
- difsdeps_adj[lower] = set()
- difsdeps_inc[upper].add(lower)
- difsdeps_adj[lower].add(upper)
-
- # Kahn's algorithm below only needs per-node count of
- # incident edges, so we compute these counts from the
- # incidence list and drop the latter.
- difsdeps_inc_cnt = dict()
- for dif in difsdeps_inc:
- difsdeps_inc_cnt[dif] = len(difsdeps_inc[dif])
- del difsdeps_inc
-
- # Init difsdeps_inc_cnt for those DIFs that do not
- # act as lower IPCPs nor upper IPCPs for registration
- # operations
- for node in self.nodes:
- for dif in node.difs:
- if dif not in difsdeps_inc_cnt:
- difsdeps_inc_cnt[dif] = 0
-
- # Run Kahn's algorithm to compute topological
- # ordering on the DIFs graph.
- frontier = set()
- self.dif_ordering = []
- for dif in difsdeps_inc_cnt:
- if difsdeps_inc_cnt[dif] == 0:
- frontier.add(dif)
-
- while len(frontier):
- cur = frontier.pop()
- self.dif_ordering.append(cur)
- for nxt in difsdeps_adj[cur]:
- difsdeps_inc_cnt[nxt] -= 1
- if difsdeps_inc_cnt[nxt] == 0:
- frontier.add(nxt)
- difsdeps_adj[cur] = set()
-
- circular_set = [dif for dif in difsdeps_inc_cnt
- if difsdeps_inc_cnt[dif] != 0]
- if len(circular_set):
- raise Exception("Fatal error: The specified DIFs topology"
- "has one or more"
- "circular dependencies, involving the following"
- " DIFs: %s" % circular_set)
-
- logger.debug("DIF topological ordering: %s", self.dif_ordering)
-
- # Compute all the enrollments, to be called after compute_dif_ordering()
- def compute_enrollments(self):
- dif_graphs = dict()
- self.enrollments = []
- self.mgmt_flows = []
- self.dt_flows = []
-
- for dif in self.dif_ordering:
- neighsets = dict()
- dif_graphs[dif] = dict()
- first = None
-
- # For each N-1-DIF supporting this DIF, compute the set of nodes
- # that share such N-1-DIF. This set will be called the 'neighset' of
- # the N-1-DIF for the current DIF.
-
- for node in self.nodes:
- if dif in node.dif_registrations:
- dif_graphs[dif][node] = [] # init for later use
- if first is None: # pick any node for later use
- first = node
- for lower_dif in node.dif_registrations[dif]:
- if lower_dif not in neighsets:
- neighsets[lower_dif] = []
- neighsets[lower_dif].append(node)
-
- # Build the graph, represented as adjacency list
- for lower_dif in neighsets:
- # Each neighset corresponds to a complete (sub)graph.
- for node1 in neighsets[lower_dif]:
- for node2 in neighsets[lower_dif]:
- if node1 != node2:
- dif_graphs[dif][node1].append((node2, lower_dif))
-
- self.enrollments.append([])
- self.dt_flows.append([])
- self.mgmt_flows.append([])
-
- if first is None:
- # This is a shim DIF, nothing to do
- continue
-
- er = []
- for node in dif_graphs[dif]:
- for edge in dif_graphs[dif][node]:
- er.append("%s --[%s]--> %s" % (node.name,
- edge[1].name,
- edge[0].name))
- logger.debug("DIF graph for %s: %s", dif, ', '.join(er))
-
- # To generate the list of mgmt flows, minimal enrollments
- # and minimal dt flows, we simulate it, using
- # breadth-first traversal.
- enrolled = {first}
- frontier = {first}
- while len(frontier):
- cur = frontier.pop()
- for edge in dif_graphs[dif][cur]:
- if edge[0] not in enrolled:
- enrolled.add(edge[0])
- enrollee = edge[0].get_ipcp_by_dif(dif)
- assert(enrollee is not None)
- enroller = cur.get_ipcp_by_dif(dif)
- assert(enroller is not None)
- if self.enrollment_strategy == 'minimal':
- self.enrollments[-1].append({'dif': dif,
- 'enrollee': enrollee,
- 'enroller': enroller,
- 'lower_dif': edge[1]})
- self.mgmt_flows[-1].append({'src': enrollee,
- 'dst': enroller})
- if self.dt_strategy == 'minimal':
- self.dt_flows[-1].append({'src': enrollee,
- 'dst': enroller})
- frontier.add(edge[0])
- if len(dif.members) != len(enrolled):
- raise Exception("Disconnected DIF found: %s" % (dif,))
-
- # In case of a full mesh enrollment or dt flows
- for cur in dif_graphs[dif]:
- for edge in dif_graphs[dif][cur]:
- if cur.name < edge[0].name:
- enrollee = cur.get_ipcp_by_dif(dif)
- assert(enrollee is not None)
- enroller = edge[0].get_ipcp_by_dif(dif)
- assert(enroller is not None)
- if self.enrollment_strategy == 'full-mesh':
- self.enrollments[-1].append({'dif': dif,
- 'enrollee': enrollee,
- 'enroller': enroller,
- 'lower_dif': edge[1]})
- if self.dt_strategy == 'full-mesh':
- self.dt_flows[-1].append({'src': enrollee,
- 'dst': enroller})
-
- if not (self.dt_strategy == 'minimal'
- or self.dt_strategy == 'full-mesh') \
- or not (self.enrollment_strategy == 'full-mesh'
- or self.enrollment_strategy == 'minimal'):
- # This is a bug
- assert False
-
- log_string = "Enrollments:\n"
- for el in self.enrollments:
- for e in el:
- log_string += (" [%s] %s --> %s through N-1-DIF %s\n"
- % (e['dif'],
- e['enrollee'].name,
- e['enroller'].name,
- e['lower_dif']))
- logger.debug(log_string)
-
- log_string = "Mgmt flows:\n"
- for el in self.mgmt_flows:
- for e in el:
- log_string += (" %s --> %s \n"
- % (e['src'].name,
- e['dst'].name))
- logger.debug(log_string)
-
- log_string = "Dt flows:\n"
- for el in self.dt_flows:
- for e in el:
- log_string += (" %s --> %s \n"
- % (e['src'].name,
- e['dst'].name))
- logger.debug(log_string)
-
- def compute_ipcps(self):
- # For each node, compute the required IPCP instances, and associated
- # registrations
- for node in self.nodes:
- node.ipcps = []
- # We want also the node.ipcps list to be generated in
- # topological ordering
- for dif in self.dif_ordering:
- if dif not in node.difs:
- continue
-
- # Create an instance of the required IPCP class
- ipcp = dif.get_ipcp_class()(
- name='%s.%s' % (dif.name, node.name),
- node=node, dif=dif)
-
- if dif in node.dif_registrations:
- for lower in node.dif_registrations[dif]:
- ipcp.registrations.append(lower)
-
- node.ipcps.append(ipcp)
- dif.ipcps.append(ipcp)
-
- def compute_bootstrappers(self):
- for node in self.nodes:
- for ipcp in node.ipcps:
- ipcp.dif_bootstrapper = True
- for el in self.enrollments:
- for e in el:
- if e['dif'] != ipcp.dif:
- # Skip this DIF
- break
- if e['enrollee'] == ipcp:
- ipcp.dif_bootstrapper = False
- # Exit the loops
- break
- if not ipcp.dif_bootstrapper:
- break
-
- def dump_ssh_info(self):
- f = open(os.path.join(tmp_dir, 'ssh_info'), 'w')
- for node in self.nodes:
- f.write("%s;%s;%s;%s;%s\n" % (node.name,
- self.testbed.username,
- node.ssh_config.hostname,
- node.ssh_config.port,
- node.ssh_config.proxy_server))
- f.close()
-
- # Examine the nodes and DIFs, compute the registration and enrollment
- # order, the list of IPCPs to create, registrations, ...
- def generate(self):
- start = time.time()
- self.compute_dif_ordering()
- self.compute_ipcps()
- self.compute_enrollments()
- self.compute_bootstrappers()
- for node in self.nodes:
- logger.info("IPCPs for node %s: %s", node.name, node.ipcps)
- end = time.time()
- logger.info("Layer ordering computation took %.2f seconds", end - start)
-
- def install_prototype(self):
- """
- Installs the prototype on the nodes.
- """
- start = time.time()
- self._install_prototype()
- end = time.time()
- logger.info("Install took %.2f seconds", end - start)
-
- def set_startup_command(self, command):
- for node in self.nodes:
- node.startup_command = command
-
- def bootstrap_prototype(self):
- """
- Bootstraps the prototype on the nodes.
- """
- start = time.time()
- self._bootstrap_prototype()
- end = time.time()
- logger.info("Bootstrap took %.2f seconds", end - start)
-
- @abc.abstractmethod
- def _install_prototype(self):
- raise Exception('install_prototype() method not implemented')
-
- @abc.abstractmethod
- def _bootstrap_prototype(self):
- raise Exception('bootstrap_prototype() method not implemented')
-
- @abc.abstractmethod
- def prototype_name(self):
- raise Exception('prototype_name() method not implemented')
-
- @abc.abstractmethod
- def _terminate_prototype(self):
- raise Exception('terminate_prototype() method not implemented')
-
- def swap_in(self):
- """
- Swap the experiment in on the testbed.
- """
- start = time.time()
- self.testbed.swap_in(self)
- self.dump_ssh_info()
- end = time.time()
- logger.info("Swap-in took %.2f seconds", end - start)
-
- def swap_out(self):
- """
- Swap the experiment out of the testbed.
- """
- start = time.time()
- # Terminate prototype gracefully
- self._terminate_prototype()
- for node in self.nodes:
- if node.ssh_config.client is not None:
- node.ssh_config.client.close()
- if node.ssh_config.proxy_client is not None:
- node.ssh_config.proxy_client.close()
- # Undo the testbed (testbed-specific)
- self.testbed.swap_out(self)
- end = time.time()
- logger.info("Swap-out took %.2f seconds", end - start)
-
-
-class Executor:
- __metaclass__ = abc.ABCMeta
-
- @abc.abstractmethod
- def execute_command(self, node, command, as_root=False, time_out=3):
- # Execute command on a node
- return
-
- def execute_commands(self, node, commands, as_root=False, time_out=3):
- for command in commands:
- self.execute_command(node, command, as_root, time_out)
-
- @abc.abstractmethod
- def copy_file(self, node, path, destination):
- return
-
- def copy_files(self, node, paths, destination):
- for path in paths:
- self.copy_file(node, path, destination)
-
- @abc.abstractmethod
- def fetch_file(self, node, path, destination, sudo=False):
- return
-
- def fetch_files(self, node, paths, destination, sudo=False):
- for path in paths:
- self.fetch_file(node, path, destination, sudo)
diff --git a/setup.py b/setup.py
index 2ac0b2f..77661a2 100755
--- a/setup.py
+++ b/setup.py
@@ -16,7 +16,13 @@ setuptools.setup(
author_email='sander.vrijders@ugent.be',
license='LGPL',
description='Rumba measurement framework for RINA',
- packages=['rumba', 'rumba.testbeds', 'rumba.prototypes', 'rumba.executors'],
+ packages=[
+ 'rumba',
+ 'rumba.testbeds',
+ 'rumba.prototypes',
+ 'rumba.executors',
+ 'rumba.elements'
+ ],
install_requires=[
'paramiko',
'docker',