From c1ad42adc854604e0023a01633976cb3d26b9244 Mon Sep 17 00:00:00 2001 From: Marco Capitani Date: Thu, 29 Mar 2018 15:56:37 +0200 Subject: model: split model into two files Changed `rumba.model` into a namespace, moved business logic inside the rumba.elements package --- rumba/elements/__init__.py | 0 rumba/elements/experimentation.py | 539 ++++++++++++++ rumba/elements/topology.py | 869 +++++++++++++++++++++++ rumba/model.py | 1394 ++----------------------------------- setup.py | 8 +- 5 files changed, 1468 insertions(+), 1342 deletions(-) create mode 100644 rumba/elements/__init__.py create mode 100644 rumba/elements/experimentation.py create mode 100644 rumba/elements/topology.py diff --git a/rumba/elements/__init__.py b/rumba/elements/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/rumba/elements/experimentation.py b/rumba/elements/experimentation.py new file mode 100644 index 0000000..2626a0a --- /dev/null +++ b/rumba/elements/experimentation.py @@ -0,0 +1,539 @@ +# +# A library to manage ARCFIRE experiments +# +# Copyright (C) 2017 Nextworks S.r.l. +# Copyright (C) 2017 imec +# +# Sander Vrijders +# Dimitri Staessens +# Vincenzo Maffione +# Marco Capitani +# Nick Aerts +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., http://www.fsf.org/about/contact/. +# + + +import abc +import os +import shutil +import time + +import rumba.log as log +import rumba.elements.topology as topology + +logger = log.get_logger(__name__) + + +tmp_dir = '/tmp/rumba' + + +class Testbed(object): + """ + Base class for every testbed plugin. + """ + def __init__(self, + exp_name, + username, + password, + proj_name, + http_proxy=None, + system_logs=None): + """ + :param exp_name: The experiment name. + :param username: The username. + :param password: The password. + :param proj_name: The project name. + :param http_proxy: HTTP proxy used by the testbed. + :param system_logs: Location of the system logs of + images of the testbed. + """ + self.username = username + self.password = password + self.proj_name = proj_name + self.exp_name = exp_name + self.http_proxy = http_proxy + self.flags = {'no_vlan_offload': False} + self.executor = None + if system_logs is None: + self.system_logs = ['/var/log/syslog'] + elif isinstance(system_logs, str): + self.system_logs = [system_logs] + else: + self.system_logs = system_logs + + def swap_in(self, experiment): + """ + Swaps experiment in on the testbed. + + :param experiment: The experiment. + """ + for node in experiment.nodes: + node.executor = self.executor + + self._swap_in(experiment) + + for dif in experiment.dif_ordering: + if isinstance(dif, topology.ShimEthDIF): + dif.link_quality.apply(dif) + + @abc.abstractmethod + def _swap_in(self, experiment): + logger.info("_swap_in(): nothing to do") + + def swap_out(self, experiment): + """ + Swaps experiment out of the testbed. + + :param experiment: The experiment. + """ + self._swap_out(experiment) + + @abc.abstractmethod + def _swap_out(self, experiment): + logger.info("swap_out(): nothing to do") + + +class Experiment(object): + """ + Base class for experiments. + """ + __metaclass__ = abc.ABCMeta + + def __init__(self, testbed, + nodes=None, + git_repo=None, + git_branch=None, + log_dir=None, + prototype_logs=None): + """ + :param testbed: The testbed of the experiment. + :param nodes: The list of nodes in the experiment. + :param git_repo: The git repository of the prototype. + :param git_branch: The git branch of the repository. + :param log_dir: Where to log output of the experiment. + :param prototype_logs: Where the prototype logs its output. + """ + if nodes is None: + nodes = list() + self.nodes = nodes + self.git_repo = git_repo + self.git_branch = git_branch + self.testbed = testbed + # the strategy employed for completing the enrollment phase in + # the different DIFs + self.enrollment_strategy = 'minimal' # 'full-mesh', 'manual' + # the strategy employed for setting up the data transfer + # networks in the DIFs after enrollment + self.dt_strategy = 'full-mesh' # 'minimal', 'manual' + self.dif_ordering = [] + self.enrollments = [] # a list of per-DIF lists of enrollments + self.dt_flows = [] # a list of per-DIF lists of data transfer flows + self.mgmt_flows = [] # a list of per-DIF lists of management flows + + # Determine log directory + if log_dir is None: + # If it is None, use /tmp/rumba/{project} + # Wipe it and make it again + exp_name = self.testbed.exp_name.replace('/', '_') # Just in case + log_dir = os.path.join(tmp_dir, exp_name) + shutil.rmtree(log_dir, ignore_errors=True) + os.mkdir(log_dir) + self.log_dir = log_dir + if not os.path.isdir(self.log_dir): + raise Exception('Destination "%s" is not a directory. ' + 'Cannot fetch logs.' + % self.log_dir) + self.prototype_logs = prototype_logs \ + if prototype_logs is not None else [] + + # Generate missing information + self.generate() + + def __repr__(self): + s = "" + for n in self.nodes: + s += "\n" + str(n) + + return s + + def add_node(self, node): + """ + Adds a node to the experiment. + + :param node: A node. + """ + self.nodes.append(node) + self.generate() + + def del_node(self, node): + """ + Deletes a node from the experiment. + + :param node: A node. + """ + self.nodes.remove(node) + self.generate() + + # Compute registration/enrollment order for DIFs + def compute_dif_ordering(self): + # Compute DIFs dependency graph, as both adjacency and incidence list. + difsdeps_adj = dict() + difsdeps_inc = dict() + + for node in self.nodes: + for dif in node.difs: + if dif not in difsdeps_adj: + difsdeps_adj[dif] = set() + + for upper in node.dif_registrations: + for lower in node.dif_registrations[upper]: + if upper not in difsdeps_inc: + difsdeps_inc[upper] = set() + if lower not in difsdeps_inc: + difsdeps_inc[lower] = set() + if upper not in difsdeps_adj: + difsdeps_adj[upper] = set() + if lower not in difsdeps_adj: + difsdeps_adj[lower] = set() + difsdeps_inc[upper].add(lower) + difsdeps_adj[lower].add(upper) + + # Kahn's algorithm below only needs per-node count of + # incident edges, so we compute these counts from the + # incidence list and drop the latter. + difsdeps_inc_cnt = dict() + for dif in difsdeps_inc: + difsdeps_inc_cnt[dif] = len(difsdeps_inc[dif]) + del difsdeps_inc + + # Init difsdeps_inc_cnt for those DIFs that do not + # act as lower IPCPs nor upper IPCPs for registration + # operations + for node in self.nodes: + for dif in node.difs: + if dif not in difsdeps_inc_cnt: + difsdeps_inc_cnt[dif] = 0 + + # Run Kahn's algorithm to compute topological + # ordering on the DIFs graph. + frontier = set() + self.dif_ordering = [] + for dif in difsdeps_inc_cnt: + if difsdeps_inc_cnt[dif] == 0: + frontier.add(dif) + + while len(frontier): + cur = frontier.pop() + self.dif_ordering.append(cur) + for nxt in difsdeps_adj[cur]: + difsdeps_inc_cnt[nxt] -= 1 + if difsdeps_inc_cnt[nxt] == 0: + frontier.add(nxt) + difsdeps_adj[cur] = set() + + circular_set = [dif for dif in difsdeps_inc_cnt + if difsdeps_inc_cnt[dif] != 0] + if len(circular_set): + raise Exception("Fatal error: The specified DIFs topology" + "has one or more" + "circular dependencies, involving the following" + " DIFs: %s" % circular_set) + + logger.debug("DIF topological ordering: %s", self.dif_ordering) + + # Compute all the enrollments, to be called after compute_dif_ordering() + def compute_enrollments(self): + dif_graphs = dict() + self.enrollments = [] + self.mgmt_flows = [] + self.dt_flows = [] + + for dif in self.dif_ordering: + neighsets = dict() + dif_graphs[dif] = dict() + first = None + + # For each N-1-DIF supporting this DIF, compute the set of nodes + # that share such N-1-DIF. This set will be called the 'neighset' of + # the N-1-DIF for the current DIF. + + for node in self.nodes: + if dif in node.dif_registrations: + dif_graphs[dif][node] = [] # init for later use + if first is None: # pick any node for later use + first = node + for lower_dif in node.dif_registrations[dif]: + if lower_dif not in neighsets: + neighsets[lower_dif] = [] + neighsets[lower_dif].append(node) + + # Build the graph, represented as adjacency list + for lower_dif in neighsets: + # Each neighset corresponds to a complete (sub)graph. + for node1 in neighsets[lower_dif]: + for node2 in neighsets[lower_dif]: + if node1 != node2: + dif_graphs[dif][node1].append((node2, lower_dif)) + + self.enrollments.append([]) + self.dt_flows.append([]) + self.mgmt_flows.append([]) + + if first is None: + # This is a shim DIF, nothing to do + continue + + er = [] + for node in dif_graphs[dif]: + for edge in dif_graphs[dif][node]: + er.append("%s --[%s]--> %s" % (node.name, + edge[1].name, + edge[0].name)) + logger.debug("DIF graph for %s: %s", dif, ', '.join(er)) + + # To generate the list of mgmt flows, minimal enrollments + # and minimal dt flows, we simulate it, using + # breadth-first traversal. + enrolled = {first} + frontier = {first} + while len(frontier): + cur = frontier.pop() + for edge in dif_graphs[dif][cur]: + if edge[0] not in enrolled: + enrolled.add(edge[0]) + enrollee = edge[0].get_ipcp_by_dif(dif) + assert(enrollee is not None) + enroller = cur.get_ipcp_by_dif(dif) + assert(enroller is not None) + if self.enrollment_strategy == 'minimal': + self.enrollments[-1].append({'dif': dif, + 'enrollee': enrollee, + 'enroller': enroller, + 'lower_dif': edge[1]}) + self.mgmt_flows[-1].append({'src': enrollee, + 'dst': enroller}) + if self.dt_strategy == 'minimal': + self.dt_flows[-1].append({'src': enrollee, + 'dst': enroller}) + frontier.add(edge[0]) + if len(dif.members) != len(enrolled): + raise Exception("Disconnected DIF found: %s" % (dif,)) + + # In case of a full mesh enrollment or dt flows + for cur in dif_graphs[dif]: + for edge in dif_graphs[dif][cur]: + if cur.name < edge[0].name: + enrollee = cur.get_ipcp_by_dif(dif) + assert(enrollee is not None) + enroller = edge[0].get_ipcp_by_dif(dif) + assert(enroller is not None) + if self.enrollment_strategy == 'full-mesh': + self.enrollments[-1].append({'dif': dif, + 'enrollee': enrollee, + 'enroller': enroller, + 'lower_dif': edge[1]}) + if self.dt_strategy == 'full-mesh': + self.dt_flows[-1].append({'src': enrollee, + 'dst': enroller}) + + if not (self.dt_strategy == 'minimal' + or self.dt_strategy == 'full-mesh') \ + or not (self.enrollment_strategy == 'full-mesh' + or self.enrollment_strategy == 'minimal'): + # This is a bug + assert False + + log_string = "Enrollments:\n" + for el in self.enrollments: + for e in el: + log_string += (" [%s] %s --> %s through N-1-DIF %s\n" + % (e['dif'], + e['enrollee'].name, + e['enroller'].name, + e['lower_dif'])) + logger.debug(log_string) + + log_string = "Mgmt flows:\n" + for el in self.mgmt_flows: + for e in el: + log_string += (" %s --> %s \n" + % (e['src'].name, + e['dst'].name)) + logger.debug(log_string) + + log_string = "Dt flows:\n" + for el in self.dt_flows: + for e in el: + log_string += (" %s --> %s \n" + % (e['src'].name, + e['dst'].name)) + logger.debug(log_string) + + def compute_ipcps(self): + # For each node, compute the required IPCP instances, and associated + # registrations + for node in self.nodes: + node.ipcps = [] + # We want also the node.ipcps list to be generated in + # topological ordering + for dif in self.dif_ordering: + if dif not in node.difs: + continue + + # Create an instance of the required IPCP class + ipcp = dif.get_ipcp_class()( + name='%s.%s' % (dif.name, node.name), + node=node, dif=dif) + + if dif in node.dif_registrations: + for lower in node.dif_registrations[dif]: + ipcp.registrations.append(lower) + + node.ipcps.append(ipcp) + dif.ipcps.append(ipcp) + + def compute_bootstrappers(self): + for node in self.nodes: + for ipcp in node.ipcps: + ipcp.dif_bootstrapper = True + for el in self.enrollments: + for e in el: + if e['dif'] != ipcp.dif: + # Skip this DIF + break + if e['enrollee'] == ipcp: + ipcp.dif_bootstrapper = False + # Exit the loops + break + if not ipcp.dif_bootstrapper: + break + + def dump_ssh_info(self): + f = open(os.path.join(tmp_dir, 'ssh_info'), 'w') + for node in self.nodes: + f.write("%s;%s;%s;%s;%s\n" % (node.name, + self.testbed.username, + node.ssh_config.hostname, + node.ssh_config.port, + node.ssh_config.proxy_server)) + f.close() + + # Examine the nodes and DIFs, compute the registration and enrollment + # order, the list of IPCPs to create, registrations, ... + def generate(self): + start = time.time() + self.compute_dif_ordering() + self.compute_ipcps() + self.compute_enrollments() + self.compute_bootstrappers() + for node in self.nodes: + logger.info("IPCPs for node %s: %s", node.name, node.ipcps) + end = time.time() + logger.info("Layer ordering computation took %.2f seconds", end - start) + + def install_prototype(self): + """ + Installs the prototype on the nodes. + """ + start = time.time() + self._install_prototype() + end = time.time() + logger.info("Install took %.2f seconds", end - start) + + def set_startup_command(self, command): + for node in self.nodes: + node.startup_command = command + + def bootstrap_prototype(self): + """ + Bootstraps the prototype on the nodes. + """ + start = time.time() + self._bootstrap_prototype() + end = time.time() + logger.info("Bootstrap took %.2f seconds", end - start) + + @abc.abstractmethod + def _install_prototype(self): + raise Exception('install_prototype() method not implemented') + + @abc.abstractmethod + def _bootstrap_prototype(self): + raise Exception('bootstrap_prototype() method not implemented') + + @abc.abstractmethod + def prototype_name(self): + raise Exception('prototype_name() method not implemented') + + @abc.abstractmethod + def _terminate_prototype(self): + raise Exception('terminate_prototype() method not implemented') + + def swap_in(self): + """ + Swap the experiment in on the testbed. + """ + start = time.time() + self.testbed.swap_in(self) + self.dump_ssh_info() + end = time.time() + logger.info("Swap-in took %.2f seconds", end - start) + + def swap_out(self): + """ + Swap the experiment out of the testbed. + """ + start = time.time() + # Terminate prototype gracefully + self._terminate_prototype() + for node in self.nodes: + if node.ssh_config.client is not None: + node.ssh_config.client.close() + if node.ssh_config.proxy_client is not None: + node.ssh_config.proxy_client.close() + # Undo the testbed (testbed-specific) + self.testbed.swap_out(self) + end = time.time() + logger.info("Swap-out took %.2f seconds", end - start) + + +class Executor: + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def execute_command(self, node, command, as_root=False, time_out=3): + # Execute command on a node + return + + def execute_commands(self, node, commands, as_root=False, time_out=3): + for command in commands: + self.execute_command(node, command, as_root, time_out) + + @abc.abstractmethod + def copy_file(self, node, path, destination): + return + + def copy_files(self, node, paths, destination): + for path in paths: + self.copy_file(node, path, destination) + + @abc.abstractmethod + def fetch_file(self, node, path, destination, sudo=False): + return + + def fetch_files(self, node, paths, destination, sudo=False): + for path in paths: + self.fetch_file(node, path, destination, sudo) diff --git a/rumba/elements/topology.py b/rumba/elements/topology.py new file mode 100644 index 0000000..24dcfa2 --- /dev/null +++ b/rumba/elements/topology.py @@ -0,0 +1,869 @@ +# +# A library to manage ARCFIRE experiments +# +# Copyright (C) 2017 Nextworks S.r.l. +# Copyright (C) 2017 imec +# +# Sander Vrijders +# Dimitri Staessens +# Vincenzo Maffione +# Marco Capitani +# Nick Aerts +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., http://www.fsf.org/about/contact/. +# + + +from enum import Enum + +import rumba.log as log + +logger = log.get_logger(__name__) + + +class DIF(object): + """ + Base class for DIFs. + """ + def __init__(self, name, members=None): + """ + :param name: Name of the DIF. + :param members: List of nodes that are members of the DIF. + """ + self.name = name + if members is None: + members = list() + self.members = members + self.ipcps = list() + + def __repr__(self): + s = "DIF %s" % self.name + return s + + def __hash__(self): + return hash(self.name) + + def __eq__(self, other): + return other is not None and self.name == other.name + + def __neq__(self, other): + return not self == other + + def add_member(self, node): + self.members.append(node) + + def del_member(self, node): + self.members.remove(node) + + def get_ipcp_class(self): + return IPCP + + +# Shim over UDP +# +class ShimUDPDIF(DIF): + """ + Shim over UDP. + """ + def __init__(self, name, members=None): + """ + :param name: Name of the DIF. + :param members: List of members of the DIF. + """ + DIF.__init__(self, name, members) + + def get_ipcp_class(self): + return ShimUDPIPCP + + +# Shim over Ethernet +# +# @link_speed [int] Speed of the Ethernet network, in Mbps +# +class ShimEthDIF(DIF): + """ + Shim over Ethernet. + """ + def get_e_id(self): + return "ShimEthDIF." + self.name + + def __init__(self, name, members=None, link_quality=None): + """ + :param name: Name of the DIF. + :param members: List of members of the DIF. + :param link_quality: Quality of the link. + """ + DIF.__init__(self, name, members) + self._link_quality = link_quality if link_quality is not None else LinkQuality() + + def get_ipcp_class(self): + return ShimEthIPCP + + def add_member(self, node): + super(ShimEthDIF, self).add_member(node) + if len(self.members) > 2: + raise Exception("More than 2 members in %s!" % self.name) + + @property + def link_quality(self): + return self._link_quality + + @link_quality.setter + def link_quality(self, _link_quality): + if not _link_quality: + raise ValueError("Cannot set link_quality to None, use del " + "link_quality to reset") + + self._link_quality = _link_quality + + _link_quality.apply(self) + + @link_quality.deleter + def link_quality(self): + self._link_quality.deactivate(self) + + def set_delay(self, delay=0, + jitter=None, + correlation=None, + distribution=None): + """ + Set the delay parameters of the underlying link. + Parameters as in :py:class:`.Delay` + + :param delay: average delay in ms + :type delay: :py:class:`int` + :param jitter: jitter in ms + :type jitter: :py:class:`int` + :param correlation: correlation in % + :type correlation: :py:class:`int` + :param distribution: delay distribution, defaults to a Normal + distribution + :type distribution: :py:class:`.Distribution` + """ + new_delay = Delay(delay, jitter, correlation, distribution) + new_quality = LinkQuality.clone(self.link_quality, delay=new_delay) + self.link_quality = new_quality + + def set_loss(self, + loss=0, + correlation=None): + """ + Set the loss parameter of the underlying link. + Parameters as in :py:class:`.Loss` + + :param loss: loss in percentage + :type loss: :py:class:`int` or :py:class:`float` + :param correlation: correlation in percentage + :type correlation: :py:class:`int` or :py:class:`float` + """ + new_loss = Loss(loss, correlation) + new_quality = LinkQuality.clone(self.link_quality, loss=new_loss) + self.link_quality = new_quality + + def set_rate(self, rate=None): + """ + Set the rate parameter of the underlying link. + + :param rate: The desired rate in mbps + :type rate: :py:class:`int` + """ + new_quality = LinkQuality.clone(self.link_quality, rate=rate) + self.link_quality = new_quality + + def set_quality(self, delay, loss, rate): + """ + Configure the basic quality parameters of the + underlying link. + + :param delay: the link delay, in ms + :type delay: :py:class:`int` + :param loss: the link loss, as a percentage + :type loss: :py:class:`float` or :py:class:`int` + :param rate: the link rate in mbps + :type rate: :py:class:`int` + """ + new_quality = LinkQuality(delay, loss, rate) + self.link_quality = new_quality + + +class NormalDIF(DIF): + """ + Normal DIF. + """ + def __init__(self, name, members=None, policy=None): + """ + :param name: The name of the DIF. + :param members: The list of members. + :param policy: Policies of the normal DIF. + """ + DIF.__init__(self, name, members) + if policy is None: + policy = Policy(self) + self.policy = policy + + def add_policy(self, comp, pol, **params): + """ + Adds a policy to the DIF. + + :param comp: Component name. + :param pol: Policy name + :param params: Parameters of the policy. + """ + self.policy.add_policy(comp, pol, **params) + + def del_policy(self, comp=None, pol=None): + """ + Deletes a policy from the DIF. + + :param comp: Component name. + :param pol: Policy name + """ + self.policy.del_policy(comp, pol) + + def show(self): + """ + :return: A string representing the policies in the DIF. + """ + s = DIF.__repr__(self) + for comp, pol_dict in self.policy.get_policies().items(): + for pol, params in pol_dict.items(): + s += "\n Component %s has policy %s with params %s" \ + % (comp, pol, params) + return s + + +class Distribution(Enum): + """ + An enum holding different statistical distributions. + + **Values:** + + `NORMAL = 1` + + `PARETO = 2` + + `PARETONORMAL = 3` + """ + NORMAL = 1 + PARETO = 2 + PARETONORMAL = 3 + + +class Delay(object): + """ + A class representing delay of a link. + """ + def __init__(self, delay=0, jitter=None, correlation=None, + distribution=None): + """ + Configure link delay. + + :param delay: average delay in ms + :type delay: :py:class:`int` + :param jitter: jitter in ms + :type jitter: :py:class:`int` + :param correlation: correlation in % + :type correlation: :py:class:`int` + :param distribution: delay distribution, defaults to a Normal + distribution + :type distribution: :py:class:`.Distribution` + """ + + if delay < 0: + raise ValueError("Delay needs to be at least 0") + + if jitter and not jitter > 0: + raise ValueError("Jitter needs to be higher than 0") + + if (not jitter) and correlation: + raise ValueError("Correlation requires a value for jitter") + + if correlation and (correlation < 0 or correlation > 100): + raise ValueError("Correlation needs to be between 0 and 100") + + self._delay = delay + self._jitter = jitter + self._correlation = correlation + self._distribution = distribution + + @property + def delay(self): + return self._delay + + @property + def jitter(self): + return self._jitter + + @property + def correlation(self): + return self._correlation + + @property + def distribution(self): + return self._distribution + + def build_command(self): + opts = ["delay %ims" % self.delay] + + if self.jitter: + opts.append("%ims" % self.jitter) + + if self.correlation: + opts.append("%f%%" % self.correlation) + + if self.distribution: + opts.append("distribution %s" % self.distribution.name.lower()) + + return " ".join(opts) + + +class Loss(object): + """ + A class representing loss on a link. + """ + def __init__(self, loss, correlation=None): + """ + Configure link loss. + + :param loss: loss in percentage + :type loss: :py:class:`int` or :py:class:`float` + :param correlation: correlation in percentage + :type correlation: :py:class:`int` or :py:class:`float` + """ + if loss and (loss < 0 or loss > 100): + raise ValueError("Loss needs to be between 0 and 100") + + if correlation and (correlation < 0 or correlation > 100): + raise ValueError("Correlation needs to be between 0 and 100") + + self._loss = loss + self._correlation = correlation + + @property + def loss(self): + return self._loss + + @property + def correlation(self): + return self._correlation + + def build_command(self): + opts = ["loss %f%%" % self.loss] + + if self.correlation: + opts.append("%f%%" % self.correlation) + + return " ".join(opts) + + +class LinkQuality(object): + """ + A class representing the link quality. + """ + _active = set() + + @classmethod + def clone(cls, old_quality, delay=None, loss=None, rate=None): + """ + Clone old_quality, updating it with the provided parameters + if present. + + :param old_quality: A :py:class:`.LinkQuality` instance to + use as a base + :type old_quality: :py:class:`.LinkQuality` + :param delay: Delay object holding delay configuration + or number corresponding to delay in ms + :type delay: :py:class:`.Delay` or :py:class:`int` + :param loss: Loss object holding delay configuration or + number corresponding to loss percentage + :type loss: :py:class:`.Loss` or :py:class:`float` + :param rate: The rate of the link in mbit + :type rate: :py:class:`int` + :return: a new :py:class:`.LinkQuality` instance. + :rtype: :py:class:`.LinkQuality` + """ + if delay is None: + delay = old_quality.delay + if loss is None: + loss = old_quality.loss + if rate is None: + rate = old_quality.rate + return LinkQuality(delay, loss, rate) + + def __init__(self, delay=None, loss=None, rate=None): + """ + Link quality configuration. + + :param delay: Delay object holding delay configuration + or number corresponding to delay in ms + :type delay: :py:class:`.Delay` or :py:class:`int` + :param loss: Loss object holding delay configuration or + number corresponding to loss percentage + :type loss: :py:class:`.Loss` or :py:class:`float` + :param rate: The rate of the link in mbit + :type rate: :py:class:`int` + """ + + if rate and not rate > 0: + raise ValueError("Rate needs to be higher than 0") + + if isinstance(delay, int): + delay = Delay(delay) + if isinstance(loss, int) or isinstance(loss, float): + loss = Loss(loss) + self._delay = delay + self._loss = loss + self._rate = rate + + @property + def delay(self): + return self._delay + + @property + def loss(self): + return self._loss + + @property + def rate(self): + return self._rate + + def build_command(self, ipcp): + cmd = [] + + if ipcp in LinkQuality._active: + cmd.append("tc qdisc change dev %s root netem" % ipcp.ifname) + else: + cmd.append("tc qdisc add dev %s root netem" % ipcp.ifname) + + if self.delay: + cmd.append(self.delay.build_command()) + + if self.loss: + cmd.append(self.loss.build_command()) + + if self.rate: + cmd.append("rate %imbit" % self.rate) + + return " ".join(cmd) + + def apply(self, shim): + if not (self.delay or self.loss or self.rate): + self.deactivate(shim) + else: + for ipcp in shim.ipcps: + if not ipcp.ifname: + logger.error("Could not apply LinkQuality to IPCP because " + "the interface name is None") + continue + + ipcp.node.execute_command(self.build_command(ipcp), + as_root=True) + LinkQuality._active.add(ipcp) + + def deactivate(self, shim): + for ipcp in shim.ipcps: + if ipcp not in LinkQuality._active: + continue + if not ipcp.ifname: + logger.error("Could not remove LinkQuality from IPCP because " + "the interface name is None") + continue + + ipcp.node.execute_command("tc qdisc del dev %s root " + "netem" % ipcp.ifname, as_root=True) + LinkQuality._active.remove(ipcp) + + +class SSHConfig(object): + def __init__(self, hostname, port=22, proxy_server=None): + self.username = None + self.password = None + self.hostname = hostname + self.port = port + self.proxy_server = proxy_server + self.client = None + self.proxy_client = None + self.http_proxy = None + + def set_username(self, username): + self.username = username + + def set_password(self, password): + self.password = password + + def set_http_proxy(self, proxy): + self.http_proxy = proxy + + +class Node(object): + """ + A node in the experiment. + """ + def get_e_id(self): + return "Node." + self.name + + def __init__(self, name, difs=None, dif_registrations=None, + policies=None, machine_type=None): + """ + :param name: Name of the node. + :param difs: A list of DIFs the node is in. + :param dif_registrations: How the DIFs are stacked. + :param policies: Policies of a DIF specific to the node. + :param machine_type: Type of machine to use, physical or virtual. + """ + self.name = name + if difs is None: + difs = list() + self.difs = difs + for dif in self.difs: + dif.add_member(self) + if dif_registrations is None: + dif_registrations = dict() + self.dif_registrations = dif_registrations + self.machine_type = machine_type + self.ssh_config = SSHConfig(name) + self.ipcps = [] + self.policies = dict() + self.has_tcpdump = False + if policies is None: + policies = dict() + for dif in self.difs: + if hasattr(dif, 'policy'): # check if the dif supports policies + self.policies[dif] = policies.get(dif, Policy(dif, self)) + + self.executor = None # will be set by testbed on swap_in + self.startup_command = None # will be set by prototype + + self._validate() + + def get_ipcp_by_dif(self, dif): + """ + :param dif: The DIF to get the IPCP of. + :return: The IPCP of the node that is in the DIF. + """ + for ipcp in self.ipcps: + if ipcp.dif == dif: + return ipcp + + def _undeclared_dif(self, dif): + if dif not in self.difs: + raise Exception("Invalid registration: node %s is not declared " + "to be part of DIF %s" % (self.name, dif.name)) + + def _validate(self): + # Check that DIFs referenced in self.dif_registrations + # are part of self.difs + for upper in self.dif_registrations: + self._undeclared_dif(upper) + for lower in self.dif_registrations[upper]: + self._undeclared_dif(lower) + + def __repr__(self): + s = "Node " + self.name + ":\n" + + s += " DIFs: [ " + s += " ".join([d.name for d in self.difs]) + s += " ]\n" + + s += " DIF registrations: [ " + rl = [] + for upper in self.dif_registrations: + difs = self.dif_registrations[upper] + x = "%s => [" % upper.name + x += " ".join([lower.name for lower in difs]) + x += "]" + rl.append(x) + s += ", ".join(rl) + s += " ]\n" + + return s + + def __hash__(self): + return hash(self.name) + + def __eq__(self, other): + return other is not None and self.name == other.name + + def __neq__(self, other): + return not self == other + + def add_dif(self, dif): + """ + Adds a DIF to the list. + + :param dif: Name of the DIF to add. + """ + self.difs.append(dif) + dif.add_member(self) + if hasattr(dif, 'policy'): + self.policies[dif] = Policy(dif, self) + self._validate() + + def del_dif(self, dif): + """ + Adds a DIF to the list. + + :param dif: Name of the DIF to add. + """ + self.difs.remove(dif) + dif.del_member(self) + try: + del self.policies[dif] + except KeyError: + # It was not in there, so nothing to do + pass + self._validate() + + def add_dif_registration(self, upper, lower): + """ + Adds a DIF registration. + + :param upper: Name of the DIF that is requesting IPC. + :param lower: Name of the DIF providing IPC. + """ + self.dif_registrations[upper].append(lower) + self._validate() + + def del_dif_registration(self, upper, lower): + """ + Removes a DIF registration. + + :param upper: Name of the DIF that is requesting IPC. + :param lower: Name of the DIF providing IPC. + """ + self.dif_registrations[upper].remove(lower) + self._validate() + + def add_policy(self, dif, component_name, policy_name, **parameters): + """ + Adds a policy. + + :param dif: The name of the DIF. + :param component_name: Name of the component. + :param policy_name: Name of the policy. + :param parameters: Parameters of the policy. + """ + self.policies[dif].add_policy(component_name, policy_name, **parameters) + + def del_policy(self, dif, component_name=None, policy_name=None): + """ + Removes a policy. + + :param dif: the dif to which the policy should be applied + :param component_name: Name of the component. + :param policy_name: Name of the policy. + """ + self.policies[dif].del_policy(component_name, policy_name) + + def get_policy(self, dif): + """ + :param dif: The DIF to get the policy of. + :return: Returns the policy. + """ + return self.policies[dif] + + def execute_commands(self, commands, as_root=False, time_out=3, + use_proxy=False): + """ + Execute a list of a commands on the node. + + :param commands: A list of commands. + :param as_root: Execute as root? + :param time_out: Seconds before timing out. + :param use_proxy: Use a proxy to execute the commands? + """ + return self.executor.execute_commands(self, + commands, + as_root, + time_out) + + def execute_command(self, command, as_root=False, time_out=3, + use_proxy=False): + """ + Execute a single command on a node. + + :param command: A command. + :param as_root: Execute as root? + :param time_out: Seconds before timing out. + :param use_proxy: Use a proxy to execute the commands? + :return: The stdout of the command. + """ + return self.executor.execute_command(self, + command, + as_root, + time_out) + + def copy_file(self, path, destination): + """ + Copy file to node. + + :param path: Local location of the file. + :param destination: Destination location of the file. + """ + self.executor.copy_file(self, path, destination) + + def copy_files(self, paths, destination): + """ + Copy files to node. + + :param paths: Local location of the files. + :param destination: Destination location of the files. + """ + self.executor.copy_files(self, paths, destination) + + def fetch_file(self, path, destination, sudo=False): + """ + Fetch file from the node. + + :param path: Location of the files on the node. + :param destination: Destination location of the files. + :param sudo: The file is owned by root on the node? + """ + self.executor.fetch_file(self, path, destination, sudo) + + def fetch_files(self, paths, destination, sudo=False): + """ + Fetch files from the node. + + :param paths: Location of the files on the node. + :param destination: Destination location of the files. + :param sudo: The file is owned by root on the node? + """ + self.executor.fetch_files(self, paths, destination, sudo) + + def set_link_state(self, dif, state): + """ + Change the state of a link on the node. + + :param dif: The name of the shim Ethernet DIF. + :param state: Up or down. + """ + ipcp = self.get_ipcp_by_dif(dif) + self.execute_command('ip link set dev ' + ipcp.ifname + ' ' + state, + as_root=True) + + +class IPCP(object): + def __init__(self, name, node, dif): + self.name = name + self.node = node + self.dif = dif + self.registrations = [] + + # Is this IPCP the first in its DIF, so that it does not need + # to enroll to anyone ? + self.dif_bootstrapper = False + + def __repr__(self): + return "{IPCP=%s,DIF=%s,N-1-DIFs=(%s)%s}" % \ + (self.name, self.dif.name, + ' '.join([dif.name for dif in self.registrations]), + ',bootstrapper' if self.dif_bootstrapper else '' + ) + + def __hash__(self): + return hash((self.name, self.dif.name)) + + def __eq__(self, other): + return other is not None and self.name == other.name \ + and self.dif == other.dif + + def __neq__(self, other): + return not self == other + + +class ShimEthIPCP(IPCP): + def __init__(self, name, node, dif, ifname=None): + IPCP.__init__(self, name, node, dif) + self.ifname = ifname + + +class ShimUDPIPCP(IPCP): + def __init__(self, name, node, dif): + IPCP.__init__(self, name, node, dif) + # TODO: add IP and port + + +class Policy(object): + def __init__(self, dif, node=None, policies=None): + self.dif = dif # type: NormalDIF + self.node = node + if policies is None: + self._dict = dict() + else: + self._dict = policies + + def add_policy(self, component_name, policy_name, **parameters): + self._dict.setdefault(component_name, dict())[policy_name] = parameters + + # + # Fetches effective policy info + # + def get_policies(self, component_name=None, policy_name=None): + policy = self._superimpose() + if component_name is None: + return policy._dict + elif policy_name is None: + return policy._dict[component_name] + else: + return policy._dict[component_name][policy_name] + + def del_policy(self, component_name=None, policy_name=None): + if component_name is None: + self._dict = dict() + elif policy_name is None: + del self._dict[component_name] + else: + del self._dict[component_name][policy_name] + + # + # Merges this policy into that of its dif, obtaining + # the effective policy acting on self.node. + # + def _superimpose(self): + if self.node is None: + return self + other = self.dif.policy + base = dict(other._dict) + base.update(self._dict) + return Policy(self.dif, self.node, base) + + def __eq__(self, other): + if not isinstance(other, Policy): + return False + else: + return other.dif == self.dif \ + and other.node == self.node \ + and other._dict == self._dict + + def __str__(self): + node_str = (" Node: " + self.node) if self.node is not None else "" + return "Policy[Dif: %(dif)s,%(node_str)s Dict: %(dict)s]" \ + % {"dif": self.dif, "node_str": node_str, "dict": self._dict} + + def __repr__(self): + node_str = (" Node: " + self.node) if self.node is not None else "" + s = "Policy[ Dif: %(dif)s,%(node_str)s" \ + % {"dif": self.dif, "node_str": node_str} + comps = [] + for component in self._dict: + for policy in self._dict[component]: + comps.append("\n Component %s has policy %s with params %s" + % (component, + policy, + self._dict[component][policy])) + s += ",".join(comps) + s += "\n]\n" + return s diff --git a/rumba/model.py b/rumba/model.py index a6925b3..4c151e9 100644 --- a/rumba/model.py +++ b/rumba/model.py @@ -25,19 +25,65 @@ # Foundation, Inc., http://www.fsf.org/about/contact/. # -import abc + import os import stat -import time -import shutil -from enum import Enum -import rumba.log as log +from rumba.elements.topology import ( + DIF, + ShimEthDIF, + ShimUDPDIF, + NormalDIF, + + IPCP, + ShimEthIPCP, + ShimUDPIPCP, + + Node, + SSHConfig, + + LinkQuality, + Delay, + Loss +) + +from rumba.elements.experimentation import ( + Experiment, + Testbed, + Executor, + tmp_dir +) + + +__all__ = [ + # Topology + "DIF", + "ShimEthDIF", + "ShimUDPDIF", + "NormalDIF", + + "IPCP", + "ShimEthIPCP", + "ShimUDPIPCP", -logger = log.get_logger(__name__) + "Node", + "SSHConfig", + + "LinkQuality", + "Delay", + "Loss", + + # Experimentation + "Experiment", + "Testbed", + "Executor", + "tmp_dir", + + # Other + "cache_dir" +] -tmp_dir = '/tmp/rumba' try: os.mkdir(tmp_dir) os.chmod(tmp_dir, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO) @@ -57,1337 +103,3 @@ try: except OSError: # Already there, nothing to do pass - - -class Testbed(object): - """ - Base class for every testbed plugin. - """ - def __init__(self, - exp_name, - username, - password, - proj_name, - http_proxy=None, - system_logs=None): - """ - :param exp_name: The experiment name. - :param username: The username. - :param password: The password. - :param proj_name: The project name. - :param http_proxy: HTTP proxy used by the testbed. - :param system_logs: Location of the system logs of - images of the testbed. - """ - self.username = username - self.password = password - self.proj_name = proj_name - self.exp_name = exp_name - self.http_proxy = http_proxy - self.flags = {'no_vlan_offload': False} - if system_logs is None: - self.system_logs = ['/var/log/syslog'] - elif isinstance(system_logs, str): - self.system_logs = [system_logs] - else: - self.system_logs = system_logs - - def swap_in(self, experiment): - """ - Swaps experiment in on the testbed. - - :param experiment: The experiment. - """ - for node in experiment.nodes: - node.executor = self.executor - - self._swap_in(experiment) - - for dif in experiment.dif_ordering: - if isinstance(dif, ShimEthDIF): - dif.link_quality.apply(dif) - - @abc.abstractmethod - def _swap_in(self, experiment): - logger.info("_swap_in(): nothing to do") - - def swap_out(self, experiment): - """ - Swaps experiment out of the testbed. - - :param experiment: The experiment. - """ - self._swap_out(experiment) - - @abc.abstractmethod - def _swap_out(self, experiment): - logger.info("swap_out(): nothing to do") - - -class DIF(object): - """ - Base class for DIFs. - """ - def __init__(self, name, members=None): - """ - :param name: Name of the DIF. - :param members: List of nodes that are members of the DIF. - """ - self.name = name - if members is None: - members = list() - self.members = members - self.ipcps = list() - - def __repr__(self): - s = "DIF %s" % self.name - return s - - def __hash__(self): - return hash(self.name) - - def __eq__(self, other): - return other is not None and self.name == other.name - - def __neq__(self, other): - return not self == other - - def add_member(self, node): - self.members.append(node) - - def del_member(self, node): - self.members.remove(node) - - def get_ipcp_class(self): - return IPCP - - -# Shim over UDP -# -class ShimUDPDIF(DIF): - """ - Shim over UDP. - """ - def __init__(self, name, members=None): - """ - :param name: Name of the DIF. - :param members: List of members of the DIF. - """ - DIF.__init__(self, name, members) - - def get_ipcp_class(self): - return ShimUDPIPCP - - -# Shim over Ethernet -# -# @link_speed [int] Speed of the Ethernet network, in Mbps -# -class ShimEthDIF(DIF): - """ - Shim over Ethernet. - """ - def get_e_id(self): - return "ShimEthDIF." + self.name - - def __init__(self, name, members=None, link_quality=None): - """ - :param name: Name of the DIF. - :param members: List of members of the DIF. - :param link_quality: Quality of the link. - """ - DIF.__init__(self, name, members) - self._link_quality = link_quality if link_quality is not None else LinkQuality() - - def get_ipcp_class(self): - return ShimEthIPCP - - def add_member(self, node): - super(ShimEthDIF, self).add_member(node) - if len(self.members) > 2: - raise Exception("More than 2 members in %s!" % self.name) - - @property - def link_quality(self): - return self._link_quality - - @link_quality.setter - def link_quality(self, _link_quality): - if not _link_quality: - raise ValueError("Cannot set link_quality to None, use del " - "link_quality to reset") - - self._link_quality = _link_quality - - _link_quality.apply(self) - - @link_quality.deleter - def link_quality(self): - self._link_quality.deactivate(self) - - def set_delay(self, delay=0, - jitter=None, - correlation=None, - distribution=None): - """ - Set the delay parameters of the underlying link. - Parameters as in :py:class:`.Delay` - - :param delay: average delay in ms - :type delay: :py:class:`int` - :param jitter: jitter in ms - :type jitter: :py:class:`int` - :param correlation: correlation in % - :type correlation: :py:class:`int` - :param distribution: delay distribution, defaults to a Normal - distribution - :type distribution: :py:class:`.Distribution` - """ - new_delay = Delay(delay, jitter, correlation, distribution) - new_quality = LinkQuality.clone(self.link_quality, delay=new_delay) - self.link_quality = new_quality - - def set_loss(self, - loss=0, - correlation=None): - """ - Set the loss parameter of the underlying link. - Parameters as in :py:class:`.Loss` - - :param loss: loss in percentage - :type loss: :py:class:`int` or :py:class:`float` - :param correlation: correlation in percentage - :type correlation: :py:class:`int` or :py:class:`float` - """ - new_loss = Loss(loss, correlation) - new_quality = LinkQuality.clone(self.link_quality, loss=new_loss) - self.link_quality = new_quality - - def set_rate(self, rate=None): - """ - Set the rate parameter of the underlying link. - - :param rate: The desired rate in mbps - :type rate: :py:class:`int` - """ - new_quality = LinkQuality.clone(self.link_quality, rate=rate) - self.link_quality = new_quality - - def set_quality(self, delay, loss, rate): - """ - Configure the basic quality parameters of the - underlying link. - - :param delay: the link delay, in ms - :type delay: :py:class:`int` - :param loss: the link loss, as a percentage - :type loss: :py:class:`float` or :py:class:`int` - :param rate: the link rate in mbps - :type rate: :py:class:`int` - """ - new_quality = LinkQuality(delay, loss, rate) - self.link_quality = new_quality - - -class NormalDIF(DIF): - """ - Normal DIF. - """ - def __init__(self, name, members=None, policy=None): - """ - :param name: The name of the DIF. - :param members: The list of members. - :param policy: Policies of the normal DIF. - """ - DIF.__init__(self, name, members) - if policy is None: - policy = Policy(self) - self.policy = policy - - def add_policy(self, comp, pol, **params): - """ - Adds a policy to the DIF. - - :param comp: Component name. - :param pol: Policy name - :param params: Parameters of the policy. - """ - self.policy.add_policy(comp, pol, **params) - - def del_policy(self, comp=None, pol=None): - """ - Deletes a policy from the DIF. - - :param comp: Component name. - :param pol: Policy name - """ - self.policy.del_policy(comp, pol) - - def show(self): - """ - :return: A string representing the policies in the DIF. - """ - s = DIF.__repr__(self) - for comp, pol_dict in self.policy.get_policies().items(): - for pol, params in pol_dict.items(): - s += "\n Component %s has policy %s with params %s" \ - % (comp, pol, params) - return s - - -class Distribution(Enum): - """ - An enum holding different statistical distributions. - - **Values:** - - `NORMAL = 1` - - `PARETO = 2` - - `PARETONORMAL = 3` - """ - NORMAL = 1 - PARETO = 2 - PARETONORMAL = 3 - - -class Delay(object): - """ - A class representing delay of a link. - """ - def __init__(self, delay=0, jitter=None, correlation=None, - distribution=None): - """ - Configure link delay. - - :param delay: average delay in ms - :type delay: :py:class:`int` - :param jitter: jitter in ms - :type jitter: :py:class:`int` - :param correlation: correlation in % - :type correlation: :py:class:`int` - :param distribution: delay distribution, defaults to a Normal - distribution - :type distribution: :py:class:`.Distribution` - """ - - if delay < 0: - raise ValueError("Delay needs to be at least 0") - - if jitter and not jitter > 0: - raise ValueError("Jitter needs to be higher than 0") - - if (not jitter) and correlation: - raise ValueError("Correlation requires a value for jitter") - - if correlation and (correlation < 0 or correlation > 100): - raise ValueError("Correlation needs to be between 0 and 100") - - self._delay = delay - self._jitter = jitter - self._correlation = correlation - self._distribution = distribution - - @property - def delay(self): - return self._delay - - @property - def jitter(self): - return self._jitter - - @property - def correlation(self): - return self._correlation - - @property - def distribution(self): - return self._distribution - - def build_command(self): - opts = ["delay %ims" % self.delay] - - if self.jitter: - opts.append("%ims" % self.jitter) - - if self.correlation: - opts.append("%f%%" % self.correlation) - - if self.distribution: - opts.append("distribution %s" % self.distribution.name.lower()) - - return " ".join(opts) - - -class Loss(object): - """ - A class representing loss on a link. - """ - def __init__(self, loss, correlation=None): - """ - Configure link loss. - - :param loss: loss in percentage - :type loss: :py:class:`int` or :py:class:`float` - :param correlation: correlation in percentage - :type correlation: :py:class:`int` or :py:class:`float` - """ - if loss and (loss < 0 or loss > 100): - raise ValueError("Loss needs to be between 0 and 100") - - if correlation and (correlation < 0 or correlation > 100): - raise ValueError("Correlation needs to be between 0 and 100") - - self._loss = loss - self._correlation = correlation - - @property - def loss(self): - return self._loss - - @property - def correlation(self): - return self._correlation - - def build_command(self): - opts = ["loss %f%%" % self.loss] - - if self.correlation: - opts.append("%f%%" % self.correlation) - - return " ".join(opts) - - -class LinkQuality(object): - """ - A class representing the link quality. - """ - _active = set() - - @classmethod - def clone(cls, old_quality, delay=None, loss=None, rate=None): - """ - Clone old_quality, updating it with the provided parameters - if present. - - :param old_quality: A :py:class:`.LinkQuality` instance to - use as a base - :type old_quality: :py:class:`.LinkQuality` - :param delay: Delay object holding delay configuration - or number corresponding to delay in ms - :type delay: :py:class:`.Delay` or :py:class:`int` - :param loss: Loss object holding delay configuration or - number corresponding to loss percentage - :type loss: :py:class:`.Loss` or :py:class:`float` - :param rate: The rate of the link in mbit - :type rate: :py:class:`int` - :return: a new :py:class:`.LinkQuality` instance. - :rtype: :py:class:`.LinkQuality` - """ - if delay is None: - delay = old_quality.delay - if loss is None: - loss = old_quality.loss - if rate is None: - rate = old_quality.rate - return LinkQuality(delay, loss, rate) - - def __init__(self, delay=None, loss=None, rate=None): - """ - Link quality configuration. - - :param delay: Delay object holding delay configuration - or number corresponding to delay in ms - :type delay: :py:class:`.Delay` or :py:class:`int` - :param loss: Loss object holding delay configuration or - number corresponding to loss percentage - :type loss: :py:class:`.Loss` or :py:class:`float` - :param rate: The rate of the link in mbit - :type rate: :py:class:`int` - """ - - if rate and not rate > 0: - raise ValueError("Rate needs to be higher than 0") - - if isinstance(delay, int): - delay = Delay(delay) - if isinstance(loss, int) or isinstance(loss, float): - loss = Loss(loss) - self._delay = delay - self._loss = loss - self._rate = rate - - @property - def delay(self): - return self._delay - - @property - def loss(self): - return self._loss - - @property - def rate(self): - return self._rate - - def build_command(self, ipcp): - cmd = [] - - if ipcp in LinkQuality._active: - cmd.append("tc qdisc change dev %s root netem" % ipcp.ifname) - else: - cmd.append("tc qdisc add dev %s root netem" % ipcp.ifname) - - if self.delay: - cmd.append(self.delay.build_command()) - - if self.loss: - cmd.append(self.loss.build_command()) - - if self.rate: - cmd.append("rate %imbit" % self.rate) - - return " ".join(cmd) - - def apply(self, shim): - if not (self.delay or self.loss or self.rate): - self.deactivate(shim) - else: - for ipcp in shim.ipcps: - if not ipcp.ifname: - logger.error("Could not apply LinkQuality to IPCP because " - "the interface name is None") - continue - - ipcp.node.execute_command(self.build_command(ipcp), - as_root=True) - LinkQuality._active.add(ipcp) - - def deactivate(self, shim): - for ipcp in shim.ipcps: - if ipcp not in LinkQuality._active: - continue - if not ipcp.ifname: - logger.error("Could not remove LinkQuality from IPCP because " - "the interface name is None") - continue - - ipcp.node.execute_command("tc qdisc del dev %s root " - "netem" % ipcp.ifname, as_root=True) - LinkQuality._active.remove(ipcp) - - -class SSHConfig(object): - def __init__(self, hostname, port=22, proxy_server=None): - self.username = None - self.password = None - self.hostname = hostname - self.port = port - self.proxy_server = proxy_server - self.client = None - self.proxy_client = None - self.http_proxy = None - - def set_username(self, username): - self.username = username - - def set_password(self, password): - self.password = password - - def set_http_proxy(self, proxy): - self.http_proxy = proxy - - -class Node(object): - """ - A node in the experiment. - """ - def get_e_id(self): - return "Node." + self.name - - def __init__(self, name, difs=None, dif_registrations=None, - policies=None, machine_type=None): - """ - :param name: Name of the node. - :param difs: A list of DIFs the node is in. - :param dif_registrations: How the DIFs are stacked. - :param policies: Policies of a DIF specific to the node. - :param machine_type: Type of machine to use, physical or virtual. - """ - self.name = name - if difs is None: - difs = list() - self.difs = difs - for dif in self.difs: - dif.add_member(self) - if dif_registrations is None: - dif_registrations = dict() - self.dif_registrations = dif_registrations - self.machine_type = machine_type - self.ssh_config = SSHConfig(name) - self.ipcps = [] - self.policies = dict() - self.has_tcpdump = False - if policies is None: - policies = dict() - for dif in self.difs: - if hasattr(dif, 'policy'): # check if the dif supports policies - self.policies[dif] = policies.get(dif, Policy(dif, self)) - - self.executor = None # will be set by testbed on swap_in - self.startup_command = None # will be set by prototype - - self._validate() - - def get_ipcp_by_dif(self, dif): - """ - :param dif: The DIF to get the IPCP of. - :return: The IPCP of the node that is in the DIF. - """ - for ipcp in self.ipcps: - if ipcp.dif == dif: - return ipcp - - def _undeclared_dif(self, dif): - if dif not in self.difs: - raise Exception("Invalid registration: node %s is not declared " - "to be part of DIF %s" % (self.name, dif.name)) - - def _validate(self): - # Check that DIFs referenced in self.dif_registrations - # are part of self.difs - for upper in self.dif_registrations: - self._undeclared_dif(upper) - for lower in self.dif_registrations[upper]: - self._undeclared_dif(lower) - - def __repr__(self): - s = "Node " + self.name + ":\n" - - s += " DIFs: [ " - s += " ".join([d.name for d in self.difs]) - s += " ]\n" - - s += " DIF registrations: [ " - rl = [] - for upper in self.dif_registrations: - difs = self.dif_registrations[upper] - x = "%s => [" % upper.name - x += " ".join([lower.name for lower in difs]) - x += "]" - rl.append(x) - s += ", ".join(rl) - s += " ]\n" - - return s - - def __hash__(self): - return hash(self.name) - - def __eq__(self, other): - return other is not None and self.name == other.name - - def __neq__(self, other): - return not self == other - - def add_dif(self, dif): - """ - Adds a DIF to the list. - - :param dif: Name of the DIF to add. - """ - self.difs.append(dif) - dif.add_member(self) - if hasattr(dif, 'policy'): - self.policies[dif] = Policy(dif, self) - self._validate() - - def del_dif(self, dif): - """ - Adds a DIF to the list. - - :param dif: Name of the DIF to add. - """ - self.difs.remove(dif) - dif.del_member(self) - try: - del self.policies[dif] - except KeyError: - # It was not in there, so nothing to do - pass - self._validate() - - def add_dif_registration(self, upper, lower): - """ - Adds a DIF registration. - - :param upper: Name of the DIF that is requesting IPC. - :param lower: Name of the DIF providing IPC. - """ - self.dif_registrations[upper].append(lower) - self._validate() - - def del_dif_registration(self, upper, lower): - """ - Removes a DIF registration. - - :param upper: Name of the DIF that is requesting IPC. - :param lower: Name of the DIF providing IPC. - """ - self.dif_registrations[upper].remove(lower) - self._validate() - - def add_policy(self, dif, component_name, policy_name, **parameters): - """ - Adds a policy. - - :param dif: The name of the DIF. - :param component_name: Name of the component. - :param policy_name: Name of the policy. - :param parameters: Parameters of the policy. - """ - self.policies[dif].add_policy(component_name, policy_name, **parameters) - - def del_policy(self, dif, component_name=None, policy_name=None): - """ - Removes a policy. - - :param dif: the dif to which the policy should be applied - :param component_name: Name of the component. - :param policy_name: Name of the policy. - """ - self.policies[dif].del_policy(component_name, policy_name) - - def get_policy(self, dif): - """ - :param dif: The DIF to get the policy of. - :return: Returns the policy. - """ - return self.policies[dif] - - def execute_commands(self, commands, as_root=False, time_out=3, - use_proxy=False): - """ - Execute a list of a commands on the node. - - :param commands: A list of commands. - :param as_root: Execute as root? - :param time_out: Seconds before timing out. - :param use_proxy: Use a proxy to execute the commands? - """ - return self.executor.execute_commands(self, - commands, - as_root, - time_out) - - def execute_command(self, command, as_root=False, time_out=3, - use_proxy=False): - """ - Execute a single command on a node. - - :param command: A command. - :param as_root: Execute as root? - :param time_out: Seconds before timing out. - :param use_proxy: Use a proxy to execute the commands? - :return: The stdout of the command. - """ - return self.executor.execute_command(self, - command, - as_root, - time_out) - - def copy_file(self, path, destination): - """ - Copy file to node. - - :param path: Local location of the file. - :param destination: Destination location of the file. - """ - self.executor.copy_file(self, path, destination) - - def copy_files(self, paths, destination): - """ - Copy files to node. - - :param paths: Local location of the files. - :param destination: Destination location of the files. - """ - self.executor.copy_files(self, paths, destination) - - def fetch_file(self, path, destination, sudo=False): - """ - Fetch file from the node. - - :param path: Location of the files on the node. - :param destination: Destination location of the files. - :param sudo: The file is owned by root on the node? - """ - self.executor.fetch_file(self, path, destination, sudo) - - def fetch_files(self, paths, destination, sudo=False): - """ - Fetch files from the node. - - :param paths: Location of the files on the node. - :param destination: Destination location of the files. - :param sudo: The file is owned by root on the node? - """ - self.executor.fetch_files(self, paths, destination, sudo) - - def set_link_state(self, dif, state): - """ - Change the state of a link on the node. - - :param dif: The name of the shim Ethernet DIF. - :param state: Up or down. - """ - ipcp = self.get_ipcp_by_dif(dif) - self.execute_command('ip link set dev ' + ipcp.ifname + ' ' + state, - as_root=True) - - -class IPCP(object): - def __init__(self, name, node, dif): - self.name = name - self.node = node - self.dif = dif - self.registrations = [] - - # Is this IPCP the first in its DIF, so that it does not need - # to enroll to anyone ? - self.dif_bootstrapper = False - - def __repr__(self): - return "{IPCP=%s,DIF=%s,N-1-DIFs=(%s)%s}" % \ - (self.name, self.dif.name, - ' '.join([dif.name for dif in self.registrations]), - ',bootstrapper' if self.dif_bootstrapper else '' - ) - - def __hash__(self): - return hash((self.name, self.dif.name)) - - def __eq__(self, other): - return other is not None and self.name == other.name \ - and self.dif == other.dif - - def __neq__(self, other): - return not self == other - - -class ShimEthIPCP(IPCP): - def __init__(self, name, node, dif, ifname=None): - IPCP.__init__(self, name, node, dif) - self.ifname = ifname - - -class ShimUDPIPCP(IPCP): - def __init__(self, name, node, dif): - IPCP.__init__(self, name, node, dif) - # TODO: add IP and port - - -class Policy(object): - def __init__(self, dif, node=None, policies=None): - self.dif = dif # type: NormalDIF - self.node = node - if policies is None: - self._dict = dict() - else: - self._dict = policies - - def add_policy(self, component_name, policy_name, **parameters): - self._dict.setdefault(component_name, dict())[policy_name] = parameters - - # - # Fetches effective policy info - # - def get_policies(self, component_name=None, policy_name=None): - policy = self._superimpose() - if component_name is None: - return policy._dict - elif policy_name is None: - return policy._dict[component_name] - else: - return policy._dict[component_name][policy_name] - - def del_policy(self, component_name=None, policy_name=None): - if component_name is None: - self._dict = dict() - elif policy_name is None: - del self._dict[component_name] - else: - del self._dict[component_name][policy_name] - - # - # Merges this policy into that of its dif, obtaining - # the effective policy acting on self.node. - # - def _superimpose(self): - if self.node is None: - return self - other = self.dif.policy - base = dict(other._dict) - base.update(self._dict) - return Policy(self.dif, self.node, base) - - def __eq__(self, other): - if not isinstance(other, Policy): - return False - else: - return other.dif == self.dif \ - and other.node == self.node \ - and other._dict == self._dict - - def __str__(self): - node_str = (" Node: " + self.node) if self.node is not None else "" - return "Policy[Dif: %(dif)s,%(node_str)s Dict: %(dict)s]" \ - % {"dif": self.dif, "node_str": node_str, "dict": self._dict} - - def __repr__(self): - node_str = (" Node: " + self.node) if self.node is not None else "" - s = "Policy[ Dif: %(dif)s,%(node_str)s" \ - % {"dif": self.dif, "node_str": node_str} - comps = [] - for component in self._dict: - for policy in self._dict[component]: - comps.append("\n Component %s has policy %s with params %s" - % (component, - policy, - self._dict[component][policy])) - s += ",".join(comps) - s += "\n]\n" - return s - - -class Experiment(object): - """ - Base class for experiments. - """ - __metaclass__ = abc.ABCMeta - - def __init__(self, testbed, - nodes=None, - git_repo=None, - git_branch=None, - log_dir=None, - prototype_logs=None): - """ - :param testbed: The testbed of the experiment. - :param nodes: The list of nodes in the experiment. - :param git_repo: The git repository of the prototype. - :param git_branch: The git branch of the repository. - :param log_dir: Where to log output of the experiment. - :param prototype_logs: Where the prototype logs its output. - """ - if nodes is None: - nodes = list() - self.nodes = nodes - self.git_repo = git_repo - self.git_branch = git_branch - self.testbed = testbed - # the strategy employed for completing the enrollment phase in - # the different DIFs - self.enrollment_strategy = 'minimal' # 'full-mesh', 'manual' - # the strategy employed for setting up the data transfer - # networks in the DIFs after enrollment - self.dt_strategy = 'full-mesh' # 'minimal', 'manual' - self.dif_ordering = [] - self.enrollments = [] # a list of per-DIF lists of enrollments - self.dt_flows = [] # a list of per-DIF lists of data transfer flows - self.mgmt_flows = [] # a list of per-DIF lists of management flows - - # Determine log directory - if log_dir is None: - # If it is None, use /tmp/rumba/{project} - # Wipe it and make it again - exp_name = self.testbed.exp_name.replace('/', '_') # Just in case - log_dir = os.path.join(tmp_dir, exp_name) - shutil.rmtree(log_dir, ignore_errors=True) - os.mkdir(log_dir) - self.log_dir = log_dir - if not os.path.isdir(self.log_dir): - raise Exception('Destination "%s" is not a directory. ' - 'Cannot fetch logs.' - % self.log_dir) - self.prototype_logs = prototype_logs \ - if prototype_logs is not None else [] - - # Generate missing information - self.generate() - - def __repr__(self): - s = "" - for n in self.nodes: - s += "\n" + str(n) - - return s - - def add_node(self, node): - """ - Adds a node to the experiment. - - :param node: A node. - """ - self.nodes.append(node) - self.generate() - - def del_node(self, node): - """ - Deletes a node from the experiment. - - :param node: A node. - """ - self.nodes.remove(node) - self.generate() - - # Compute registration/enrollment order for DIFs - def compute_dif_ordering(self): - # Compute DIFs dependency graph, as both adjacency and incidence list. - difsdeps_adj = dict() - difsdeps_inc = dict() - - for node in self.nodes: - for dif in node.difs: - if dif not in difsdeps_adj: - difsdeps_adj[dif] = set() - - for upper in node.dif_registrations: - for lower in node.dif_registrations[upper]: - if upper not in difsdeps_inc: - difsdeps_inc[upper] = set() - if lower not in difsdeps_inc: - difsdeps_inc[lower] = set() - if upper not in difsdeps_adj: - difsdeps_adj[upper] = set() - if lower not in difsdeps_adj: - difsdeps_adj[lower] = set() - difsdeps_inc[upper].add(lower) - difsdeps_adj[lower].add(upper) - - # Kahn's algorithm below only needs per-node count of - # incident edges, so we compute these counts from the - # incidence list and drop the latter. - difsdeps_inc_cnt = dict() - for dif in difsdeps_inc: - difsdeps_inc_cnt[dif] = len(difsdeps_inc[dif]) - del difsdeps_inc - - # Init difsdeps_inc_cnt for those DIFs that do not - # act as lower IPCPs nor upper IPCPs for registration - # operations - for node in self.nodes: - for dif in node.difs: - if dif not in difsdeps_inc_cnt: - difsdeps_inc_cnt[dif] = 0 - - # Run Kahn's algorithm to compute topological - # ordering on the DIFs graph. - frontier = set() - self.dif_ordering = [] - for dif in difsdeps_inc_cnt: - if difsdeps_inc_cnt[dif] == 0: - frontier.add(dif) - - while len(frontier): - cur = frontier.pop() - self.dif_ordering.append(cur) - for nxt in difsdeps_adj[cur]: - difsdeps_inc_cnt[nxt] -= 1 - if difsdeps_inc_cnt[nxt] == 0: - frontier.add(nxt) - difsdeps_adj[cur] = set() - - circular_set = [dif for dif in difsdeps_inc_cnt - if difsdeps_inc_cnt[dif] != 0] - if len(circular_set): - raise Exception("Fatal error: The specified DIFs topology" - "has one or more" - "circular dependencies, involving the following" - " DIFs: %s" % circular_set) - - logger.debug("DIF topological ordering: %s", self.dif_ordering) - - # Compute all the enrollments, to be called after compute_dif_ordering() - def compute_enrollments(self): - dif_graphs = dict() - self.enrollments = [] - self.mgmt_flows = [] - self.dt_flows = [] - - for dif in self.dif_ordering: - neighsets = dict() - dif_graphs[dif] = dict() - first = None - - # For each N-1-DIF supporting this DIF, compute the set of nodes - # that share such N-1-DIF. This set will be called the 'neighset' of - # the N-1-DIF for the current DIF. - - for node in self.nodes: - if dif in node.dif_registrations: - dif_graphs[dif][node] = [] # init for later use - if first is None: # pick any node for later use - first = node - for lower_dif in node.dif_registrations[dif]: - if lower_dif not in neighsets: - neighsets[lower_dif] = [] - neighsets[lower_dif].append(node) - - # Build the graph, represented as adjacency list - for lower_dif in neighsets: - # Each neighset corresponds to a complete (sub)graph. - for node1 in neighsets[lower_dif]: - for node2 in neighsets[lower_dif]: - if node1 != node2: - dif_graphs[dif][node1].append((node2, lower_dif)) - - self.enrollments.append([]) - self.dt_flows.append([]) - self.mgmt_flows.append([]) - - if first is None: - # This is a shim DIF, nothing to do - continue - - er = [] - for node in dif_graphs[dif]: - for edge in dif_graphs[dif][node]: - er.append("%s --[%s]--> %s" % (node.name, - edge[1].name, - edge[0].name)) - logger.debug("DIF graph for %s: %s", dif, ', '.join(er)) - - # To generate the list of mgmt flows, minimal enrollments - # and minimal dt flows, we simulate it, using - # breadth-first traversal. - enrolled = {first} - frontier = {first} - while len(frontier): - cur = frontier.pop() - for edge in dif_graphs[dif][cur]: - if edge[0] not in enrolled: - enrolled.add(edge[0]) - enrollee = edge[0].get_ipcp_by_dif(dif) - assert(enrollee is not None) - enroller = cur.get_ipcp_by_dif(dif) - assert(enroller is not None) - if self.enrollment_strategy == 'minimal': - self.enrollments[-1].append({'dif': dif, - 'enrollee': enrollee, - 'enroller': enroller, - 'lower_dif': edge[1]}) - self.mgmt_flows[-1].append({'src': enrollee, - 'dst': enroller}) - if self.dt_strategy == 'minimal': - self.dt_flows[-1].append({'src': enrollee, - 'dst': enroller}) - frontier.add(edge[0]) - if len(dif.members) != len(enrolled): - raise Exception("Disconnected DIF found: %s" % (dif,)) - - # In case of a full mesh enrollment or dt flows - for cur in dif_graphs[dif]: - for edge in dif_graphs[dif][cur]: - if cur.name < edge[0].name: - enrollee = cur.get_ipcp_by_dif(dif) - assert(enrollee is not None) - enroller = edge[0].get_ipcp_by_dif(dif) - assert(enroller is not None) - if self.enrollment_strategy == 'full-mesh': - self.enrollments[-1].append({'dif': dif, - 'enrollee': enrollee, - 'enroller': enroller, - 'lower_dif': edge[1]}) - if self.dt_strategy == 'full-mesh': - self.dt_flows[-1].append({'src': enrollee, - 'dst': enroller}) - - if not (self.dt_strategy == 'minimal' - or self.dt_strategy == 'full-mesh') \ - or not (self.enrollment_strategy == 'full-mesh' - or self.enrollment_strategy == 'minimal'): - # This is a bug - assert False - - log_string = "Enrollments:\n" - for el in self.enrollments: - for e in el: - log_string += (" [%s] %s --> %s through N-1-DIF %s\n" - % (e['dif'], - e['enrollee'].name, - e['enroller'].name, - e['lower_dif'])) - logger.debug(log_string) - - log_string = "Mgmt flows:\n" - for el in self.mgmt_flows: - for e in el: - log_string += (" %s --> %s \n" - % (e['src'].name, - e['dst'].name)) - logger.debug(log_string) - - log_string = "Dt flows:\n" - for el in self.dt_flows: - for e in el: - log_string += (" %s --> %s \n" - % (e['src'].name, - e['dst'].name)) - logger.debug(log_string) - - def compute_ipcps(self): - # For each node, compute the required IPCP instances, and associated - # registrations - for node in self.nodes: - node.ipcps = [] - # We want also the node.ipcps list to be generated in - # topological ordering - for dif in self.dif_ordering: - if dif not in node.difs: - continue - - # Create an instance of the required IPCP class - ipcp = dif.get_ipcp_class()( - name='%s.%s' % (dif.name, node.name), - node=node, dif=dif) - - if dif in node.dif_registrations: - for lower in node.dif_registrations[dif]: - ipcp.registrations.append(lower) - - node.ipcps.append(ipcp) - dif.ipcps.append(ipcp) - - def compute_bootstrappers(self): - for node in self.nodes: - for ipcp in node.ipcps: - ipcp.dif_bootstrapper = True - for el in self.enrollments: - for e in el: - if e['dif'] != ipcp.dif: - # Skip this DIF - break - if e['enrollee'] == ipcp: - ipcp.dif_bootstrapper = False - # Exit the loops - break - if not ipcp.dif_bootstrapper: - break - - def dump_ssh_info(self): - f = open(os.path.join(tmp_dir, 'ssh_info'), 'w') - for node in self.nodes: - f.write("%s;%s;%s;%s;%s\n" % (node.name, - self.testbed.username, - node.ssh_config.hostname, - node.ssh_config.port, - node.ssh_config.proxy_server)) - f.close() - - # Examine the nodes and DIFs, compute the registration and enrollment - # order, the list of IPCPs to create, registrations, ... - def generate(self): - start = time.time() - self.compute_dif_ordering() - self.compute_ipcps() - self.compute_enrollments() - self.compute_bootstrappers() - for node in self.nodes: - logger.info("IPCPs for node %s: %s", node.name, node.ipcps) - end = time.time() - logger.info("Layer ordering computation took %.2f seconds", end - start) - - def install_prototype(self): - """ - Installs the prototype on the nodes. - """ - start = time.time() - self._install_prototype() - end = time.time() - logger.info("Install took %.2f seconds", end - start) - - def set_startup_command(self, command): - for node in self.nodes: - node.startup_command = command - - def bootstrap_prototype(self): - """ - Bootstraps the prototype on the nodes. - """ - start = time.time() - self._bootstrap_prototype() - end = time.time() - logger.info("Bootstrap took %.2f seconds", end - start) - - @abc.abstractmethod - def _install_prototype(self): - raise Exception('install_prototype() method not implemented') - - @abc.abstractmethod - def _bootstrap_prototype(self): - raise Exception('bootstrap_prototype() method not implemented') - - @abc.abstractmethod - def prototype_name(self): - raise Exception('prototype_name() method not implemented') - - @abc.abstractmethod - def _terminate_prototype(self): - raise Exception('terminate_prototype() method not implemented') - - def swap_in(self): - """ - Swap the experiment in on the testbed. - """ - start = time.time() - self.testbed.swap_in(self) - self.dump_ssh_info() - end = time.time() - logger.info("Swap-in took %.2f seconds", end - start) - - def swap_out(self): - """ - Swap the experiment out of the testbed. - """ - start = time.time() - # Terminate prototype gracefully - self._terminate_prototype() - for node in self.nodes: - if node.ssh_config.client is not None: - node.ssh_config.client.close() - if node.ssh_config.proxy_client is not None: - node.ssh_config.proxy_client.close() - # Undo the testbed (testbed-specific) - self.testbed.swap_out(self) - end = time.time() - logger.info("Swap-out took %.2f seconds", end - start) - - -class Executor: - __metaclass__ = abc.ABCMeta - - @abc.abstractmethod - def execute_command(self, node, command, as_root=False, time_out=3): - # Execute command on a node - return - - def execute_commands(self, node, commands, as_root=False, time_out=3): - for command in commands: - self.execute_command(node, command, as_root, time_out) - - @abc.abstractmethod - def copy_file(self, node, path, destination): - return - - def copy_files(self, node, paths, destination): - for path in paths: - self.copy_file(node, path, destination) - - @abc.abstractmethod - def fetch_file(self, node, path, destination, sudo=False): - return - - def fetch_files(self, node, paths, destination, sudo=False): - for path in paths: - self.fetch_file(node, path, destination, sudo) diff --git a/setup.py b/setup.py index 2ac0b2f..77661a2 100755 --- a/setup.py +++ b/setup.py @@ -16,7 +16,13 @@ setuptools.setup( author_email='sander.vrijders@ugent.be', license='LGPL', description='Rumba measurement framework for RINA', - packages=['rumba', 'rumba.testbeds', 'rumba.prototypes', 'rumba.executors'], + packages=[ + 'rumba', + 'rumba.testbeds', + 'rumba.prototypes', + 'rumba.executors', + 'rumba.elements' + ], install_requires=[ 'paramiko', 'docker', -- cgit v1.2.3