diff options
-rw-r--r-- | README | 28 | ||||
-rwxr-xr-x | examples/example.py | 1 | ||||
-rwxr-xr-x | examples/two-layers.py | 52 | ||||
-rw-r--r-- | rumba/model.py | 326 | ||||
-rw-r--r-- | rumba/testbeds/qemu.py | 34 |
5 files changed, 423 insertions, 18 deletions
@@ -1,2 +1,30 @@ # measurement-framework Part of ARCFIRE 2020, WP3 work package. + +Workflow, both external and internal: + + (1) user defines the network graph, creating instances of model.Node + and model.DIF classes + + (2) user creates an instance of a Testbed class + + (3) user creates an instance of prototype.Experiment class, passing + the testbed instance and a list of Node instances + - at the endo of the base Experiment constructor, the + generate function is called to generate information about + per-node IPCPs, registrations and enrollment, ready to be + used by the plugins + + (4) user calls run() on the prototype.Experiment instance: + - First, run() calls Experiment.swap_in(), which + in turns calls Testbed.create_experiment(), passing the + nodes and links (?) + TODO: fix this interface: what should swap_in(), and + so create_experiment() return exactly? Current interface + seems broken + + - Second, run() calls a prototype-specific setup function, + to create the required IPCPs, perform registrations, + enrollments, etc. + + - Third, perform tests (TODO) diff --git a/examples/example.py b/examples/example.py index 2222ee0..e173ef0 100755 --- a/examples/example.py +++ b/examples/example.py @@ -8,6 +8,7 @@ from rumba.model import * import rumba.testbeds.emulab as emulab import rumba.testbeds.jfed as jfed import rumba.testbeds.faketestbed as fake +import rumba.testbeds.qemu as qemu # import prototype plugins import rumba.prototypes.ouroboros as our diff --git a/examples/two-layers.py b/examples/two-layers.py new file mode 100755 index 0000000..29faebe --- /dev/null +++ b/examples/two-layers.py @@ -0,0 +1,52 @@ +#!/usr/bin/env python + +# An example script using the rumba package + +from rumba.model import * + +# import testbed plugins +import rumba.testbeds.emulab as emulab +import rumba.testbeds.jfed as jfed +import rumba.testbeds.faketestbed as fake +import rumba.testbeds.qemu as qemu + +# import prototype plugins +import rumba.prototypes.ouroboros as our +import rumba.prototypes.rlite as rl +import rumba.prototypes.irati as irati + +n1 = NormalDIF("n1") +n2 = NormalDIF("n2") +n3 = NormalDIF("n3") +n4 = NormalDIF("n4") + +e1 = ShimEthDIF("e1") +e2 = ShimEthDIF("e2") +e3 = ShimEthDIF("e3") + +a = Node("a", + difs = [n3, n4, n1, e1], + dif_registrations = {n4: [n1], n3: [n1], n1 : [e1]}, + registrations = {"rinaperf.server" : [n3]}, + bindings = {"rinaperf.server" : "/usr/bin/rinaperf"}) + +b = Node("b", + difs = [n1, e1, e2], + dif_registrations = {n1 : [e1, e2]}) + +c = Node("c", + difs = [n3, n4, n1, n2, e2, e3], + dif_registrations = {n4: [n1], n3: [n1, n2], n1 : [e2], n2: [e3]}) + +d = Node("d", + difs = [n3, n2, e3], + dif_registrations = {n3: [n2], n2 : [e3]}) + +tb = qemu.Testbed(exp_name = "twolayers", + username = "vmaffio") + +exp = irati.Experiment(tb, nodes = [a, b, c, d]) + +print(exp) + +exp.run() diff --git a/rumba/model.py b/rumba/model.py index 23db86f..6d6ffee 100644 --- a/rumba/model.py +++ b/rumba/model.py @@ -50,6 +50,9 @@ class Interface: self.name = name self.ip = ip + def __repr__(self): + return self.name + # Represents a link in the physical graph # # @name [string] Link name @@ -58,6 +61,9 @@ class Link: def __init__(self, name): self.name = name + def __repr__(self): + return self.name + # Represents a point-to-point link in the physical graph # # @name [string] DIF name @@ -76,6 +82,10 @@ class P2PLink(Link): int_b = Interface() self.int_b = int_b + def __repr__(self): + return '%s:%s--%s:%s' % (self.node_a.name, self.int_a, + self.node_b.name, self.int_b) + # Base class for DIFs # # @name [string] DIF name @@ -91,18 +101,33 @@ class DIF: s = "DIF %s" % self.name return s + def __hash__(self): + return hash(self.name) + + def __eq__(self, other): + return other != None and self.name == other.name + + def __neq__(self, other): + return not (self == other) + def add_member(self, node): self.members.append(node) def del_member(self, node): self.members.remove(node) + def get_ipcp_class(self): + return IPCP + # Shim over UDP # class ShimUDPDIF(DIF): def __init__(self, name, members = None): DIF.__init__(self, name, members) + def get_ipcp_class(self): + return ShimUDPIPCP + # Shim over Ethernet # # @link_speed [int] Speed of the Ethernet network, in Mbps @@ -114,6 +139,9 @@ class ShimEthDIF(DIF): if self.link_speed < 0: raise ValueError("link_speed must be a non-negative number") + def get_ipcp_class(self): + return ShimEthIPCP + # Normal DIF # # @policies [dict] Policies of the normal DIF @@ -165,58 +193,144 @@ class Node: bindings = dict() self.bindings = bindings self.full_name = name + self.ipcps = [] + + self._validate() + + def _undeclared_dif(self, dif): + if dif not in self.difs: + raise Exception("Invalid registration: node %s is not declared "\ + "to be part of DIF %s" % (self.name, dif.name)) + + def _validate(self): + # Check that DIFs referenced in self.dif_registrations and + # in self.registrations are part of self.difs + for upper in self.dif_registrations: + self._undeclared_dif(upper) + for lower in self.dif_registrations[upper]: + self._undeclared_dif(lower) + + for appl in self.registrations: + for dif in self.registrations[appl]: + self._undeclared_dif(dif) def __repr__(self): s = "Node " + self.name + ":\n" - s += " IPCPs in DIFs: [" - for d in self.difs: - s += " %s" % d.name + + s += " DIFs: [ " + s += " ".join([d.name for d in self.difs]) s += " ]\n" + s += " DIF registrations: [ " - for dif_a, difs in self.dif_registrations.items(): - s += "%s => [" % dif_a.name - for dif_b in difs: - s += " %s" % dif_b.name - s += " ]" + rl = [] + for upper in self.dif_registrations: + difs = self.dif_registrations[upper] + x = "%s => [" % upper.name + x += " ".join([lower.name for lower in difs]) + x += "]" + rl.append(x) + s += ", ".join(rl) s += " ]\n" + s += " Name registrations: [ " - for name, difs in self.registrations.items(): - s += "%s => [" % name - for dif in difs: - s += " %s" % dif.name + for name in self.registrations: + difs = self.registrations[name] + s += "%s => [ " % name + s += ", ".join([dif.name for dif in difs]) s += " ]" s += " ]\n" + s += " Bindings: [ " - for ap, name in self.bindings.items(): - s += "'%s' => '%s'" % (ap, name) + s += ", ".join(["'%s' => '%s'" % (ap, self.bindings[ap]) \ + for ap in self.bindings]) s += " ]\n" + return s + def __hash__(self): + return hash(self.name) + + def __eq__(self, other): + return other != None and self.name == other.name + + def __neq__(self, other): + return not (self == other) + def add_dif(self, dif): self.difs.append(dif) dif.add_member(self) + self._validate() def del_dif(self, dif): self.difs.remove(dif) dif.del_member(self) + self._validate() - def add_dif_registration(self, dif_a, dif_b): - self.dif_registrations[dif_a].append(dif_b) + def add_dif_registration(self, upper, lower): + self.dif_registrations[upper].append(lower) + self._validate() - def del_dif_registration(self, dif_a, dif_b): - self.dif_registrations[dif_a].remove(dif_b) + def del_dif_registration(self, upper, lower): + self.dif_registrations[upper].remove(lower) + self._validate() def add_registration(self, name, dif): self.dif_registrations[name].append(dif) + self._validate() def del_registration(self, name, dif): self.dif_registrations[name].remove(dif) + self._validate() def add_binding(self, name, ap): self.dif_bindings[name] = ap + self._validate() def del_binding(self, name): del self.dif_bindings[name] + self._validate() + +# Base class representing an IPC Process to be created in the experiment +# +# @name [string]: IPCP name +# @node: Node where the IPCP gets created +# @dif: the DIF the IPCP belongs to +# +class IPCP: + def __init__(self, name, node, dif): + self.name = name + self.node = node + self.dif = dif + self.registrations = [] + self.enrollments = [] + + def __repr__(self): + return "{IPCP=%s,DIF=%s,N-1-DIFs=(%s),enrollments=(%s)}" % \ + (self.name, self.dif.name, + ' '.join([dif.name for dif in self.registrations]), + ' '.join(['{neigh=%s,N-1-DIF=%s}' % (e['enroller'].name, + e['lower_dif'].name) for e in self.enrollments]) + ) + + def __hash__(self): + return hash((self.name, self.dif.name)) + + def __eq__(self, other): + return other != None and self.name == other.name \ + and self.dif == other.dif + + def __neq__(self, other): + return not (self == other) + +class ShimEthIPCP(IPCP): + def __init__(self, name, node, dif, ifname = None): + IPCP.__init__(self, name, node, dif) + self.ifname = ifname + +class ShimUDPIPCP(IPCP): + def __init__(self, name, node, dif): + IPCP.__init__(self, name, node, dif) + # TODO add IP and port # Base class for ARCFIRE experiments # @@ -229,6 +343,12 @@ class Experiment: nodes = list() self.nodes = nodes self.testbed = testbed + self.enrollment_strategy = 'minimal' # 'full-mesh', 'manual' + self.dif_ordering = [] + self.enrollments = dict() + + # Generate missing information + self.generate() def __repr__(self): s = "" @@ -258,9 +378,179 @@ class Experiment: def add_node(self, node): self.nodes.append(node) + self.generate() def del_node(self, node): self.nodes.remove(node) + self.generate() + + # Compute registration/enrollment order for DIFs + def compute_dif_ordering(self): + # Compute DIFs dependency graph, as both adjacency and incidence list. + difsdeps_adj = dict() + difsdeps_inc = dict() + + for node in self.nodes: + for upper in node.dif_registrations: + for lower in node.dif_registrations[upper]: + if upper not in difsdeps_inc: + difsdeps_inc[upper] = set() + if lower not in difsdeps_inc: + difsdeps_inc[lower] = set() + if upper not in difsdeps_adj: + difsdeps_adj[upper] = set() + if lower not in difsdeps_adj: + difsdeps_adj[lower] = set() + difsdeps_inc[upper].add(lower) + difsdeps_adj[lower].add(upper) + + # Kahn's algorithm below only needs per-node count of + # incident edges, so we compute these counts from the + # incidence list and drop the latter. + difsdeps_inc_cnt = dict() + for dif in difsdeps_inc: + difsdeps_inc_cnt[dif] = len(difsdeps_inc[dif]) + del difsdeps_inc + + #print(difsdeps_adj) + #print(difsdeps_inc_cnt) + + # Run Kahn's algorithm to compute topological ordering on the DIFs graph. + frontier = set() + self.dif_ordering = [] + for dif in difsdeps_inc_cnt: + if difsdeps_inc_cnt[dif] == 0: + frontier.add(dif) + + while len(frontier): + cur = frontier.pop() + self.dif_ordering.append(cur) + for nxt in difsdeps_adj[cur]: + difsdeps_inc_cnt[nxt] -= 1 + if difsdeps_inc_cnt[nxt] == 0: + frontier.add(nxt) + difsdeps_adj[cur] = set() + + circular_set = [dif for dif in difsdeps_inc_cnt if difsdeps_inc_cnt[dif] != 0] + if len(circular_set): + raise Exception("Fatal error: The specified DIFs topology has one or more"\ + "circular dependencies, involving the following"\ + " DIFs: %s" % circular_set) + + print("DIF topological ordering: %s" % self.dif_ordering) + + # Compute per-DIF enrollments, to be called after compute_dif_ordering() + def compute_enrollments(self): + dif_graphs = dict() + self.enrollments = dict() + + for dif in self.dif_ordering: + neighsets = dict() + dif_graphs[dif] = dict() + first = None + + # For each N-1-DIF supporting this DIF, compute the set of nodes that + # share such N-1-DIF. This set will be called the 'neighset' of + # the N-1-DIF for the current DIF. + + for node in self.nodes: + if dif in node.dif_registrations: + dif_graphs[dif][node] = [] # init for later use + if first == None: # pick any node for later use + first = node + for lower_dif in node.dif_registrations[dif]: + if lower_dif not in neighsets: + neighsets[lower_dif] = [] + neighsets[lower_dif].append(node) + + # Build the graph, represented as adjacency list + for lower_dif in neighsets: + # Each neighset corresponds to a complete (sub)graph. + for node1 in neighsets[lower_dif]: + for node2 in neighsets[lower_dif]: + if node1 != node2: + dif_graphs[dif][node1].append((node2, lower_dif)) + + self.enrollments[dif] = [] + + if first == None: + # This is a shim DIF, nothing to do + continue + + er = [] + for node in dif_graphs[dif]: + for edge in dif_graphs[dif][node]: + er.append("%s --[%s]--> %s" % (node.name, edge[1].name, edge[0].name)) + print("DIF graph for %s: %s" % (dif, ', '.join(er))) + + if self.enrollment_strategy == 'minimal': + # To generate the list of enrollments, we simulate one, + # using breadth-first trasversal. + enrolled = set([first]) + frontier = set([first]) + while len(frontier): + cur = frontier.pop() + for edge in dif_graphs[dif][cur]: + if edge[0] not in enrolled: + enrolled.add(edge[0]) + self.enrollments[dif].append({'enrollee': edge[0], + 'enroller': cur, + 'lower_dif': edge[1]}) + frontier.add(edge[0]) + + elif self.enrollment_strategy == 'full-mesh': + for cur in dif_graphs[dif]: + for edge in dif_graphs[dif][cur]: + if cur < edge[0]: + self.enrollments[dif].append({'enrollee': cur, + 'enroller': edge[0], + 'lower_dif': edge[1]}) + + else: + # This is a bug + assert(False) + + print("Enrollments for %s" % dif) + for e in self.enrollments[dif]: + print(" %s --> %s through N-1-DIF %s" % \ + (e['enrollee'].name, + e['enroller'].name, + e['lower_dif'])) + + def compute_ipcps(self): + # For each node, compute the required IPCP instances, and associated + # registrations and enrollments + for node in self.nodes: + node.ipcps = [] + # We want also the node.ipcps list to be generated in + # topological ordering + for dif in self.dif_ordering: + if dif not in node.difs: + continue + + ipcp = dif.get_ipcp_class()( + name = '%s.%s' % (dif.name, node.name), + node = node, dif = dif) + + if dif in node.dif_registrations: + for lower in node.dif_registrations[dif]: + ipcp.registrations.append(lower) + + for e in self.enrollments[dif]: + if e['enrollee'] == node: + ipcp.enrollments.append({'enroller': e['enroller'], + 'lower_dif': e['lower_dif']}) + + node.ipcps.append(ipcp) + + print("IPCP for node %s: %s" % (node.name, node.ipcps)) + + # Examine the nodes and DIFs, compute the registration and enrollment + # order, the list of IPCPs to create, registrations, ... + def generate(self): + self.compute_dif_ordering() + self.compute_enrollments() + self.compute_ipcps() # Realize the experiment, using a testbed-specific setup def swap_in(self): diff --git a/rumba/testbeds/qemu.py b/rumba/testbeds/qemu.py new file mode 100644 index 0000000..d44bb3e --- /dev/null +++ b/rumba/testbeds/qemu.py @@ -0,0 +1,34 @@ +# +# QEMU testbed for Rumba +# +# Vincenzo Maffione <v.maffione@nextworks.it> +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, +# MA 02110-1301 USA + +import rumba.model as mod + +# Fake testbed, useful for testing +class Testbed(mod.Testbed): + def __init__(self, exp_name, username, proj_name = "ARCFIRE", + password = ""): + mod.Testbed.__init__(self, exp_name, username, password, proj_name) + + def create_experiment(self, nodes, links): + print(links) + print("[QEMU testbed] experiment swapped in") + + def __del__(self): + pass |