aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSander Vrijders <sander.vrijders@intec.ugent.be>2017-03-21 10:31:53 +0000
committerSander Vrijders <sander.vrijders@intec.ugent.be>2017-03-21 10:31:53 +0000
commit1b539c74fb11caa327087782d5660270daa5d135 (patch)
treef8980eb62ea31d99b36709d92983e189ad282227
parentf5138ef9dffa1ce22a71a07724307b884c9dbaf5 (diff)
parent79a6808c292c4690c92314468ff4ff0d5d8ad72e (diff)
downloadrumba-1b539c74fb11caa327087782d5660270daa5d135.tar.gz
rumba-1b539c74fb11caa327087782d5660270daa5d135.zip
Merge branch 'vincenzo' into 'master'
Import various graph algorithms from the demonstrator See merge request !11
-rw-r--r--README28
-rwxr-xr-xexamples/example.py1
-rwxr-xr-xexamples/two-layers.py52
-rw-r--r--rumba/model.py326
-rw-r--r--rumba/testbeds/qemu.py34
5 files changed, 423 insertions, 18 deletions
diff --git a/README b/README
index 04b2623..cb29f09 100644
--- a/README
+++ b/README
@@ -1,2 +1,30 @@
# measurement-framework
Part of ARCFIRE 2020, WP3 work package.
+
+Workflow, both external and internal:
+
+ (1) user defines the network graph, creating instances of model.Node
+ and model.DIF classes
+
+ (2) user creates an instance of a Testbed class
+
+ (3) user creates an instance of prototype.Experiment class, passing
+ the testbed instance and a list of Node instances
+ - at the endo of the base Experiment constructor, the
+ generate function is called to generate information about
+ per-node IPCPs, registrations and enrollment, ready to be
+ used by the plugins
+
+ (4) user calls run() on the prototype.Experiment instance:
+ - First, run() calls Experiment.swap_in(), which
+ in turns calls Testbed.create_experiment(), passing the
+ nodes and links (?)
+ TODO: fix this interface: what should swap_in(), and
+ so create_experiment() return exactly? Current interface
+ seems broken
+
+ - Second, run() calls a prototype-specific setup function,
+ to create the required IPCPs, perform registrations,
+ enrollments, etc.
+
+ - Third, perform tests (TODO)
diff --git a/examples/example.py b/examples/example.py
index 2222ee0..e173ef0 100755
--- a/examples/example.py
+++ b/examples/example.py
@@ -8,6 +8,7 @@ from rumba.model import *
import rumba.testbeds.emulab as emulab
import rumba.testbeds.jfed as jfed
import rumba.testbeds.faketestbed as fake
+import rumba.testbeds.qemu as qemu
# import prototype plugins
import rumba.prototypes.ouroboros as our
diff --git a/examples/two-layers.py b/examples/two-layers.py
new file mode 100755
index 0000000..29faebe
--- /dev/null
+++ b/examples/two-layers.py
@@ -0,0 +1,52 @@
+#!/usr/bin/env python
+
+# An example script using the rumba package
+
+from rumba.model import *
+
+# import testbed plugins
+import rumba.testbeds.emulab as emulab
+import rumba.testbeds.jfed as jfed
+import rumba.testbeds.faketestbed as fake
+import rumba.testbeds.qemu as qemu
+
+# import prototype plugins
+import rumba.prototypes.ouroboros as our
+import rumba.prototypes.rlite as rl
+import rumba.prototypes.irati as irati
+
+n1 = NormalDIF("n1")
+n2 = NormalDIF("n2")
+n3 = NormalDIF("n3")
+n4 = NormalDIF("n4")
+
+e1 = ShimEthDIF("e1")
+e2 = ShimEthDIF("e2")
+e3 = ShimEthDIF("e3")
+
+a = Node("a",
+ difs = [n3, n4, n1, e1],
+ dif_registrations = {n4: [n1], n3: [n1], n1 : [e1]},
+ registrations = {"rinaperf.server" : [n3]},
+ bindings = {"rinaperf.server" : "/usr/bin/rinaperf"})
+
+b = Node("b",
+ difs = [n1, e1, e2],
+ dif_registrations = {n1 : [e1, e2]})
+
+c = Node("c",
+ difs = [n3, n4, n1, n2, e2, e3],
+ dif_registrations = {n4: [n1], n3: [n1, n2], n1 : [e2], n2: [e3]})
+
+d = Node("d",
+ difs = [n3, n2, e3],
+ dif_registrations = {n3: [n2], n2 : [e3]})
+
+tb = qemu.Testbed(exp_name = "twolayers",
+ username = "vmaffio")
+
+exp = irati.Experiment(tb, nodes = [a, b, c, d])
+
+print(exp)
+
+exp.run()
diff --git a/rumba/model.py b/rumba/model.py
index 23db86f..6d6ffee 100644
--- a/rumba/model.py
+++ b/rumba/model.py
@@ -50,6 +50,9 @@ class Interface:
self.name = name
self.ip = ip
+ def __repr__(self):
+ return self.name
+
# Represents a link in the physical graph
#
# @name [string] Link name
@@ -58,6 +61,9 @@ class Link:
def __init__(self, name):
self.name = name
+ def __repr__(self):
+ return self.name
+
# Represents a point-to-point link in the physical graph
#
# @name [string] DIF name
@@ -76,6 +82,10 @@ class P2PLink(Link):
int_b = Interface()
self.int_b = int_b
+ def __repr__(self):
+ return '%s:%s--%s:%s' % (self.node_a.name, self.int_a,
+ self.node_b.name, self.int_b)
+
# Base class for DIFs
#
# @name [string] DIF name
@@ -91,18 +101,33 @@ class DIF:
s = "DIF %s" % self.name
return s
+ def __hash__(self):
+ return hash(self.name)
+
+ def __eq__(self, other):
+ return other != None and self.name == other.name
+
+ def __neq__(self, other):
+ return not (self == other)
+
def add_member(self, node):
self.members.append(node)
def del_member(self, node):
self.members.remove(node)
+ def get_ipcp_class(self):
+ return IPCP
+
# Shim over UDP
#
class ShimUDPDIF(DIF):
def __init__(self, name, members = None):
DIF.__init__(self, name, members)
+ def get_ipcp_class(self):
+ return ShimUDPIPCP
+
# Shim over Ethernet
#
# @link_speed [int] Speed of the Ethernet network, in Mbps
@@ -114,6 +139,9 @@ class ShimEthDIF(DIF):
if self.link_speed < 0:
raise ValueError("link_speed must be a non-negative number")
+ def get_ipcp_class(self):
+ return ShimEthIPCP
+
# Normal DIF
#
# @policies [dict] Policies of the normal DIF
@@ -165,58 +193,144 @@ class Node:
bindings = dict()
self.bindings = bindings
self.full_name = name
+ self.ipcps = []
+
+ self._validate()
+
+ def _undeclared_dif(self, dif):
+ if dif not in self.difs:
+ raise Exception("Invalid registration: node %s is not declared "\
+ "to be part of DIF %s" % (self.name, dif.name))
+
+ def _validate(self):
+ # Check that DIFs referenced in self.dif_registrations and
+ # in self.registrations are part of self.difs
+ for upper in self.dif_registrations:
+ self._undeclared_dif(upper)
+ for lower in self.dif_registrations[upper]:
+ self._undeclared_dif(lower)
+
+ for appl in self.registrations:
+ for dif in self.registrations[appl]:
+ self._undeclared_dif(dif)
def __repr__(self):
s = "Node " + self.name + ":\n"
- s += " IPCPs in DIFs: ["
- for d in self.difs:
- s += " %s" % d.name
+
+ s += " DIFs: [ "
+ s += " ".join([d.name for d in self.difs])
s += " ]\n"
+
s += " DIF registrations: [ "
- for dif_a, difs in self.dif_registrations.items():
- s += "%s => [" % dif_a.name
- for dif_b in difs:
- s += " %s" % dif_b.name
- s += " ]"
+ rl = []
+ for upper in self.dif_registrations:
+ difs = self.dif_registrations[upper]
+ x = "%s => [" % upper.name
+ x += " ".join([lower.name for lower in difs])
+ x += "]"
+ rl.append(x)
+ s += ", ".join(rl)
s += " ]\n"
+
s += " Name registrations: [ "
- for name, difs in self.registrations.items():
- s += "%s => [" % name
- for dif in difs:
- s += " %s" % dif.name
+ for name in self.registrations:
+ difs = self.registrations[name]
+ s += "%s => [ " % name
+ s += ", ".join([dif.name for dif in difs])
s += " ]"
s += " ]\n"
+
s += " Bindings: [ "
- for ap, name in self.bindings.items():
- s += "'%s' => '%s'" % (ap, name)
+ s += ", ".join(["'%s' => '%s'" % (ap, self.bindings[ap]) \
+ for ap in self.bindings])
s += " ]\n"
+
return s
+ def __hash__(self):
+ return hash(self.name)
+
+ def __eq__(self, other):
+ return other != None and self.name == other.name
+
+ def __neq__(self, other):
+ return not (self == other)
+
def add_dif(self, dif):
self.difs.append(dif)
dif.add_member(self)
+ self._validate()
def del_dif(self, dif):
self.difs.remove(dif)
dif.del_member(self)
+ self._validate()
- def add_dif_registration(self, dif_a, dif_b):
- self.dif_registrations[dif_a].append(dif_b)
+ def add_dif_registration(self, upper, lower):
+ self.dif_registrations[upper].append(lower)
+ self._validate()
- def del_dif_registration(self, dif_a, dif_b):
- self.dif_registrations[dif_a].remove(dif_b)
+ def del_dif_registration(self, upper, lower):
+ self.dif_registrations[upper].remove(lower)
+ self._validate()
def add_registration(self, name, dif):
self.dif_registrations[name].append(dif)
+ self._validate()
def del_registration(self, name, dif):
self.dif_registrations[name].remove(dif)
+ self._validate()
def add_binding(self, name, ap):
self.dif_bindings[name] = ap
+ self._validate()
def del_binding(self, name):
del self.dif_bindings[name]
+ self._validate()
+
+# Base class representing an IPC Process to be created in the experiment
+#
+# @name [string]: IPCP name
+# @node: Node where the IPCP gets created
+# @dif: the DIF the IPCP belongs to
+#
+class IPCP:
+ def __init__(self, name, node, dif):
+ self.name = name
+ self.node = node
+ self.dif = dif
+ self.registrations = []
+ self.enrollments = []
+
+ def __repr__(self):
+ return "{IPCP=%s,DIF=%s,N-1-DIFs=(%s),enrollments=(%s)}" % \
+ (self.name, self.dif.name,
+ ' '.join([dif.name for dif in self.registrations]),
+ ' '.join(['{neigh=%s,N-1-DIF=%s}' % (e['enroller'].name,
+ e['lower_dif'].name) for e in self.enrollments])
+ )
+
+ def __hash__(self):
+ return hash((self.name, self.dif.name))
+
+ def __eq__(self, other):
+ return other != None and self.name == other.name \
+ and self.dif == other.dif
+
+ def __neq__(self, other):
+ return not (self == other)
+
+class ShimEthIPCP(IPCP):
+ def __init__(self, name, node, dif, ifname = None):
+ IPCP.__init__(self, name, node, dif)
+ self.ifname = ifname
+
+class ShimUDPIPCP(IPCP):
+ def __init__(self, name, node, dif):
+ IPCP.__init__(self, name, node, dif)
+ # TODO add IP and port
# Base class for ARCFIRE experiments
#
@@ -229,6 +343,12 @@ class Experiment:
nodes = list()
self.nodes = nodes
self.testbed = testbed
+ self.enrollment_strategy = 'minimal' # 'full-mesh', 'manual'
+ self.dif_ordering = []
+ self.enrollments = dict()
+
+ # Generate missing information
+ self.generate()
def __repr__(self):
s = ""
@@ -258,9 +378,179 @@ class Experiment:
def add_node(self, node):
self.nodes.append(node)
+ self.generate()
def del_node(self, node):
self.nodes.remove(node)
+ self.generate()
+
+ # Compute registration/enrollment order for DIFs
+ def compute_dif_ordering(self):
+ # Compute DIFs dependency graph, as both adjacency and incidence list.
+ difsdeps_adj = dict()
+ difsdeps_inc = dict()
+
+ for node in self.nodes:
+ for upper in node.dif_registrations:
+ for lower in node.dif_registrations[upper]:
+ if upper not in difsdeps_inc:
+ difsdeps_inc[upper] = set()
+ if lower not in difsdeps_inc:
+ difsdeps_inc[lower] = set()
+ if upper not in difsdeps_adj:
+ difsdeps_adj[upper] = set()
+ if lower not in difsdeps_adj:
+ difsdeps_adj[lower] = set()
+ difsdeps_inc[upper].add(lower)
+ difsdeps_adj[lower].add(upper)
+
+ # Kahn's algorithm below only needs per-node count of
+ # incident edges, so we compute these counts from the
+ # incidence list and drop the latter.
+ difsdeps_inc_cnt = dict()
+ for dif in difsdeps_inc:
+ difsdeps_inc_cnt[dif] = len(difsdeps_inc[dif])
+ del difsdeps_inc
+
+ #print(difsdeps_adj)
+ #print(difsdeps_inc_cnt)
+
+ # Run Kahn's algorithm to compute topological ordering on the DIFs graph.
+ frontier = set()
+ self.dif_ordering = []
+ for dif in difsdeps_inc_cnt:
+ if difsdeps_inc_cnt[dif] == 0:
+ frontier.add(dif)
+
+ while len(frontier):
+ cur = frontier.pop()
+ self.dif_ordering.append(cur)
+ for nxt in difsdeps_adj[cur]:
+ difsdeps_inc_cnt[nxt] -= 1
+ if difsdeps_inc_cnt[nxt] == 0:
+ frontier.add(nxt)
+ difsdeps_adj[cur] = set()
+
+ circular_set = [dif for dif in difsdeps_inc_cnt if difsdeps_inc_cnt[dif] != 0]
+ if len(circular_set):
+ raise Exception("Fatal error: The specified DIFs topology has one or more"\
+ "circular dependencies, involving the following"\
+ " DIFs: %s" % circular_set)
+
+ print("DIF topological ordering: %s" % self.dif_ordering)
+
+ # Compute per-DIF enrollments, to be called after compute_dif_ordering()
+ def compute_enrollments(self):
+ dif_graphs = dict()
+ self.enrollments = dict()
+
+ for dif in self.dif_ordering:
+ neighsets = dict()
+ dif_graphs[dif] = dict()
+ first = None
+
+ # For each N-1-DIF supporting this DIF, compute the set of nodes that
+ # share such N-1-DIF. This set will be called the 'neighset' of
+ # the N-1-DIF for the current DIF.
+
+ for node in self.nodes:
+ if dif in node.dif_registrations:
+ dif_graphs[dif][node] = [] # init for later use
+ if first == None: # pick any node for later use
+ first = node
+ for lower_dif in node.dif_registrations[dif]:
+ if lower_dif not in neighsets:
+ neighsets[lower_dif] = []
+ neighsets[lower_dif].append(node)
+
+ # Build the graph, represented as adjacency list
+ for lower_dif in neighsets:
+ # Each neighset corresponds to a complete (sub)graph.
+ for node1 in neighsets[lower_dif]:
+ for node2 in neighsets[lower_dif]:
+ if node1 != node2:
+ dif_graphs[dif][node1].append((node2, lower_dif))
+
+ self.enrollments[dif] = []
+
+ if first == None:
+ # This is a shim DIF, nothing to do
+ continue
+
+ er = []
+ for node in dif_graphs[dif]:
+ for edge in dif_graphs[dif][node]:
+ er.append("%s --[%s]--> %s" % (node.name, edge[1].name, edge[0].name))
+ print("DIF graph for %s: %s" % (dif, ', '.join(er)))
+
+ if self.enrollment_strategy == 'minimal':
+ # To generate the list of enrollments, we simulate one,
+ # using breadth-first trasversal.
+ enrolled = set([first])
+ frontier = set([first])
+ while len(frontier):
+ cur = frontier.pop()
+ for edge in dif_graphs[dif][cur]:
+ if edge[0] not in enrolled:
+ enrolled.add(edge[0])
+ self.enrollments[dif].append({'enrollee': edge[0],
+ 'enroller': cur,
+ 'lower_dif': edge[1]})
+ frontier.add(edge[0])
+
+ elif self.enrollment_strategy == 'full-mesh':
+ for cur in dif_graphs[dif]:
+ for edge in dif_graphs[dif][cur]:
+ if cur < edge[0]:
+ self.enrollments[dif].append({'enrollee': cur,
+ 'enroller': edge[0],
+ 'lower_dif': edge[1]})
+
+ else:
+ # This is a bug
+ assert(False)
+
+ print("Enrollments for %s" % dif)
+ for e in self.enrollments[dif]:
+ print(" %s --> %s through N-1-DIF %s" % \
+ (e['enrollee'].name,
+ e['enroller'].name,
+ e['lower_dif']))
+
+ def compute_ipcps(self):
+ # For each node, compute the required IPCP instances, and associated
+ # registrations and enrollments
+ for node in self.nodes:
+ node.ipcps = []
+ # We want also the node.ipcps list to be generated in
+ # topological ordering
+ for dif in self.dif_ordering:
+ if dif not in node.difs:
+ continue
+
+ ipcp = dif.get_ipcp_class()(
+ name = '%s.%s' % (dif.name, node.name),
+ node = node, dif = dif)
+
+ if dif in node.dif_registrations:
+ for lower in node.dif_registrations[dif]:
+ ipcp.registrations.append(lower)
+
+ for e in self.enrollments[dif]:
+ if e['enrollee'] == node:
+ ipcp.enrollments.append({'enroller': e['enroller'],
+ 'lower_dif': e['lower_dif']})
+
+ node.ipcps.append(ipcp)
+
+ print("IPCP for node %s: %s" % (node.name, node.ipcps))
+
+ # Examine the nodes and DIFs, compute the registration and enrollment
+ # order, the list of IPCPs to create, registrations, ...
+ def generate(self):
+ self.compute_dif_ordering()
+ self.compute_enrollments()
+ self.compute_ipcps()
# Realize the experiment, using a testbed-specific setup
def swap_in(self):
diff --git a/rumba/testbeds/qemu.py b/rumba/testbeds/qemu.py
new file mode 100644
index 0000000..d44bb3e
--- /dev/null
+++ b/rumba/testbeds/qemu.py
@@ -0,0 +1,34 @@
+#
+# QEMU testbed for Rumba
+#
+# Vincenzo Maffione <v.maffione@nextworks.it>
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
+# MA 02110-1301 USA
+
+import rumba.model as mod
+
+# Fake testbed, useful for testing
+class Testbed(mod.Testbed):
+ def __init__(self, exp_name, username, proj_name = "ARCFIRE",
+ password = ""):
+ mod.Testbed.__init__(self, exp_name, username, password, proj_name)
+
+ def create_experiment(self, nodes, links):
+ print(links)
+ print("[QEMU testbed] experiment swapped in")
+
+ def __del__(self):
+ pass