author     vmaffione <v.maffione@gmail.com>   2017-04-15 07:34:21 +0000
committer  vmaffione <v.maffione@gmail.com>   2017-04-15 07:34:21 +0000
commit     28d6a8729fac3d109e68afed1bbacc27d526048b (patch)
tree       cacdb7db6e44712c0a4ca2dc617afb355bc2c852 /rumba
parent     ca1d77df271defb08d5f73b54398491d1049c9f9 (diff)
parent     6eceae4bf7ee823d6eed276935741b7c107f6105 (diff)
Merge branch 'master-marco' into 'master'
IRATI config file generation (and other). See merge request !22
Diffstat (limited to 'rumba')
 -rw-r--r--   rumba/model.py                        |  53
 -rwxr-xr-x   rumba/prototypes/enroll.py            | 117
 -rw-r--r--   rumba/prototypes/irati.py             | 343
 -rw-r--r--   rumba/prototypes/irati_templates.py   | 349
 -rw-r--r--   rumba/ssh_support.py                  |  96
 -rw-r--r--   rumba/testbeds/qemu.py                | 150
 6 files changed, 1021 insertions(+), 87 deletions(-)
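
To put the changed files in context (this sketch is not part of the merge; the DIF and Node constructor arguments are guessed from the docstrings in rumba/model.py, and the kernel/initramfs paths are placeholders), a script driving the new IRATI prototype support would look roughly like this:

    import rumba.model as mod
    import rumba.testbeds.qemu as qemu
    import rumba.prototypes.irati as irati

    # One normal DIF stacked on a shim Ethernet DIF between two nodes
    # (constructor arguments are assumptions, see note above).
    e1 = mod.ShimEthDIF('e1')
    n1 = mod.NormalDIF('n1')
    node_a = mod.Node('a', difs=[n1, e1], dif_registrations={n1: [e1]})
    node_b = mod.Node('b', difs=[n1, e1], dif_registrations={n1: [e1]})

    tb = qemu.Testbed(exp_name='example',
                      bzimage='/path/to/bzImage',        # placeholder path
                      initramfs='/path/to/rootfs.cpio')  # placeholder path
    exp = irati.Experiment(tb, nodes=[node_a, node_b])

    tb.swap_in(exp)      # boot the QEMU VMs and wire the shim Ethernet taps
    exp.run_prototype()  # setup(), write_conf(), bootstrap_network(), run_experiment()
    tb.swap_out(exp)     # tear the VMs, taps and bridges down
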
diff --git a/rumba/model.py b/rumba/model.py
index 9e6b40d..0d93fcd 100644
--- a/rumba/model.py
+++ b/rumba/model.py
@@ -20,7 +20,7 @@
# MA 02110-1301 USA
import abc
-import re
+
# Represents generic testbed info
#
@@ -65,7 +65,7 @@ class DIF:
return hash(self.name)
def __eq__(self, other):
- return other != None and self.name == other.name
+ return other is not None and self.name == other.name
def __neq__(self, other):
return not self == other
@@ -79,6 +79,7 @@ class DIF:
def get_ipcp_class(self):
return IPCP
+
# Shim over UDP
#
class ShimUDPDIF(DIF):
@@ -88,6 +89,7 @@ class ShimUDPDIF(DIF):
def get_ipcp_class(self):
return ShimUDPIPCP
+
# Shim over Ethernet
#
# @link_speed [int] Speed of the Ethernet network, in Mbps
@@ -102,6 +104,7 @@ class ShimEthDIF(DIF):
def get_ipcp_class(self):
return ShimEthIPCP
+
# Normal DIF
#
# @policies [dict] Policies of the normal DIF
@@ -125,6 +128,7 @@ class NormalDIF(DIF):
s += "\n Component %s has policy %s" % (comp, pol)
return s
+
# SSH Configuration
#
class SSHConfig:
@@ -133,6 +137,7 @@ class SSHConfig:
self.port = port
self.proxycommand = proxycommand
+
# A node in the experiment
#
# @difs: DIFs the node will have an IPCP in
@@ -165,7 +170,7 @@ class Node:
def _undeclared_dif(self, dif):
if dif not in self.difs:
- raise Exception("Invalid registration: node %s is not declared "\
+ raise Exception("Invalid registration: node %s is not declared "
"to be part of DIF %s" % (self.name, dif.name))
def _validate(self):
@@ -207,8 +212,8 @@ class Node:
s += " ]\n"
s += " Bindings: [ "
- s += ", ".join(["'%s' => '%s'" % (ap, self.bindings[ap]) \
- for ap in self.bindings])
+ s += ", ".join(["'%s' => '%s'" % (ap, self.bindings[ap])
+ for ap in self.bindings])
s += " ]\n"
return s
@@ -217,7 +222,7 @@ class Node:
return hash(self.name)
def __eq__(self, other):
- return other != None and self.name == other.name
+ return other is not None and self.name == other.name
def __neq__(self, other):
return not self == other
@@ -256,6 +261,7 @@ class Node:
del self.bindings[name]
self._validate()
+
# Base class representing an IPC Process to be created in the experiment
#
# @name [string]: IPCP name
@@ -278,28 +284,31 @@ class IPCP:
(self.name, self.dif.name,
' '.join([dif.name for dif in self.registrations]),
',bootstrapper' if self.dif_bootstrapper else ''
- )
+ )
def __hash__(self):
return hash((self.name, self.dif.name))
def __eq__(self, other):
- return other != None and self.name == other.name \
+ return other is not None and self.name == other.name \
and self.dif == other.dif
def __neq__(self, other):
return not self == other
+
class ShimEthIPCP(IPCP):
def __init__(self, name, node, dif, ifname=None):
IPCP.__init__(self, name, node, dif)
self.ifname = ifname
+
class ShimUDPIPCP(IPCP):
def __init__(self, name, node, dif):
IPCP.__init__(self, name, node, dif)
# TODO: add IP and port
+
# Base class for ARCFIRE experiments
#
# @name [string] Name of the experiment
@@ -313,7 +322,7 @@ class Experiment:
self.testbed = testbed
self.enrollment_strategy = 'minimal' # 'full-mesh', 'manual'
self.dif_ordering = []
- self.enrollments = [] # a list of per-DIF lists of enrollments
+ self.enrollments = [] # a list of per-DIF lists of enrollments
# Generate missing information
self.generate()
@@ -361,8 +370,8 @@ class Experiment:
difsdeps_inc_cnt[dif] = len(difsdeps_inc[dif])
del difsdeps_inc
- #print(difsdeps_adj)
- #print(difsdeps_inc_cnt)
+ # print(difsdeps_adj)
+ # print(difsdeps_inc_cnt)
# Run Kahn's algorithm to compute topological
# ordering on the DIFs graph.
@@ -381,12 +390,12 @@ class Experiment:
frontier.add(nxt)
difsdeps_adj[cur] = set()
- circular_set = [dif for dif in difsdeps_inc_cnt \
+ circular_set = [dif for dif in difsdeps_inc_cnt
if difsdeps_inc_cnt[dif] != 0]
if len(circular_set):
- raise Exception("Fatal error: The specified DIFs topology" \
- "has one or more" \
- "circular dependencies, involving the following" \
+ raise Exception("Fatal error: The specified DIFs topology"
+ "has one or more"
+ "circular dependencies, involving the following"
" DIFs: %s" % circular_set)
print("DIF topological ordering: %s" % self.dif_ordering)
@@ -407,8 +416,8 @@ class Experiment:
for node in self.nodes:
if dif in node.dif_registrations:
- dif_graphs[dif][node] = [] # init for later use
- if first is None: # pick any node for later use
+ dif_graphs[dif][node] = [] # init for later use
+ if first is None: # pick any node for later use
first = node
for lower_dif in node.dif_registrations[dif]:
if lower_dif not in neighsets:
@@ -469,11 +478,11 @@ class Experiment:
print("Enrollments:")
for el in self.enrollments:
for e in el:
- print(" [%s] %s --> %s through N-1-DIF %s" % \
- (e['dif'],
- e['enrollee'].name,
- e['enroller'].name,
- e['lower_dif']))
+ print(" [%s] %s --> %s through N-1-DIF %s" %
+ (e['dif'],
+ e['enrollee'].name,
+ e['enroller'].name,
+ e['lower_dif']))
def compute_ipcps(self):
# For each node, compute the required IPCP instances, and associated
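
The Kahn's-algorithm step mentioned in the hunks above can be summarised with a small standalone sketch (the real code builds the adjacency map and in-degree counters from per-node DIF registrations on DIF objects, not strings):

    from collections import deque

    def kahn_order(adj):
        """adj maps each DIF to the set of DIFs stacked directly on top of it."""
        inc_cnt = {dif: 0 for dif in adj}
        for dif in adj:
            for upper in adj[dif]:
                inc_cnt[upper] += 1
        frontier = deque(dif for dif in adj if inc_cnt[dif] == 0)
        ordering = []
        while frontier:
            cur = frontier.popleft()
            ordering.append(cur)
            for upper in adj[cur]:
                inc_cnt[upper] -= 1
                if inc_cnt[upper] == 0:
                    frontier.append(upper)
        if len(ordering) != len(adj):
            raise Exception("Circular DIF dependencies detected")
        return ordering

    # Two shim Ethernet DIFs supporting one normal DIF: the shims come first.
    print(kahn_order({'e1': {'n1'}, 'e2': {'n1'}, 'n1': set()}))
    # ['e1', 'e2', 'n1']
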
diff --git a/rumba/prototypes/enroll.py b/rumba/prototypes/enroll.py
new file mode 100755
index 0000000..458736a
--- /dev/null
+++ b/rumba/prototypes/enroll.py
@@ -0,0 +1,117 @@
+#!/usr/bin/env python
+
+#
+# Author: Vincenzo Maffione <v.maffione@nextworks.it>
+#
+
+import argparse
+import socket
+import time
+import re
+
+def printalo(byt):
+ print(repr(byt).replace('\\n', '\n'))
+
+
+def get_response(s):
+ data = bytes()
+ while 1:
+ data += s.recv(1024)
+ lines = str(data).replace('\\n', '\n').split('\n')
+ #print(lines)
+ if lines[-1].find("IPCM") != -1:
+ return lines[:len(lines)-1]
+
+
+description = "Python script to enroll IPCPs"
+epilog = "2016 Vincenzo Maffione <v.maffione@nextworks.it>"
+
+argparser = argparse.ArgumentParser(description = description,
+ epilog = epilog)
+argparser.add_argument('--ipcm-conf', help = "Path to the IPCM configuration file",
+ type = str, required = True)
+argparser.add_argument('--enrollee-name', help = "Name of the enrolling IPCP",
+ type = str, required = True)
+argparser.add_argument('--dif', help = "Name of DIF to enroll to",
+ type = str, required = True)
+argparser.add_argument('--lower-dif', help = "Name of the lower level DIF",
+ type = str, required = True)
+argparser.add_argument('--enroller-name', help = "Name of the remote neighbor IPCP to enroll to",
+ type = str, required = True)
+args = argparser.parse_args()
+
+socket_name = None
+
+fin = open(args.ipcm_conf, 'r')
+while 1:
+ line = fin.readline()
+ if line == '':
+ break
+
+ m = re.search(r'"(\S+ipcm-console.sock)', line)
+    if m is not None:
+ socket_name = m.group(1)
+ break
+fin.close()
+
+if socket_name is None:
+    print('Could not find the IPCM console socket path in %s' % args.ipcm_conf)
+    quit(1)
+
+s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+
+connected = False
+trials = 0
+while trials < 4:
+ try:
+ s.connect(socket_name)
+ connected = True
+ break
+ except:
+ pass
+ trials += 1
+ time.sleep(1)
+
+if connected:
+ try:
+ # Receive the banner
+ get_response(s)
+
+ # Send the IPCP list command
+ cmd = 'list-ipcps\n'
+ s.sendall(bytes(cmd, 'ascii'))
+
+ # Get the list of IPCPs and parse it to look for the enroller ID
+ print('Looking up identifier for IPCP %s' % args.enrollee_name)
+ lines = get_response(s)
+ print(lines)
+ enrollee_id = None
+ for line in lines:
+ rs = r'^\s*(\d+)\s*\|\s*' + args.enrollee_name.replace('.', '\\.')
+ m = re.match(rs, line)
+            if m is not None:
+ enrollee_id = m.group(1)
+
+        if enrollee_id is None:
+ print('Could not find the ID of enrollee IPCP %s' \
+ % args.enrollee_name)
+ raise Exception()
+
+ # Send the enroll command
+ cmd = 'enroll-to-dif %s %s %s %s 1\n' \
+ % (enrollee_id, args.dif, args.lower_dif, args.enroller_name)
+ print(cmd)
+
+ s.sendall(bytes(cmd, 'ascii'))
+
+ # Get the enroll command answer
+ lines = get_response(s)
+ print(lines)
+ except:
+ s.close()
+ raise
+
+else:
+ print('Failed to connect to "%s"' % socket_name)
+
+s.close()
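
enroll.py is copied to /usr/bin on each node by irati.py (next file) and invoked over SSH; run by hand, with made-up node and DIF names following the '<dif>.<node>.IPCP' convention used there, the call would look roughly like:

    import subprocess

    # Names and the config path are illustrative only.
    subprocess.check_call(['enroll.py',
                           '--ipcm-conf', '/etc/a.ipcm.conf',
                           '--dif', 'n1',
                           '--lower-dif', 'e1',
                           '--enrollee-name', 'n1.a.IPCP',
                           '--enroller-name', 'n1.b.IPCP'])
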
diff --git a/rumba/prototypes/irati.py b/rumba/prototypes/irati.py
index e8766da..89c4fe4 100644
--- a/rumba/prototypes/irati.py
+++ b/rumba/prototypes/irati.py
@@ -2,6 +2,7 @@
# Commands to setup and instruct IRATI
#
# Vincenzo Maffione <v.maffione@nextworks.it>
+# Marco Capitani <m.capitani@nextworks.it>
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
@@ -17,32 +18,354 @@
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
# MA 02110-1301 USA
+import copy
+import json
+
+import subprocess
+
+import os
import rumba.ssh_support as ssh
import rumba.model as mod
+import rumba.prototypes.irati_templates as irati_templates
+
# An experiment over the IRATI implementation
class Experiment(mod.Experiment):
+
+ @staticmethod
+ def real_sudo(s):
+ return 'sudo ' + s
+
+ @staticmethod
+ def fake_sudo(s):
+ return s
+
def __init__(self, testbed, nodes=None):
mod.Experiment.__init__(self, testbed, nodes)
+ self.manager = False
+ self.conf_files = None
+
+ if self.testbed.username == 'root':
+ self.sudo = self.fake_sudo
+ else:
+ self.sudo = self.real_sudo
+
+ self._conf_dir = os.path.join(os.getcwd(), 'IRATI_conf')
+ try:
+ os.mkdir(self._conf_dir)
+ except OSError:
+ # Already there, nothing to do
+ pass
+
+ def conf_dir(self, path):
+ return os.path.join(self._conf_dir, path)
def setup(self):
- cmds = list()
+        """Installs IRATI on the VMs."""
+ setup_irati = False
+ if setup_irati:
+ cmds = list()
- cmds.append("sudo apt-get update")
- cmds.append("sudo apt-get install g++ gcc "
- "protobuf-compiler libprotobuf-dev git --yes")
- cmds.append("sudo rm -rf ~/irati")
- cmds.append("cd && git clone https://github.com/IRATI/stack irati")
- cmds.append("cd ~/irati && sudo ./install-from-scratch")
- cmds.append("sudo nohup ipcm &> ipcm.log &")
+ cmds.append("sudo apt-get update")
+ cmds.append("sudo apt-get install g++ gcc "
+ "protobuf-compiler libprotobuf-dev git --yes")
+ cmds.append("sudo rm -rf ~/irati")
+ cmds.append("cd && git clone https://github.com/IRATI/stack irati")
+ cmds.append("cd ~/irati && sudo ./install-from-scratch")
+ cmds.append("sudo nohup ipcm &> ipcm.log &")
+ for node in self.nodes:
+ ssh.execute_commands(self.testbed, node.ssh_config,
+ cmds, time_out=None)
+
+ def bootstrap_network(self):
+ """Creates the network by enrolling and configuring the nodes"""
for node in self.nodes:
- ssh.execute_commands(self.testbed, node.ssh_config,
- cmds, time_out=None)
+ self.process_node(node)
+ self.enroll_nodes()
+
+ def run_experiment(self):
+ input('Press ENTER to quit.')
def run_prototype(self):
print("[IRATI experiment] start")
print("Setting up IRATI on the nodes...")
self.setup()
+ self.conf_files = self.write_conf()
+ self.bootstrap_network()
+ self.run_experiment()
print("[IRATI experiment] end")
+
+ def process_node(self, node):
+ """
+        Installs the configuration and boots up RINA on a node
+ :type node: mod.Node
+ :param node:
+ :return:
+ """
+ name = node.name
+ gen_files_conf = self.conf_files[node] + ['da.map']
+ dir_path = os.path.dirname(os.path.abspath(__file__))
+ gen_files_bin = 'enroll.py'
+ gen_files_conf_full = [self.conf_dir(x) for x in gen_files_conf]
+ gen_files_bin_full = [os.path.join(dir_path, 'enroll.py')]
+
+ ipcm_components = ['scripting', 'console']
+ if self.manager:
+ ipcm_components.append('mad')
+ ipcm_components = ', '.join(ipcm_components)
+
+ gen_files = gen_files_conf_full + gen_files_bin_full
+
+ format_args = {'name': name,
+ 'ssh': node.ssh_config.port,
+ 'username': self.testbed.username,
+ 'genfiles': gen_files,
+ 'genfilesconf': ' '.join(gen_files_conf),
+ 'genfilesbin': gen_files_bin,
+ 'installpath': '/usr',
+ 'verb': 'DBG',
+ 'ipcmcomps': ipcm_components}
+ try:
+ # TODO: watch out for empty path...
+ ssh.copy_paths_to_testbed(self.testbed,
+ node.ssh_config,
+ gen_files,
+ '')
+ except subprocess.CalledProcessError as e:
+ raise Exception(str(e))
+
+ # TODO: review ssh opts through ssh support
+
+ cmds = [self.sudo('hostname %(name)s' % format_args),
+ self.sudo('chmod a+rw /dev/irati'),
+ self.sudo('mv %(genfilesconf)s /etc' % format_args),
+                self.sudo('mv %(genfilesbin)s /usr/bin' % format_args),
+                self.sudo('chmod a+x /usr/bin/enroll.py')]
+
+ # TODO: is the port up on the VM at this point?
+
+ cmds += [self.sudo('modprobe rina-default-plugin'),
+ self.sudo('%(installpath)s/bin/ipcm -a \"%(ipcmcomps)s\" '
+ '-c /etc/%(name)s.ipcm.conf -l %(verb)s &> log &'
+ % format_args)]
+
+ print('DEBUG: sending node setup via ssh.')
+ # print('Credentials:')
+ # print(node.ssh_config.hostname, node.ssh_config.port,
+ # self.testbed.username, self.testbed.password)
+ ssh.execute_commands(self.testbed, node.ssh_config, cmds)
+
+ def enroll_nodes(self):
+ """Runs the enrollments one by one, respecting dependencies"""
+ for enrollment_list in self.enrollments:
+ for e in enrollment_list:
+ print(
+ 'I am going to enroll %s to DIF %s against neighbor %s,'
+ ' through lower DIF %s'
+                    % (e['enrollee'].name,
+                       e['dif'].name,
+                       e['enroller'].name,
+                       e['lower_dif'].name))
+
+                subprocess.check_call('sleep 2'.split())  # Important!
+
+ e_args = {'ldif': e['lower_dif'].name,
+ 'dif': e['dif'].name,
+ 'name': e['enrollee'].name,
+ 'o_name': e['enroller'].name}
+
+ cmd = self.sudo('enroll.py --lower-dif %(ldif)s --dif %(dif)s '
+ '--ipcm-conf /etc/%(name)s.ipcm.conf '
+ '--enrollee-name %(dif)s.%(name)s.IPCP '
+ '--enroller-name %(dif)s.%(o_name)s.IPCP'
+ % e_args)
+ print('DEBUG: sending enrollment operation via ssh.')
+ # print('Credentials:')
+ # print(e['enrollee'].ssh_config.hostname,
+ # e['enrollee'].ssh_config.port,
+ # self.testbed.username, self.testbed.password)
+ ssh.execute_command(self.testbed,
+ e['enrollee'].ssh_config,
+ cmd)
+
+ def write_conf(self):
+ """Write the configuration files"""
+ # Constants and initializations
+ ipcmconfs = dict()
+ difconfs = dict()
+ ipcp2shim_map = {}
+ node2id_map = {}
+ mgmt_dif_name = 'NMS'
+        conf_files = {}  # dict of per-node conf files
+
+ # TODO: what format are the mappings registered in? Is this ok?
+ app_mappings = []
+ for node in self.nodes:
+ app_mappings += [{'name': app, 'dif': dif.name}
+ for app in node.registrations
+ for dif in node.registrations[app]]
+
+ # If some app directives were specified, use those to build da.map.
+ # Otherwise, assume the standard applications are to be mapped in
+ # the DIF with the highest rank.
+ if len(app_mappings) == 0:
+ if len(self.dif_ordering) > 0:
+ for adm in \
+ irati_templates.da_map_base["applicationToDIFMappings"]:
+ adm["difName"] = "%s" % (self.dif_ordering[-1],)
+ else:
+ irati_templates.da_map_base["applicationToDIFMappings"] = []
+ for apm in app_mappings:
+ irati_templates.da_map_base["applicationToDIFMappings"]\
+ .append({"encodedAppName": apm['name'],
+ "difName": "%s" % (apm['dif'])
+ })
+
+ # TODO ask: I guess this will need to be added,
+ # and in that case we should add it to the qemu plugin too...
+ # Where should we take it in input?
+
+ if self.manager:
+ # Add MAD/Manager configuration
+ irati_templates.ipcmconf_base["addons"] = {
+ "mad": {
+ "managerAppName": "",
+ "NMSDIFs": [{"DIF": "%s" % mgmt_dif_name}],
+ "managerConnections": [{
+ "managerAppName": "manager-1--",
+ "DIF": "%s" % mgmt_dif_name
+ }]
+ }
+ }
+
+ node_number = 1
+ for node in self.nodes: # type: mod.Node
+ node2id_map[node.name] = node_number
+ node_number += 1
+ ipcmconfs[node.name] = copy.deepcopy(irati_templates.ipcmconf_base)
+ if self.manager:
+ ipcmconfs[node.name]["addons"]["mad"]["managerAppName"] \
+ = "%s.mad-1--" % (node.name,)
+
+ for dif in self.dif_ordering: # type: mod.DIF
+ if isinstance(dif, mod.ShimEthDIF):
+ ipcp2shim_map.update({ipcp.name: dif for ipcp in dif.ipcps})
+ elif isinstance(dif, mod.NormalDIF):
+ difconfs[dif.name] = dict()
+ for node in dif.members:
+ difconfs[dif.name][node.name] = copy.deepcopy(
+ irati_templates.normal_dif_base
+ )
+
+ for node in self.nodes: # type: mod.Node
+ ipcmconf = ipcmconfs[node.name]
+
+ for ipcp in node.ipcps: # type: mod.ShimEthIPCP
+ if isinstance(ipcp, mod.ShimEthIPCP):
+ shim = ipcp2shim_map[ipcp.name] # type: mod.ShimEthDIF
+ ipcmconf["ipcProcessesToCreate"].append({
+ "apName": "eth.%s.IPCP" % ipcp.name,
+ "apInstance": "1",
+ "difName": shim.name
+ })
+
+ template_file_name = self.conf_dir('shimeth.%s.%s.dif'
+ % (node.name, shim.name))
+ ipcmconf["difConfigurations"].append({
+ "name": shim.name,
+ "template": os.path.basename(template_file_name)
+ })
+
+ fout = open(template_file_name, 'w')
+ fout.write(json.dumps(
+ {"difType": "shim-eth-vlan",
+ "configParameters": {
+ "interface-name": ipcp.ifname
+ }
+ },
+ indent=4, sort_keys=True))
+ fout.close()
+ conf_files.setdefault(node, []).append(
+ 'shimeth.%s.%s.dif' % (node.name, shim.name))
+
+ # Run over dif_ordering array, to make sure each IPCM config has
+ # the correct ordering for the ipcProcessesToCreate list of operations.
+        # If we iterated over the difs map, the order would be random, and so
+ # some IPCP registrations in lower DIFs may fail.
+ # This would happen because at the moment of registration,
+ # it may be that the IPCP of the lower DIF has not been created yet.
+ shims = ipcp2shim_map.values()
+ for dif in self.dif_ordering: # type: mod.NormalDIF
+
+ if dif in shims:
+ # Shims are managed separately, in the previous loop
+ continue
+
+ for node in dif.members: # type: mod.Node
+ node_name = node.name
+ ipcmconf = ipcmconfs[node_name]
+
+ normal_ipcp = {"apName": "%s.%s.IPCP" % (dif.name, node_name),
+ "apInstance": "1",
+ "difName": "%s" % (dif.name,),
+ "difsToRegisterAt": []}
+
+ for lower_dif in node.dif_registrations[dif]: # type: mod.DIF
+ normal_ipcp["difsToRegisterAt"].append(lower_dif.name)
+
+ ipcmconf["ipcProcessesToCreate"].append(normal_ipcp)
+
+ ipcmconf["difConfigurations"].append({
+ "name": "%s" % (dif.name,),
+ "template": "normal.%s.%s.dif" % (node_name, dif.name,)
+ })
+
+ # Fill in the map of IPCP addresses.
+ # This could be moved at difconfs
+ for other_node in dif.members: # type: mod.Node
+ difconfs[dif.name][other_node.name] \
+ ["knownIPCProcessAddresses"].append({
+ "apName": "%s.%s.IPCP" % (dif.name, node_name),
+ "apInstance": "1",
+ "address": 16 + node2id_map[node_name]})
+ for path, ps in dif.policies.items():
+ # if policy['nodes'] == [] or vmname in policy['nodes']:
+ # TODO: manage per-node-policies
+ irati_templates.translate_policy(
+ difconfs[dif.name][node_name], path, ps, parms=[])
+
+ # Dump the DIF Allocator map
+ with open(self.conf_dir('da.map'), 'w') as da_map_file:
+ json.dump(irati_templates.da_map_base,
+ da_map_file,
+ indent=4,
+ sort_keys=True)
+
+ for node in self.nodes:
+ # Dump the IPCM configuration files
+ with open(self.conf_dir('%s.ipcm.conf'
+ % (node.name,)), 'w') as node_file:
+ json.dump(ipcmconfs[node.name],
+ node_file,
+ indent=4,
+ sort_keys=True)
+ conf_files.setdefault(node, []).append(
+ '%s.ipcm.conf' % (node.name,))
+
+ for dif in self.dif_ordering: # type: mod.DIF
+ dif_conf = difconfs.get(dif.name, None)
+ if dif_conf:
+ # Dump the normal DIF configuration files
+ for node in dif.members:
+ with open(self.conf_dir('normal.%s.%s.dif'
+ % (node.name, dif.name)), 'w') \
+ as dif_conf_file:
+ json.dump(dif_conf[node.name],
+ dif_conf_file,
+ indent=4,
+ sort_keys=True)
+ conf_files.setdefault(node, []).append(
+ 'normal.%s.%s.dif' % (node.name, dif.name))
+ return conf_files
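
As a concrete example of what write_conf() emits (node, shim and interface names are made up), the per-node shim template shimeth.a.e1.dif generated above would contain:

    {
        "configParameters": {
            "interface-name": "eth1"
        },
        "difType": "shim-eth-vlan"
    }

The corresponding a.ipcm.conf then references this file from its difConfigurations list, while each normal DIF gets a normal.<node>.<dif>.dif file derived from the templates in irati_templates.py below.
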
diff --git a/rumba/prototypes/irati_templates.py b/rumba/prototypes/irati_templates.py
new file mode 100644
index 0000000..b8d9788
--- /dev/null
+++ b/rumba/prototypes/irati_templates.py
@@ -0,0 +1,349 @@
+# Environment setup for VMs. Standard linux approach
+env_dict = {'installpath': '/usr', 'varpath': ''}
+
+# Template for a IPCM configuration file
+ipcmconf_base = {
+ "configFileVersion": "1.4.1",
+ "localConfiguration": {
+ "installationPath": "%(installpath)s/bin" % env_dict,
+ "libraryPath": "%(installpath)s/lib" % env_dict,
+ "logPath": "%(varpath)s/var/log" % env_dict,
+ "consoleSocket": "%(varpath)s/var/run/ipcm-console.sock" % env_dict,
+ "pluginsPaths": [
+ "%(installpath)s/lib/rinad/ipcp" % env_dict,
+ "/lib/modules/4.1.33-irati/extra"
+ ]
+ },
+
+ "ipcProcessesToCreate": [],
+ "difConfigurations": [],
+}
+
+
+da_map_base = {
+ "applicationToDIFMappings": [
+ {
+ "encodedAppName": "rina.apps.echotime.server-1--",
+ "difName": "n.DIF"
+ },
+ {
+ "encodedAppName": "traffic.generator.server-1--",
+ "difName": "n.DIF"
+ }
+ ],
+}
+
+
+# Template for a normal DIF configuration file
+normal_dif_base = {
+ "difType": "normal-ipc",
+ "dataTransferConstants": {
+ "addressLength": 2,
+ "cepIdLength": 2,
+ "lengthLength": 2,
+ "portIdLength": 2,
+ "qosIdLength": 2,
+ "rateLength": 4,
+ "frameLength": 4,
+ "sequenceNumberLength": 4,
+ "ctrlSequenceNumberLength": 4,
+ "maxPduSize": 10000,
+ "maxPduLifetime": 60000
+ },
+
+ "qosCubes": [
+ {
+ "name": "unreliablewithflowcontrol",
+ "id": 1,
+ "partialDelivery": False,
+ "orderedDelivery": True,
+ "efcpPolicies": {
+ "dtpPolicySet": {
+ "name": "default",
+ "version": "0"
+ },
+ "initialATimer": 0,
+ "dtcpPresent": True,
+ "dtcpConfiguration": {
+ "dtcpPolicySet": {
+ "name": "default",
+ "version": "0"
+ },
+ "rtxControl": False,
+ "flowControl": True,
+ "flowControlConfig": {
+ "rateBased": False,
+ "windowBased": True,
+ "windowBasedConfig": {
+ "maxClosedWindowQueueLength": 10,
+ "initialCredit": 200
+ }
+ }
+ }
+ }
+ }, {
+ "name": "reliablewithflowcontrol",
+ "id": 2,
+ "partialDelivery": False,
+ "orderedDelivery": True,
+ "maxAllowableGap": 0,
+ "efcpPolicies": {
+ "dtpPolicySet": {
+ "name": "default",
+ "version": "0"
+ },
+ "initialATimer": 0,
+ "dtcpPresent": True,
+ "dtcpConfiguration": {
+ "dtcpPolicySet": {
+ "name": "default",
+ "version": "0"
+ },
+ "rtxControl": True,
+ "rtxControlConfig": {
+ "dataRxmsNmax": 5,
+ "initialRtxTime": 1000
+ },
+ "flowControl": True,
+ "flowControlConfig": {
+ "rateBased": False,
+ "windowBased": True,
+ "windowBasedConfig": {
+ "maxClosedWindowQueueLength": 10,
+ "initialCredit": 200
+ }
+ }
+ }
+ }
+ }
+ ],
+
+ "knownIPCProcessAddresses": [],
+
+ "addressPrefixes": [
+ {
+ "addressPrefix": 0,
+ "organization": "N.Bourbaki"
+ }, {
+ "addressPrefix": 16,
+ "organization": "IRATI"
+ }
+ ],
+
+ "rmtConfiguration": {
+ "pffConfiguration": {
+ "policySet": {
+ "name": "default",
+ "version": "0"
+ }
+ },
+ "policySet": {
+ "name": "default",
+ "version": "1"
+ }
+ },
+
+ "enrollmentTaskConfiguration": {
+ "policySet": {
+ "name": "default",
+ "version": "1",
+ "parameters": [
+ {
+ "name": "enrollTimeoutInMs",
+ "value": "10000"
+ }, {
+ "name": "watchdogPeriodInMs",
+ "value": "30000"
+ }, {
+ "name": "declaredDeadIntervalInMs",
+ "value": "120000"
+ }, {
+ "name": "neighborsEnrollerPeriodInMs",
+ "value": "0"
+ }, {
+ "name": "maxEnrollmentRetries",
+ "value": "0"
+ }
+ ]
+ }
+ },
+
+ "flowAllocatorConfiguration": {
+ "policySet": {
+ "name": "default",
+ "version": "1"
+ }
+ },
+
+ "namespaceManagerConfiguration": {
+ "policySet": {
+ "name": "default",
+ "version": "1"
+ }
+ },
+
+ "securityManagerConfiguration": {
+ "policySet": {
+ "name": "default",
+ "version": "1"
+ }
+ },
+
+ "resourceAllocatorConfiguration": {
+ "pduftgConfiguration": {
+ "policySet": {
+ "name": "default",
+ "version": "0"
+ }
+ }
+ },
+
+ "routingConfiguration": {
+ "policySet": {
+ "name": "link-state",
+ "version": "1",
+ "parameters": [
+ {
+ "name": "objectMaximumAge",
+ "value": "10000"
+ }, {
+ "name": "waitUntilReadCDAP",
+ "value": "5001"
+ }, {
+ "name": "waitUntilError",
+ "value": "5001"
+ }, {
+ "name": "waitUntilPDUFTComputation",
+ "value": "103"
+ }, {
+ "name": "waitUntilFSODBPropagation",
+ "value": "101"
+ }, {
+ "name": "waitUntilAgeIncrement",
+ "value": "997"
+ }, {
+ "name": "routingAlgorithm",
+ "value": "Dijkstra"
+ }
+ ]
+ }
+ }
+}
+
+
+def ps_set(d, k, v, parms):
+ if k not in d:
+ d[k] = {'name': '', 'version': '1'}
+
+ if d[k]["name"] == v and "parameters" in d[k]:
+ cur_names = [p["name"] for p in d[k]["parameters"]]
+ for p in parms:
+ name, value = p.split('=')
+ if name in cur_names:
+ for i in range(len(d[k]["parameters"])):
+ if d[k]["parameters"][i]["name"] == name:
+ d[k]["parameters"][i]["value"] = value
+ break
+ else:
+ d[k]["parameters"].append({'name': name, 'value': value})
+
+ elif len(parms) > 0:
+ d[k]["parameters"] = [
+ {'name': p.split('=')[0], 'value': p.split('=')[1]}
+ for p in parms]
+
+ d[k]["name"] = v
+
+
+def dtp_ps_set(d, v, parms):
+ for i in range(len(d["qosCubes"])):
+ ps_set(d["qosCubes"][i]["efcpPolicies"], "dtpPolicySet", v, parms)
+
+
+def dtcp_ps_set(d, v, parms):
+ for i in range(len(d["qosCubes"])):
+ ps_set(d["qosCubes"][i]["efcpPolicies"]["dtcpConfiguration"],
+ "dtcpPolicySet", v, parms)
+
+
+policy_translator = {
+ 'rmt.pff': lambda d, v, p: ps_set(d["rmtConfiguration"]["pffConfiguration"],
+ "policySet", v, p),
+ 'rmt': lambda d, v, p: ps_set(d["rmtConfiguration"], "policySet", v, p),
+ 'enrollment-task': lambda d, v, p: ps_set(d["enrollmentTaskConfiguration"],
+ "policySet", v, p),
+ 'flow-allocator': lambda d, v, p: ps_set(d["flowAllocatorConfiguration"],
+ "policySet", v, p),
+ 'namespace-manager': lambda d, v, p: ps_set(
+ d["namespaceManagerConfiguration"], "policySet", v, p),
+ 'security-manager': lambda d, v, p: ps_set(
+ d["securityManagerConfiguration"], "policySet", v, p),
+ 'routing': lambda d, v, p: ps_set(
+ d["routingConfiguration"], "policySet", v, p),
+ 'resource-allocator.pduftg': lambda d, v, p: ps_set(
+ d["resourceAllocatorConfiguration"], "policySet", v, p),
+ 'efcp.*.dtcp': None,
+ 'efcp.*.dtp': None,
+}
+
+
+def is_security_path(path):
+ sp = path.split('.')
+ return (len(sp) == 3) and (sp[0] == 'security-manager') \
+ and (sp[1] in ['auth', 'encrypt', 'ttl', 'errorcheck'])
+
+
+# Do we know this path ?
+def policy_path_valid(path):
+ if path in policy_translator:
+ return True
+
+ # Try to validate security configuration
+ if is_security_path(path):
+ return True
+
+ return False
+
+
+def translate_security_path(d, path, ps, parms):
+ u1, component, profile = path.split('.')
+ if "authSDUProtProfiles" not in d["securityManagerConfiguration"]:
+ d["securityManagerConfiguration"]["authSDUProtProfiles"] = {}
+ d = d["securityManagerConfiguration"]["authSDUProtProfiles"]
+
+ tr = {'auth': 'authPolicy', 'encrypt': 'encryptPolicy',
+ 'ttl': 'TTLPolicy', 'errorcheck': 'ErrorCheckPolicy'}
+
+ if profile == 'default':
+ if profile not in d:
+ d["default"] = {}
+
+ ps_set(d["default"], tr[component], ps, parms)
+
+ else: # profile is the name of a DIF
+ if "specific" not in d:
+ d["specific"] = []
+ j = -1
+ for i in range(len(d["specific"])):
+ if d["specific"][i]["underlyingDIF"] == profile + ".DIF":
+ j = i
+ break
+
+ if j == -1: # We need to create an entry for the new DIF
+ d["specific"].append({"underlyingDIF": profile + ".DIF"})
+
+ ps_set(d["specific"][j], tr[component], ps, parms)
+
+
+def translate_policy(difconf, path, ps, parms):
+ if path == 'efcp.*.dtcp':
+ dtcp_ps_set(difconf, ps, parms)
+
+ elif path == 'efcp.*.dtp':
+ dtp_ps_set(difconf, ps, parms)
+
+ elif is_security_path(path):
+ translate_security_path(difconf, path, ps, parms)
+
+ else:
+ policy_translator[path](difconf, ps, parms)
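
The policy helpers above are driven from irati.py via translate_policy(); used directly on a copy of the normal DIF template, a few examples (the policy-set names and parameters below are illustrative, not prescribed by the code) would be:

    import copy
    import rumba.prototypes.irati_templates as irati_templates

    conf = copy.deepcopy(irati_templates.normal_dif_base)

    # Tune a parameter of the link-state routing policy set.
    irati_templates.translate_policy(conf, 'routing', 'link-state',
                                     parms=['routingAlgorithm=Dijkstra'])

    # Select the DTCP policy set in every QoS cube.
    irati_templates.translate_policy(conf, 'efcp.*.dtcp', 'default', parms=[])

    # Enable an (illustrative) authentication profile for flows over lower DIF 'e1'.
    irati_templates.translate_policy(conf, 'security-manager.auth.e1',
                                     'some-auth-ps', parms=[])
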
diff --git a/rumba/ssh_support.py b/rumba/ssh_support.py
index e66db43..0179c5d 100644
--- a/rumba/ssh_support.py
+++ b/rumba/ssh_support.py
@@ -17,9 +17,10 @@
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
# MA 02110-1301 USA
-
+import os
import paramiko
+
def get_ssh_client():
ssh_client = paramiko.SSHClient()
ssh_client.load_system_host_keys()
@@ -27,6 +28,7 @@ def get_ssh_client():
return ssh_client
+
def _print_stream(stream):
o = str(stream.read()).strip('b\'\"\\n')
if o != "":
@@ -35,6 +37,7 @@ def _print_stream(stream):
print(oi)
return o
+
def execute_commands(testbed, ssh_config, commands, time_out=3):
'''
Remote execution of a list of shell command on hostname. By
@@ -77,6 +80,7 @@ def execute_commands(testbed, ssh_config, commands, time_out=3):
print(str(e))
return
+
def execute_command(testbed, ssh_config, command, time_out=3):
'''
Remote execution of a list of shell command on hostname. By
@@ -95,6 +99,7 @@ def execute_command(testbed, ssh_config, command, time_out=3):
if o != None:
return o
+
def copy_file_to_testbed(testbed, ssh_config, text, file_name):
'''
Write a string to a given remote file.
@@ -137,6 +142,58 @@ def copy_file_to_testbed(testbed, ssh_config, text, file_name):
except Exception as e:
print(str(e))
+
+def copy_paths_to_testbed(testbed, ssh_config, paths, destination):
+ '''
+    Copy a set of local files to a folder on a remote node.
+    Overwrite remote files if they already exist!
+
+ @param testbed: testbed info
+ @param ssh_config: ssh config of the node
+ @param paths: source paths (local) as an iterable
+ @param destination: destination folder name (remote)
+ '''
+ ssh_client = get_ssh_client()
+
+    if destination != '' and not destination.endswith('/'):
+ destination = destination + '/'
+
+ try:
+ ssh_client.connect(ssh_config.hostname, ssh_config.port,
+ testbed.username,
+ testbed.password,
+ look_for_keys=True)
+
+ sftp_client = ssh_client.open_sftp()
+
+ for path in paths:
+ file_name = os.path.basename(path)
+ dest_file = destination + file_name
+ print("Copying %s to %s@%s:%s path %s" % (
+ path,
+ testbed.username,
+ ssh_config.hostname,
+ ssh_config.port,
+ dest_file))
+ sftp_client.put(path, dest_file)
+
+ except Exception as e:
+ print(str(e))
+
+
+def copy_path_to_testbed(testbed, ssh_config, path, destination):
+ '''
+    Copy a local file to a folder on a remote node.
+    Overwrite the remote file if it already exists!
+
+ @param testbed: testbed info
+ @param ssh_config: ssh config of the node
+ @param path: source path (local)
+ @param destination: destination folder name (remote)
+ '''
+ copy_paths_to_testbed(testbed, ssh_config, [path], destination)
+
+
def setup_vlan(testbed, node, vlan_id, int_name):
'''
Gets the interface (ethx) to link mapping
@@ -146,22 +203,27 @@ def setup_vlan(testbed, node, vlan_id, int_name):
@param vlan_id: the VLAN id
@param int_name: the name of the interface
'''
+ if testbed.username == 'root':
+ def sudo(s):
+ return s
+ else:
+ def sudo(s):
+ return 'sudo ' + s
+
print("Setting up VLAN on node " + node.name)
- cmds = list()
- cmd = "sudo ip link add link " + \
- str(int_name) + \
- " name " + str(int_name) + \
- "." + str(vlan_id) + \
- " type vlan id " + str(vlan_id)
- cmds.append(cmd)
- cmd = "sudo ifconfig " + \
- str(int_name) + "." + \
- str(vlan_id) + " up"
- cmds.append(cmd)
- cmd = "sudo ethtool -K " + \
- str(int_name) + " rxvlan off"
- cmds.append(cmd)
- cmd = "sudo ethtool -K " + \
- str(int_name) + " txvlan off"
+ args = {'ifname': str(int_name), 'vlan': str(vlan_id)}
+
+ cmds = [sudo("ip link set %(ifname)s up"
+ % args),
+ sudo("ip link add link %(ifname)s name "
+ "%(ifname)s.%(vlan)s type vlan id %(vlan)s"
+ % args),
+ sudo("ifconfig %(ifname)s.%(vlan)s up"
+ % args)]
+ # TODO: is ethtool needed? Should install or check if it is present.
+ # cmds += [sudo("ethtool -K %(ifname)s rxvlan off"
+ # % args),
+ # sudo("ethtool -K %(ifname)s txvlan off"
+ # % args)]
execute_commands(testbed, node.ssh_config, cmds)
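
For reference, the new helpers would typically be used from the prototype code roughly as follows (the wrapper function is illustrative and not part of the module; the interface name and VLAN id are placeholders):

    import rumba.ssh_support as ssh

    def push_configs_and_vlan(testbed, node, conf_paths):
        """Copy generated config files to a node and bring up a VLAN interface."""
        # An empty destination drops the files in the remote user's home directory.
        ssh.copy_paths_to_testbed(testbed, node.ssh_config, conf_paths, '')
        ssh.setup_vlan(testbed, node, 300, 'eth1')
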
diff --git a/rumba/testbeds/qemu.py b/rumba/testbeds/qemu.py
index 3573554..174b860 100644
--- a/rumba/testbeds/qemu.py
+++ b/rumba/testbeds/qemu.py
@@ -2,6 +2,7 @@
# QEMU testbed for Rumba
#
# Vincenzo Maffione <v.maffione@nextworks.it>
+# Marco Capitani <m.capitani@nextworks.it>
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
@@ -23,10 +24,12 @@ import subprocess
import os
import rumba.model as mod
+from rumba import ssh_support
class Testbed(mod.Testbed):
- def __init__(self, exp_name, username, bzimage, initramfs, proj_name="ARCFIRE", password="",
+ def __init__(self, exp_name, bzimage, initramfs, proj_name="ARCFIRE",
+ password="root", username="root",
use_vhost=True, qemu_logs_dir=None):
mod.Testbed.__init__(self, exp_name, username, password, proj_name)
self.vms = {}
@@ -34,15 +37,18 @@ class Testbed(mod.Testbed):
self.bzimage = bzimage
self.initramfs = initramfs
self.vhost = use_vhost
- self.qemu_logs_dir = os.getcwd() if qemu_logs_dir is None else qemu_logs_dir
+ self.qemu_logs_dir = os.getcwd() if qemu_logs_dir is None \
+ else qemu_logs_dir
self.boot_processes = []
@staticmethod
- def _run_command_chain(commands, results_queue, error_queue, ignore_errors=False):
+ def _run_command_chain(commands, results_queue,
+ error_queue, ignore_errors=False):
"""
Runs (sequentially) the command list.
- On error, breaks and dumps it in error_queue, and interrupts as soon as it is non-empty.
+ On error, breaks and dumps it in error_queue, and interrupts
+        as soon as it is non-empty (unless ignore_errors is True).
:type commands: list
:type results_queue: Queue
@@ -69,6 +75,37 @@ class Testbed(mod.Testbed):
else:
results_queue.put("Command chain ran with %d errors" % errors)
+ def recover_if_names(self, experiment):
+ next_vlan = 10
+ assigned_vlan = {}
+ for node in experiment.nodes:
+ for ipcp in node.ipcps:
+ if isinstance(ipcp, mod.ShimEthIPCP):
+ shim_name, node_name = ipcp.name.split('.')
+ port_set = [x for x in self.vms[node_name]['ports']
+ if x['shim'].name == shim_name]
+ port = port_set[0]
+ port_id = port['port_id']
+ vm_id = self.vms[node_name]['id']
+ mac = '00:0a:0a:0a:%02x:%02x' % (vm_id, port_id)
+ print('DEBUG: recovering ifname for port: '
+ + port['tap_id'] + '.')
+ output = ssh_support.execute_command(
+ self,
+ node.ssh_config,
+ 'mac2ifname ' + mac)
+ ipcp.ifname = output
+ try:
+ vlan = int(port['shim'].name)
+ except ValueError:
+ vlan = assigned_vlan.get(port['shim'].name, None)
+ if vlan is None:
+ vlan = next_vlan
+ next_vlan += 10
+ assigned_vlan[port['shim'].name] = vlan
+ ssh_support.setup_vlan(self, node,
+ vlan, ipcp.ifname)
+
def swap_in(self, experiment):
"""
:type experiment mod.Experiment
@@ -88,11 +125,12 @@ class Testbed(mod.Testbed):
e_queue = multiprocessing.Queue()
print(experiment.dif_ordering)
for shim in experiment.dif_ordering:
- command_list = []
if not isinstance(shim, mod.ShimEthDIF):
# Nothing to do here
continue
self.shims.append(shim)
+ ipcps = shim.ipcps
+ command_list = []
command_list += ('sudo brctl addbr %(br)s\n'
'sudo ip link set %(br)s up'
% {'br': shim.name}
@@ -113,23 +151,39 @@ class Testbed(mod.Testbed):
speed = '%dmbit' % shim.link_speed
# Rate limit the traffic transmitted on the TAP interface
- command_list += ('sudo tc qdisc add dev %(tap)s handle 1: root '
- 'htb default 11\n'
- 'sudo tc class add dev %(tap)s parent 1: classid '
- '1:1 htb rate 10gbit\n'
- 'sudo tc class add dev %(tap)s parent 1:1 classid '
- '1:11 htb rate %(speed)s'
- % {'tap': tap_id, 'speed': speed}
- ).split('\n')
-
- vm['ports'].append({'tap_id': tap_id, 'shim': shim, 'port_id': port_id})
+ command_list += (
+ 'sudo tc qdisc add dev %(tap)s handle 1: root '
+ 'htb default 11\n'
+ 'sudo tc class add dev %(tap)s parent 1: classid '
+ '1:1 htb rate 10gbit\n'
+ 'sudo tc class add dev %(tap)s parent 1:1 classid '
+ '1:11 htb rate %(speed)s'
+ % {'tap': tap_id, 'speed': speed}
+ ).split('\n')
+
+ vm['ports'].append({'tap_id': tap_id,
+ 'shim': shim,
+ 'port_id': port_id})
+ ipcp_set = [x for x in ipcps if x in node.ipcps]
+ if len(ipcp_set) > 1:
+ raise Exception("Error: more than one ipcp in common "
+ "between shim dif %s and node %s"
+ % (shim.name, node.name))
+ ipcp = ipcp_set[0] # type: mod.ShimEthIPCP
+ assert ipcp.name == '%s.%s' % (shim.name, node.name), \
+ 'Incorrect Shim Ipcp found: expected %s.%s, found %s' \
+ % (shim.name, node.name, ipcp.name)
+ ipcp.ifname = tap_id
# TODO deal with Ip address (shim UDP DIF).
# Avoid stacking processes if one failed before.
if not e_queue.empty():
break
# Launch commands asynchronously
- process = multiprocessing.Process(target=self._run_command_chain, args=(command_list, r_queue, e_queue))
+ process = multiprocessing.Process(target=self._run_command_chain,
+ args=(command_list,
+ r_queue,
+ e_queue))
shim_processes.append(process)
process.start()
@@ -149,7 +203,8 @@ class Testbed(mod.Testbed):
result = r_queue.get(timeout=1)
if result == "Command chain ran correctly.":
over_processes += 1
- print('DEBUG: %s of %s processes completed.' % (over_processes, total_processes))
+ print('DEBUG: %s of %s processes completed.'
+ % (over_processes, total_processes))
except:
max_waiting_time -= 1
@@ -157,9 +212,9 @@ class Testbed(mod.Testbed):
boot_batch_size = max(1, multiprocessing.cpu_count() // 2)
booting_budget = boot_batch_size
- boot_backoff = 12 # in seconds
+ boot_backoff = 12 # in seconds
base_port = 2222
- vm_memory = 164 # in megabytes
+ vm_memory = 164 # in megabytes
vm_frontend = 'virtio-net-pci'
vmid = 1
@@ -167,6 +222,7 @@ class Testbed(mod.Testbed):
for node in experiment.nodes:
name = node.name
vm = self.vms.setdefault(name, {'vm': node, 'ports': []})
+ vm['id'] = vmid
fwdp = base_port + vmid
fwdc = fwdp + 10000
mac = '00:0a:0a:0a:%02x:%02x' % (vmid, 99)
@@ -199,7 +255,6 @@ class Testbed(mod.Testbed):
'-device %(frontend)s,mac=%(mac)s,netdev=mgmt '
'-netdev user,id=mgmt,%(hostfwdstr)s '
'-vga std '
- '-pidfile rina-%(id)s.pid '
'-serial file:%(vmname)s.log '
% vars_dict
)
@@ -211,23 +266,29 @@ class Testbed(mod.Testbed):
mac = '00:0a:0a:0a:%02x:%02x' % (vmid, port['port_id'])
port['mac'] = mac
- command += ('-device %(frontend)s,mac=%(mac)s,netdev=data%(idx)s '
- '-netdev tap,ifname=%(tap)s,id=data%(idx)s,script=no,'
- 'downscript=no%(vhost)s '
- % {'mac': mac, 'tap': tap_id, 'idx': port['port_id'],
- 'frontend': vm_frontend, 'vhost': ',vhost=on' if self.vhost else ''}
- )
+ command += (
+ '-device %(frontend)s,mac=%(mac)s,netdev=data%(idx)s '
+ '-netdev tap,ifname=%(tap)s,id=data%(idx)s,script=no,'
+ 'downscript=no%(vhost)s '
+ % {'mac': mac, 'tap': tap_id, 'idx': port['port_id'],
+ 'frontend': vm_frontend,
+ 'vhost': ',vhost=on' if self.vhost else ''}
+ )
booting_budget -= 1
if booting_budget <= 0:
- print('Sleeping %s secs waiting for the VMs to boot' % boot_backoff)
+
+ print('Sleeping %s secs waiting '
+ 'for the VMs to boot' % boot_backoff)
+
time.sleep(boot_backoff)
booting_budget = boot_batch_size
- with open('%s/qemu_out_%s' % (self.qemu_logs_dir, vmid), 'w') as out_file:
+ with open('%s/qemu_out_%s' % (self.qemu_logs_dir, vmid), 'w')\
+ as out_file:
print('DEBUG: executing >> %s' % command)
- self.boot_processes.append(subprocess.Popen(command.split(), stdout=out_file))
- pass
+ self.boot_processes.append(subprocess.Popen(command.split(),
+ stdout=out_file))
vmid += 1
@@ -238,6 +299,11 @@ class Testbed(mod.Testbed):
print('Sleeping %s secs waiting for the last VMs to boot' % tsleep)
time.sleep(tsleep)
+ # TODO: to be removed, we should loop in the ssh part
+ print('Sleeping 5 seconds, just to be on the safe side')
+ time.sleep(5)
+
+ self.recover_if_names(experiment)
def swap_out(self, experiment):
"""
@@ -267,9 +333,10 @@ class Testbed(mod.Testbed):
'sudo ip tuntap del mode tap name %(tap)s'
% {'tap': tap, 'br': shim.name}
).split('\n')
- process = multiprocessing.Process(target=self._run_command_chain,
- args=(commands, results_queue, error_queue),
- kwargs={'ignore_errors': True})
+ process = multiprocessing.Process(
+ target=self._run_command_chain,
+ args=(commands, results_queue, error_queue),
+ kwargs={'ignore_errors': True})
port_processes.append(process)
process.start()
@@ -280,14 +347,17 @@ class Testbed(mod.Testbed):
while max_waiting_time > 0 and over_processes < total_processes:
# Check for errors
if not error_queue.empty():
- print('Failure while shutting down: %s' % str(error_queue.get()))
+ print(
+ 'Failure while shutting down: %s' % str(error_queue.get())
+ )
over_processes += 1
try:
# Check for results
result = results_queue.get(timeout=1)
if result == "Command chain ran correctly.":
over_processes += 1
- print('DEBUG: %s of %s tear-down port processes completed.' % (over_processes, total_processes))
+ print('DEBUG: %s of %s tear-down port processes completed.'
+ % (over_processes, total_processes))
except:
max_waiting_time -= 1
@@ -302,7 +372,9 @@ class Testbed(mod.Testbed):
% {'br': shim.name}
).split('\n')
process = multiprocessing.Process(target=self._run_command_chain,
- args=(commands, results_queue, error_queue),
+ args=(commands,
+ results_queue,
+ error_queue),
kwargs={'ignore_errors': True})
shim_processes.append(process)
process.start()
@@ -314,13 +386,15 @@ class Testbed(mod.Testbed):
while max_waiting_time > 0 and over_processes < total_processes:
# Check for errors
if not error_queue.empty():
- print('Failure while shutting down: %s' % str(error_queue.get()))
+ print('Failure while shutting down: %s'
+ % str(error_queue.get()))
over_processes += 1
try:
# Check for results
result = results_queue.get(timeout=1)
if result == "Command chain ran correctly.":
over_processes += 1
- print('DEBUG: %s of %s tear-down shim processes completed.' % (over_processes, total_processes))
+ print('DEBUG: %s of %s tear-down shim processes completed.'
+ % (over_processes, total_processes))
except:
max_waiting_time -= 1
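
Finally, the VM addressing scheme used by swap_in() above (base SSH forwarding port 2222, a second forwarded port 10000 higher, and MACs encoding the VM id and port id) can be summarised in a short sketch that mirrors those constants; it is illustrative only:

    BASE_PORT = 2222

    def vm_addressing(vmid, port_ids):
        fwdp = BASE_PORT + vmid   # host port forwarded to the VM's SSH port
        fwdc = fwdp + 10000       # second forwarded host port
        mgmt_mac = '00:0a:0a:0a:%02x:%02x' % (vmid, 99)
        data_macs = ['00:0a:0a:0a:%02x:%02x' % (vmid, pid) for pid in port_ids]
        return fwdp, fwdc, mgmt_mac, data_macs

    print(vm_addressing(1, [1, 2]))
    # (2223, 12223, '00:0a:0a:0a:01:63', ['00:0a:0a:0a:01:01', '00:0a:0a:0a:01:02'])
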