-rw-r--r--  rumba/model.py                       |  53
-rw-r--r--  rumba/prototypes/irati.py            | 332
-rw-r--r--  rumba/prototypes/irati_templates.py  | 338
-rw-r--r--  rumba/testbeds/qemu.py               | 150
-rwxr-xr-x  tools/democonf2rumba.py              |  16
5 files changed, 815 insertions, 74 deletions
diff --git a/rumba/model.py b/rumba/model.py index c2c4cfa..6f031ac 100644 --- a/rumba/model.py +++ b/rumba/model.py @@ -20,7 +20,7 @@ # MA 02110-1301 USA import abc -import re + # Represents generic testbed info # @@ -65,7 +65,7 @@ class DIF: return hash(self.name) def __eq__(self, other): - return other != None and self.name == other.name + return other is not None and self.name == other.name def __neq__(self, other): return not self == other @@ -79,6 +79,7 @@ class DIF: def get_ipcp_class(self): return IPCP + # Shim over UDP # class ShimUDPDIF(DIF): @@ -88,6 +89,7 @@ class ShimUDPDIF(DIF): def get_ipcp_class(self): return ShimUDPIPCP + # Shim over Ethernet # # @link_speed [int] Speed of the Ethernet network, in Mbps @@ -102,6 +104,7 @@ class ShimEthDIF(DIF): def get_ipcp_class(self): return ShimEthIPCP + # Normal DIF # # @policies [dict] Policies of the normal DIF @@ -125,6 +128,7 @@ class NormalDIF(DIF): s += "\n Component %s has policy %s" % (comp, pol) return s + # SSH Configuration # class SSHConfig: @@ -132,6 +136,7 @@ class SSHConfig: self.hostname = hostname self.port = port + # A node in the experiment # # @difs: DIFs the node will have an IPCP in @@ -164,7 +169,7 @@ class Node: def _undeclared_dif(self, dif): if dif not in self.difs: - raise Exception("Invalid registration: node %s is not declared "\ + raise Exception("Invalid registration: node %s is not declared " "to be part of DIF %s" % (self.name, dif.name)) def _validate(self): @@ -206,8 +211,8 @@ class Node: s += " ]\n" s += " Bindings: [ " - s += ", ".join(["'%s' => '%s'" % (ap, self.bindings[ap]) \ - for ap in self.bindings]) + s += ", ".join(["'%s' => '%s'" % (ap, self.bindings[ap]) + for ap in self.bindings]) s += " ]\n" return s @@ -216,7 +221,7 @@ class Node: return hash(self.name) def __eq__(self, other): - return other != None and self.name == other.name + return other is not None and self.name == other.name def __neq__(self, other): return not self == other @@ -255,6 +260,7 @@ class Node: del self.bindings[name] self._validate() + # Base class representing an IPC Process to be created in the experiment # # @name [string]: IPCP name @@ -277,28 +283,31 @@ class IPCP: (self.name, self.dif.name, ' '.join([dif.name for dif in self.registrations]), ',bootstrapper' if self.dif_bootstrapper else '' - ) + ) def __hash__(self): return hash((self.name, self.dif.name)) def __eq__(self, other): - return other != None and self.name == other.name \ + return other is not None and self.name == other.name \ and self.dif == other.dif def __neq__(self, other): return not self == other + class ShimEthIPCP(IPCP): def __init__(self, name, node, dif, ifname=None): IPCP.__init__(self, name, node, dif) self.ifname = ifname + class ShimUDPIPCP(IPCP): def __init__(self, name, node, dif): IPCP.__init__(self, name, node, dif) # TODO: add IP and port + # Base class for ARCFIRE experiments # # @name [string] Name of the experiment @@ -312,7 +321,7 @@ class Experiment: self.testbed = testbed self.enrollment_strategy = 'minimal' # 'full-mesh', 'manual' self.dif_ordering = [] - self.enrollments = [] # a list of per-DIF lists of enrollments + self.enrollments = [] # a list of per-DIF lists of enrollments # Generate missing information self.generate() @@ -360,8 +369,8 @@ class Experiment: difsdeps_inc_cnt[dif] = len(difsdeps_inc[dif]) del difsdeps_inc - #print(difsdeps_adj) - #print(difsdeps_inc_cnt) + # print(difsdeps_adj) + # print(difsdeps_inc_cnt) # Run Kahn's algorithm to compute topological # ordering on the DIFs graph. 
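The hunk above stops at the topological-sort step over the DIF dependency graph. As a rough sketch of what that Kahn's-algorithm pass computes (variable names mirror difsdeps_adj and difsdeps_inc_cnt from Experiment.generate(); this is an illustration under those assumptions, not the code in the patch):

def kahn_order(difsdeps_adj, difsdeps_inc_cnt):
    """Return the DIFs in dependency order, lowest DIFs first."""
    ordering = []
    # Start from DIFs with in-degree zero, i.e. nothing must precede them.
    frontier = set(d for d in difsdeps_inc_cnt if difsdeps_inc_cnt[d] == 0)
    while frontier:
        cur = frontier.pop()
        ordering.append(cur)
        for nxt in difsdeps_adj[cur]:
            difsdeps_inc_cnt[nxt] -= 1
            if difsdeps_inc_cnt[nxt] == 0:
                frontier.add(nxt)
        difsdeps_adj[cur] = set()
    # Anything left with a non-zero count sits on a cycle.
    circular = [d for d in difsdeps_inc_cnt if difsdeps_inc_cnt[d] != 0]
    if circular:
        raise Exception("Circular DIF dependencies involving: %s" % circular)
    return ordering

With difsdeps_adj = {'e1': {'n1'}, 'n1': set()} and counts {'e1': 0, 'n1': 1}, this yields ['e1', 'n1']: shim DIFs come before the normal DIFs stacked on top of them, which is the ordering the rest of the patch relies on.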
@@ -380,12 +389,12 @@ class Experiment: frontier.add(nxt) difsdeps_adj[cur] = set() - circular_set = [dif for dif in difsdeps_inc_cnt \ + circular_set = [dif for dif in difsdeps_inc_cnt if difsdeps_inc_cnt[dif] != 0] if len(circular_set): - raise Exception("Fatal error: The specified DIFs topology" \ - "has one or more" \ - "circular dependencies, involving the following" \ + raise Exception("Fatal error: The specified DIFs topology" + "has one or more" + "circular dependencies, involving the following" " DIFs: %s" % circular_set) print("DIF topological ordering: %s" % self.dif_ordering) @@ -406,8 +415,8 @@ class Experiment: for node in self.nodes: if dif in node.dif_registrations: - dif_graphs[dif][node] = [] # init for later use - if first is None: # pick any node for later use + dif_graphs[dif][node] = [] # init for later use + if first is None: # pick any node for later use first = node for lower_dif in node.dif_registrations[dif]: if lower_dif not in neighsets: @@ -468,11 +477,11 @@ class Experiment: print("Enrollments:") for el in self.enrollments: for e in el: - print(" [%s] %s --> %s through N-1-DIF %s" % \ - (e['dif'], - e['enrollee'].name, - e['enroller'].name, - e['lower_dif'])) + print(" [%s] %s --> %s through N-1-DIF %s" % + (e['dif'], + e['enrollee'].name, + e['enroller'].name, + e['lower_dif'])) def compute_ipcps(self): # For each node, compute the required IPCP instances, and associated diff --git a/rumba/prototypes/irati.py b/rumba/prototypes/irati.py index e8766da..44eadb0 100644 --- a/rumba/prototypes/irati.py +++ b/rumba/prototypes/irati.py @@ -2,6 +2,7 @@ # Commands to setup and instruct IRATI # # Vincenzo Maffione <v.maffione@nextworks.it> +# Marco Capitani <m.capitani@nextworks.it> # # This library is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public @@ -17,32 +18,343 @@ # License along with this library; if not, write to the Free Software # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, # MA 02110-1301 USA +import copy +import json + +import subprocess + +import os import rumba.ssh_support as ssh import rumba.model as mod +import rumba.prototypes.irati_templates as irati_templates + # An experiment over the IRATI implementation +from rumba import ssh_support + + class Experiment(mod.Experiment): + + @staticmethod + def real_sudo(s): + return 'sudo ' + s + + @staticmethod + def fake_sudo(s): + return s + def __init__(self, testbed, nodes=None): mod.Experiment.__init__(self, testbed, nodes) + self.manager = False + + if self.testbed.username == 'root': + self.sudo = self.fake_sudo + else: + self.sudo = self.real_sudo def setup(self): - cmds = list() + """Installs IRATI on the vms.""" + setup_irati = False + if setup_irati: + cmds = list() - cmds.append("sudo apt-get update") - cmds.append("sudo apt-get install g++ gcc " - "protobuf-compiler libprotobuf-dev git --yes") - cmds.append("sudo rm -rf ~/irati") - cmds.append("cd && git clone https://github.com/IRATI/stack irati") - cmds.append("cd ~/irati && sudo ./install-from-scratch") - cmds.append("sudo nohup ipcm &> ipcm.log &") + cmds.append("sudo apt-get update") + cmds.append("sudo apt-get install g++ gcc " + "protobuf-compiler libprotobuf-dev git --yes") + cmds.append("sudo rm -rf ~/irati") + cmds.append("cd && git clone https://github.com/IRATI/stack irati") + cmds.append("cd ~/irati && sudo ./install-from-scratch") + cmds.append("sudo nohup ipcm &> ipcm.log &") + for node in self.nodes: + ssh.execute_commands(self.testbed, node.ssh_config, + 
cmds, time_out=None) + + def bootstrap_network(self): + """Creates the network by enrolling and configuring the nodes""" for node in self.nodes: - ssh.execute_commands(self.testbed, node.ssh_config, - cmds, time_out=None) + self.process_node(node) + self.enroll_nodes() + + def run_experiment(self): + input('Press ENTER to quit.') def run_prototype(self): print("[IRATI experiment] start") print("Setting up IRATI on the nodes...") self.setup() + self.write_conf() + self.bootstrap_network() + self.run_experiment() print("[IRATI experiment] end") + + def process_node(self, node): + """ + Installs the configuration and boots up rina on a node + :type node: mod.Node + :param node: + :return: + """ + name = node.name + gen_files_conf = 'shimeth.%(name)s.*.dif da.map %(name)s.ipcm.conf' % { + 'name': name} + if any(node in dif.members for dif in self.dif_ordering): + gen_files_conf = ' '.join( + [gen_files_conf, 'normal.%(name)s.*.dif' % {'name': name}]) + dir_path = os.path.dirname(os.path.abspath(__file__)) + gen_files_bin = 'enroll.py' + gen_files_bin_full = os.path.join(dir_path, 'enroll.py') + + ipcm_components = ['scripting', 'console'] + if self.manager: + ipcm_components.append('mad') + ipcm_components = ', '.join(ipcm_components) + + gen_files = ' '.join([gen_files_conf, gen_files_bin_full]) + + sshopts = ('-o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null' + ' -o IdentityFile=buildroot/irati_rsa') + format_args = {'name': name, + 'ssh': node.ssh_config.port, + 'username': self.testbed.username, + 'genfiles': gen_files, + 'genfilesconf': gen_files_conf, + 'genfilesbin': gen_files_bin, + 'sshopts': sshopts, + 'installpath': '/usr', + 'verb': 'DBG', + 'ipcmcomps': ipcm_components} + try: + print('DEBUG: executing >> ' + 'scp %(sshopts)s -r -P %(ssh)s ' + '%(genfiles)s %(username)s@localhost:' + % format_args) + subprocess.check_call(('scp %(sshopts)s -r -P %(ssh)s ' + '%(genfiles)s %(username)s@localhost:' + % format_args), shell=True) + except subprocess.CalledProcessError as e: + raise Exception(str(e)) + + # TODO: review ssh opts through ssh support + + cmds = [self.sudo('hostname %(name)s' % format_args), + self.sudo('chmod a+rw /dev/irati'), + self.sudo('mv %(genfilesconf)s /etc' % format_args), + self.sudo('mv %(genfilesbin)s /usr/bin') % format_args] + + # TODO: is the port up on the VM at this point? + + cmds += [self.sudo('modprobe rina-default-plugin'), + self.sudo('%(installpath)s/bin/ipcm -a \"%(ipcmcomps)s\" ' + '-c /etc/%(name)s.ipcm.conf -l %(verb)s &> log &' + % format_args)] + + print('DEBUG: sending node setup via ssh.') + # print('Credentials:') + # print(node.ssh_config.hostname, node.ssh_config.port, + # self.testbed.username, self.testbed.password) + ssh_support.execute_commands(self.testbed, node.ssh_config, cmds) + + def enroll_nodes(self): + """Runs the enrollments one by one, respecting dependencies""" + for enrollment_list in self.enrollments: + for e in enrollment_list: + print( + 'I am going to enroll %s to DIF %s against neighbor %s,' + ' through lower DIF %s' + % (e['enrollee'], + e['dif'].name, + e['enroller'], + e['lower_dif'].name)) + + subprocess.check_call('sleep 2'. split()) # Important! 
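For reference, each entry consumed by the enroll_nodes() loop above has the shape sketched below. Real entries hold mod.Node and mod.DIF objects; plain strings stand in for them here only to keep the sketch self-contained, and the values are made up.

example_enrollment = {
    'dif': 'n.DIF',        # normal DIF being joined
    'enrollee': 'node_a',  # node performing the enrollment
    'enroller': 'node_b',  # neighbor already bootstrapped in the DIF
    'lower_dif': '110',    # N-1 DIF providing the flow, e.g. a shim over Ethernet
}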
+ + e_args = {'ldif': e['lower_dif'].name, + 'dif': e['dif'].name, + 'name': e['enrollee'].name, + 'o_name': e['enroller'].name} + + cmd = self.sudo('enroll.py --lower-dif %(ldif)s --dif %(dif)s ' + '--ipcm-conf /etc/%(name)s.ipcm.conf ' + '--enrollee-name %(dif)s.%(name)s.IPCP ' + '--enroller-name %(dif)s.%(o_name)s.IPCP' + % e_args) + print('DEBUG: sending enrollment operation via ssh.') + # print('Credentials:') + # print(e['enrollee'].ssh_config.hostname, + # e['enrollee'].ssh_config.port, + # self.testbed.username, self.testbed.password) + ssh_support.execute_command(self.testbed, + e['enrollee'].ssh_config, + cmd) + + def write_conf(self): + """Write the configuration files""" + # Constants and initializations + ipcmconfs = dict() + difconfs = dict() + ipcp2shim_map = {} + node2id_map = {} + mgmt_dif_name = 'NMS' + + # TODO: what format are the mappings registered in? Is this ok? + app_mappings = [] + for node in self.nodes: + app_mappings += [{'name': app, 'dif': dif.name} + for app in node.registrations + for dif in node.registrations[app]] + + # If some app directives were specified, use those to build da.map. + # Otherwise, assume the standard applications are to be mapped in + # the DIF with the highest rank. + if len(app_mappings) == 0: + if len(self.dif_ordering) > 0: + for adm in \ + irati_templates.da_map_base["applicationToDIFMappings"]: + adm["difName"] = "%s" % (self.dif_ordering[-1],) + else: + irati_templates.da_map_base["applicationToDIFMappings"] = [] + for apm in app_mappings: + irati_templates.da_map_base["applicationToDIFMappings"]\ + .append({"encodedAppName": apm['name'], + "difName": "%s" % (apm['dif']) + }) + + # TODO ask: I guess this will need to be added, + # and in that case we should add it to the qemu plugin too... + # Where should we take it in input? 
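The loop above fills in irati_templates.da_map_base, which write_conf() later dumps to da.map. Assuming no explicit application registrations and 'n.DIF' as the highest-ranked DIF in dif_ordering, the generated file would carry the template's two default application names, remapped to that DIF:

import json

da_map = {
    "applicationToDIFMappings": [
        {"encodedAppName": "rina.apps.echotime.server-1--",
         "difName": "n.DIF"},
        {"encodedAppName": "traffic.generator.server-1--",
         "difName": "n.DIF"}
    ]
}
print(json.dumps(da_map, indent=4, sort_keys=True))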
+ + if self.manager: + # Add MAD/Manager configuration + irati_templates.ipcmconf_base["addons"] = { + "mad": { + "managerAppName": "", + "NMSDIFs": [{"DIF": "%s" % mgmt_dif_name}], + "managerConnections": [{ + "managerAppName": "manager-1--", + "DIF": "%s" % mgmt_dif_name + }] + } + } + + node_number = 1 + for node in self.nodes: # type: mod.Node + node2id_map[node.name] = node_number + node_number += 1 + ipcmconfs[node.name] = copy.deepcopy(irati_templates.ipcmconf_base) + if self.manager: + ipcmconfs[node.name]["addons"]["mad"]["managerAppName"] \ + = "%s.mad-1--" % (node.name,) + + for dif in self.dif_ordering: # type: mod.DIF + if isinstance(dif, mod.ShimEthDIF): + ipcp2shim_map.update({ipcp.name: dif for ipcp in dif.ipcps}) + elif isinstance(dif, mod.NormalDIF): + difconfs[dif.name] = dict() + for node in dif.members: + difconfs[dif.name][node.name] = copy.deepcopy( + irati_templates.normal_dif_base + ) + + for node in self.nodes: # type: mod.Node + ipcmconf = ipcmconfs[node.name] + + for ipcp in node.ipcps: # type: mod.ShimEthIPCP + if isinstance(ipcp, mod.ShimEthIPCP): + shim = ipcp2shim_map[ipcp.name] # type: mod.ShimEthDIF + ipcmconf["ipcProcessesToCreate"].append({ + "apName": "eth.%s.IPCP" % ipcp.name, + "apInstance": "1", + "difName": shim.name + }) + + template_file_name = 'shimeth.%s.%s.dif' \ + % (node.name, shim.name) + ipcmconf["difConfigurations"].append({ + "name": shim.name, + "template": template_file_name + }) + + fout = open(template_file_name, 'w') + fout.write(json.dumps( + {"difType": "shim-eth-vlan", + "configParameters": { + "interface-name": ipcp.ifname + } + }, + indent=4, sort_keys=True)) + fout.close() + + # Run over dif_ordering array, to make sure each IPCM config has + # the correct ordering for the ipcProcessesToCreate list of operations. + # If we iterated over the difs map, the order would be randomic, and so + # some IPCP registrations in lower DIFs may fail. + # This would happen because at the moment of registration, + # it may be that the IPCP of the lower DIF has not been created yet. + shims = ipcp2shim_map.values() + for dif in self.dif_ordering: # type: mod.NormalDIF + + if dif in shims: + # Shims are managed separately, in the previous loop + continue + + for node in dif.members: # type: mod.Node + node_name = node.name + ipcmconf = ipcmconfs[node_name] + + normal_ipcp = {"apName": "%s.%s.IPCP" % (dif.name, node_name), + "apInstance": "1", + "difName": "%s" % (dif.name,), + "difsToRegisterAt": []} + + for lower_dif in node.dif_registrations[dif]: # type: mod.DIF + normal_ipcp["difsToRegisterAt"].append(lower_dif.name) + + ipcmconf["ipcProcessesToCreate"].append(normal_ipcp) + + ipcmconf["difConfigurations"].append({ + "name": "%s" % (dif.name,), + "template": "normal.%s.%s.dif" % (node_name, dif.name,) + }) + + # Fill in the map of IPCP addresses. 
+ # This could be moved at difconfs + for other_node in dif.members: # type: mod.Node + difconfs[dif.name][other_node.name] \ + ["knownIPCProcessAddresses"].append({ + "apName": "%s.%s.IPCP" % (dif.name, node_name), + "apInstance": "1", + "address": 16 + node2id_map[node_name]}) + for path, ps in dif.policies.items(): + # if policy['nodes'] == [] or vmname in policy['nodes']: + # TODO: manage per-node-policies + irati_templates.translate_policy( + difconfs[dif.name][node_name], path, ps, parms=[]) + + # Dump the DIF Allocator map + with open('da.map', 'w') as da_map_file: + json.dump(irati_templates.da_map_base, + da_map_file, + indent=4, + sort_keys=True) + + for node in self.nodes: + # Dump the IPCM configuration files + with open('%s.ipcm.conf' % (node.name,), 'w') as node_file: + json.dump(ipcmconfs[node.name], + node_file, + indent=4, + sort_keys=True) + + for dif in self.dif_ordering: # type: mod.DIF + dif_conf = difconfs.get(dif.name, None) + if dif_conf: + # Dump the normal DIF configuration files + for node in dif.members: + with open('normal.%s.%s.dif' % (node.name, dif.name), 'w') \ + as dif_conf_file: + json.dump(dif_conf[node.name], + dif_conf_file, + indent=4, + sort_keys=True) diff --git a/rumba/prototypes/irati_templates.py b/rumba/prototypes/irati_templates.py new file mode 100644 index 0000000..0f3ef05 --- /dev/null +++ b/rumba/prototypes/irati_templates.py @@ -0,0 +1,338 @@ +# Environment setup for VMs. Standard linux approach +env_dict = {'installpath': '/usr', 'varpath': ''} + +# Template for a IPCM configuration file +ipcmconf_base = { + "configFileVersion": "1.4.1", + "localConfiguration": { + "installationPath": "%(installpath)s/bin" % env_dict, + "libraryPath": "%(installpath)s/lib" % env_dict, + "logPath": "%(varpath)s/var/log" % env_dict, + "consoleSocket": "%(varpath)s/var/run/ipcm-console.sock" % env_dict, + "pluginsPaths": [ + "%(installpath)s/lib/rinad/ipcp" % env_dict, + "/lib/modules/4.1.33-irati/extra" + ] + }, + + "ipcProcessesToCreate": [], + "difConfigurations": [], +} + + +da_map_base = { + "applicationToDIFMappings": [ + { + "encodedAppName": "rina.apps.echotime.server-1--", + "difName": "n.DIF" + }, + { + "encodedAppName": "traffic.generator.server-1--", + "difName": "n.DIF" + } + ], +} + + +# Template for a normal DIF configuration file +normal_dif_base = { + "difType" : "normal-ipc", + "dataTransferConstants" : { + "addressLength" : 2, + "cepIdLength" : 2, + "lengthLength" : 2, + "portIdLength" : 2, + "qosIdLength" : 2, + "rateLength" : 4, + "frameLength" : 4, + "sequenceNumberLength" : 4, + "ctrlSequenceNumberLength" : 4, + "maxPduSize" : 10000, + "maxPduLifetime" : 60000 + }, + + "qosCubes" : [ { + "name" : "unreliablewithflowcontrol", + "id" : 1, + "partialDelivery" : False, + "orderedDelivery" : True, + "efcpPolicies" : { + "dtpPolicySet" : { + "name" : "default", + "version" : "0" + }, + "initialATimer" : 0, + "dtcpPresent" : True, + "dtcpConfiguration" : { + "dtcpPolicySet" : { + "name" : "default", + "version" : "0" + }, + "rtxControl" : False, + "flowControl" : True, + "flowControlConfig" : { + "rateBased" : False, + "windowBased" : True, + "windowBasedConfig" : { + "maxClosedWindowQueueLength" : 10, + "initialCredit" : 200 + } + } + } + } + }, { + "name" : "reliablewithflowcontrol", + "id" : 2, + "partialDelivery" : False, + "orderedDelivery" : True, + "maxAllowableGap": 0, + "efcpPolicies" : { + "dtpPolicySet" : { + "name" : "default", + "version" : "0" + }, + "initialATimer" : 0, + "dtcpPresent" : True, + "dtcpConfiguration" : { + 
"dtcpPolicySet" : { + "name" : "default", + "version" : "0" + }, + "rtxControl" : True, + "rtxControlConfig" : { + "dataRxmsNmax" : 5, + "initialRtxTime" : 1000 + }, + "flowControl" : True, + "flowControlConfig" : { + "rateBased" : False, + "windowBased" : True, + "windowBasedConfig" : { + "maxClosedWindowQueueLength" : 10, + "initialCredit" : 200 + } + } + } + } + } ], + + "knownIPCProcessAddresses": [], + + "addressPrefixes" : [ { + "addressPrefix" : 0, + "organization" : "N.Bourbaki" + }, { + "addressPrefix" : 16, + "organization" : "IRATI" + } ], + + "rmtConfiguration" : { + "pffConfiguration" : { + "policySet" : { + "name" : "default", + "version" : "0" + } + }, + "policySet" : { + "name" : "default", + "version" : "1" + } + }, + + "enrollmentTaskConfiguration" : { + "policySet" : { + "name" : "default", + "version" : "1", + "parameters" : [ { + "name" : "enrollTimeoutInMs", + "value" : "10000" + }, { + "name" : "watchdogPeriodInMs", + "value" : "30000" + }, { + "name" : "declaredDeadIntervalInMs", + "value" : "120000" + }, { + "name" : "neighborsEnrollerPeriodInMs", + "value" : "0" + }, { + "name" : "maxEnrollmentRetries", + "value" : "0" + } ] + } + }, + + "flowAllocatorConfiguration" : { + "policySet" : { + "name" : "default", + "version" : "1" + } + }, + + "namespaceManagerConfiguration" : { + "policySet" : { + "name" : "default", + "version" : "1" + } + }, + + "securityManagerConfiguration" : { + "policySet" : { + "name" : "default", + "version" : "1" + } + }, + + "resourceAllocatorConfiguration" : { + "pduftgConfiguration" : { + "policySet" : { + "name" : "default", + "version" : "0" + } + } + }, + + "routingConfiguration" : { + "policySet" : { + "name" : "link-state", + "version" : "1", + "parameters" : [ { + "name" : "objectMaximumAge", + "value" : "10000" + },{ + "name" : "waitUntilReadCDAP", + "value" : "5001" + },{ + "name" : "waitUntilError", + "value" : "5001" + },{ + "name" : "waitUntilPDUFTComputation", + "value" : "103" + },{ + "name" : "waitUntilFSODBPropagation", + "value" : "101" + },{ + "name" : "waitUntilAgeIncrement", + "value" : "997" + },{ + "name" : "routingAlgorithm", + "value" : "Dijkstra" + }] + } + } +} + + +def ps_set(d, k, v, parms): + if k not in d: + d[k] = {'name': '', 'version': '1'} + + if d[k]["name"] == v and "parameters" in d[k]: + cur_names = [p["name"] for p in d[k]["parameters"]] + for p in parms: + name, value = p.split('=') + if name in cur_names: + for i in range(len(d[k]["parameters"])): + if d[k]["parameters"][i]["name"] == name: + d[k]["parameters"][i]["value"] = value + break + else: + d[k]["parameters"].append({ 'name': name, 'value': value }) + + elif len(parms) > 0: + d[k]["parameters"] = [ { 'name': p.split('=')[0], 'value': p.split('=')[1]} for p in parms ] + + d[k]["name"] = v + + +def dtp_ps_set(d, v, parms): + for i in range(len(d["qosCubes"])): + ps_set(d["qosCubes"][i]["efcpPolicies"], "dtpPolicySet", v, parms) + + +def dtcp_ps_set(d, v, parms): + for i in range(len(d["qosCubes"])): + ps_set(d["qosCubes"][i]["efcpPolicies"]["dtcpConfiguration"], "dtcpPolicySet", v, parms) + + +policy_translator = { + 'rmt.pff': lambda d, v, p: ps_set(d["rmtConfiguration"]["pffConfiguration"], + "policySet", v, p), + 'rmt': lambda d, v, p: ps_set(d["rmtConfiguration"], "policySet", v, p), + 'enrollment-task': lambda d, v, p: ps_set(d["enrollmentTaskConfiguration"], + "policySet", v, p), + 'flow-allocator': lambda d, v, p: ps_set(d["flowAllocatorConfiguration"], + "policySet", v, p), + 'namespace-manager': lambda d, v, p: ps_set( + 
d["namespaceManagerConfiguration"], "policySet", v, p), + 'security-manager': lambda d, v, p: ps_set( + d["securityManagerConfiguration"], "policySet", v, p), + 'routing': lambda d, v, p: ps_set(d["routingConfiguration"], "policySet", v, p), + 'resource-allocator.pduftg': lambda d, v, p: ps_set( + d["resourceAllocatorConfiguration"], "policySet", v, p), + 'efcp.*.dtcp': None, + 'efcp.*.dtp': None, +} + + +def is_security_path(path): + sp = path.split('.') + return (len(sp) == 3) and (sp[0] == 'security-manager') \ + and (sp[1] in ['auth', 'encrypt', 'ttl', 'errorcheck']) + + +# Do we know this path ? +def policy_path_valid(path): + if path in policy_translator: + return True + + # Try to validate security configuration + if is_security_path(path): + return True + + return False + + +def translate_security_path(d, path, ps, parms): + u1, component, profile = path.split('.') + if "authSDUProtProfiles" not in d["securityManagerConfiguration"]: + d["securityManagerConfiguration"]["authSDUProtProfiles"] = {} + d = d["securityManagerConfiguration"]["authSDUProtProfiles"] + + tr = {'auth': 'authPolicy', 'encrypt': 'encryptPolicy', + 'ttl': 'TTLPolicy', 'errorcheck': 'ErrorCheckPolicy'} + + if profile == 'default': + if profile not in d: + d["default"] = {} + + ps_set(d["default"], tr[component], ps, parms) + + else: # profile is the name of a DIF + if "specific" not in d: + d["specific"] = [] + j = -1 + for i in range(len(d["specific"])): + if d["specific"][i]["underlyingDIF"] == profile + ".DIF": + j = i + break + + if j == -1: # We need to create an entry for the new DIF + d["specific"].append({"underlyingDIF" : profile + ".DIF"}) + + ps_set(d["specific"][j], tr[component], ps, parms) + + +def translate_policy(difconf, path, ps, parms): + if path =='efcp.*.dtcp': + dtcp_ps_set(difconf, ps, parms) + + elif path == 'efcp.*.dtp': + dtp_ps_set(difconf, ps, parms) + + elif is_security_path(path): + translate_security_path(difconf, path, ps, parms) + + else: + policy_translator[path](difconf, ps, parms) + diff --git a/rumba/testbeds/qemu.py b/rumba/testbeds/qemu.py index 3573554..d3e1698 100644 --- a/rumba/testbeds/qemu.py +++ b/rumba/testbeds/qemu.py @@ -2,6 +2,7 @@ # QEMU testbed for Rumba # # Vincenzo Maffione <v.maffione@nextworks.it> +# Marco Capitani <m.capitani@nextworks.it> # # This library is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public @@ -23,10 +24,12 @@ import subprocess import os import rumba.model as mod +from rumba import ssh_support class Testbed(mod.Testbed): - def __init__(self, exp_name, username, bzimage, initramfs, proj_name="ARCFIRE", password="", + def __init__(self, exp_name, bzimage, initramfs, proj_name="ARCFIRE", + password="root", username="root", use_vhost=True, qemu_logs_dir=None): mod.Testbed.__init__(self, exp_name, username, password, proj_name) self.vms = {} @@ -34,15 +37,18 @@ class Testbed(mod.Testbed): self.bzimage = bzimage self.initramfs = initramfs self.vhost = use_vhost - self.qemu_logs_dir = os.getcwd() if qemu_logs_dir is None else qemu_logs_dir + self.qemu_logs_dir = os.getcwd() if qemu_logs_dir is None \ + else qemu_logs_dir self.boot_processes = [] @staticmethod - def _run_command_chain(commands, results_queue, error_queue, ignore_errors=False): + def _run_command_chain(commands, results_queue, + error_queue, ignore_errors=False): """ Runs (sequentially) the command list. - On error, breaks and dumps it in error_queue, and interrupts as soon as it is non-empty. 
+ On error, breaks and dumps it in error_queue, and interrupts + as soon as it is non-empty (unless ignore errors is True). :type commands: list :type results_queue: Queue @@ -69,6 +75,41 @@ class Testbed(mod.Testbed): else: results_queue.put("Command chain ran with %d errors" % errors) + def recover_if_names(self, experiment): + for node in experiment.nodes: + for ipcp in node.ipcps: + if isinstance(ipcp, mod.ShimEthIPCP): + shim_name, node_name = ipcp.name.split('.') + port_set = [x for x in self.vms[node_name]['ports'] + if x['shim'].name == shim_name] + port = port_set[0] + port_id = port['port_id'] + vm_id = self.vms[node_name]['id'] + mac = '00:0a:0a:0a:%02x:%02x' % (vm_id, port_id) + print('DEBUG: recovering ifname for port: ' + + port['tap_id'] + '.') + output = ssh_support.return_commands( + self, + node.ssh_config, + ['mac2ifname ' + mac]) + print('DEBUG: output is %s' % output) + if not hasattr(output, '__len__') or len(output) != 1: + raise Exception("Could not retrieve ifname for ipcp %s." + % ipcp.name) + ipcp.ifname = output[0] + args = {'vlan': int(port['shim'].name), 'port': ipcp.ifname} + cmds = ['ip link set %(port)s up' + % args, + 'ip link add link %(port)s name %(port)s.%(vlan)s ' + 'type vlan id %(vlan)s' + % args, + 'ip link set %(port)s.%(vlan)s up' + % args] + ssh_support.execute_commands(self, + node.ssh_config, + cmds) + + def swap_in(self, experiment): """ :type experiment mod.Experiment @@ -88,11 +129,12 @@ class Testbed(mod.Testbed): e_queue = multiprocessing.Queue() print(experiment.dif_ordering) for shim in experiment.dif_ordering: - command_list = [] if not isinstance(shim, mod.ShimEthDIF): # Nothing to do here continue self.shims.append(shim) + ipcps = shim.ipcps + command_list = [] command_list += ('sudo brctl addbr %(br)s\n' 'sudo ip link set %(br)s up' % {'br': shim.name} @@ -113,23 +155,39 @@ class Testbed(mod.Testbed): speed = '%dmbit' % shim.link_speed # Rate limit the traffic transmitted on the TAP interface - command_list += ('sudo tc qdisc add dev %(tap)s handle 1: root ' - 'htb default 11\n' - 'sudo tc class add dev %(tap)s parent 1: classid ' - '1:1 htb rate 10gbit\n' - 'sudo tc class add dev %(tap)s parent 1:1 classid ' - '1:11 htb rate %(speed)s' - % {'tap': tap_id, 'speed': speed} - ).split('\n') - - vm['ports'].append({'tap_id': tap_id, 'shim': shim, 'port_id': port_id}) + command_list += ( + 'sudo tc qdisc add dev %(tap)s handle 1: root ' + 'htb default 11\n' + 'sudo tc class add dev %(tap)s parent 1: classid ' + '1:1 htb rate 10gbit\n' + 'sudo tc class add dev %(tap)s parent 1:1 classid ' + '1:11 htb rate %(speed)s' + % {'tap': tap_id, 'speed': speed} + ).split('\n') + + vm['ports'].append({'tap_id': tap_id, + 'shim': shim, + 'port_id': port_id}) + ipcp_set = [x for x in ipcps if x in node.ipcps] + if len(ipcp_set) > 1: + raise Exception("Error: more than one ipcp in common " + "between shim dif %s and node %s" + % (shim.name, node.name)) + ipcp = ipcp_set[0] # type: mod.ShimEthIPCP + assert ipcp.name == '%s.%s' % (shim.name, node.name), \ + 'Incorrect Shim Ipcp found: expected %s.%s, found %s' \ + % (shim.name, node.name, ipcp.name) + ipcp.ifname = tap_id # TODO deal with Ip address (shim UDP DIF). # Avoid stacking processes if one failed before. 
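The recover_if_names() method shown earlier in this hunk works because the data-port MACs follow a fixed convention, the same one used when the NICs are attached to the VM further down: the VM id and the port id end up in the last two octets. A small sketch (the shim DIF name '110' is a hypothetical example; mac2ifname is a helper expected inside the guest image):

def data_port_mac(vm_id, port_id):
    # Same '00:0a:0a:0a:XX:YY' convention as in swap_in().
    return '00:0a:0a:0a:%02x:%02x' % (vm_id, port_id)

assert data_port_mac(3, 1) == '00:0a:0a:0a:03:01'

# Inside the guest, 'mac2ifname <mac>' maps that MAC back to an interface
# name (say eth0), and recover_if_names() then creates and brings up the
# per-shim VLAN sub-interface, e.g. eth0.110 for a shim DIF named '110'.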
if not e_queue.empty(): break # Launch commands asynchronously - process = multiprocessing.Process(target=self._run_command_chain, args=(command_list, r_queue, e_queue)) + process = multiprocessing.Process(target=self._run_command_chain, + args=(command_list, + r_queue, + e_queue)) shim_processes.append(process) process.start() @@ -149,7 +207,8 @@ class Testbed(mod.Testbed): result = r_queue.get(timeout=1) if result == "Command chain ran correctly.": over_processes += 1 - print('DEBUG: %s of %s processes completed.' % (over_processes, total_processes)) + print('DEBUG: %s of %s processes completed.' + % (over_processes, total_processes)) except: max_waiting_time -= 1 @@ -157,9 +216,9 @@ class Testbed(mod.Testbed): boot_batch_size = max(1, multiprocessing.cpu_count() // 2) booting_budget = boot_batch_size - boot_backoff = 12 # in seconds + boot_backoff = 12 # in seconds base_port = 2222 - vm_memory = 164 # in megabytes + vm_memory = 164 # in megabytes vm_frontend = 'virtio-net-pci' vmid = 1 @@ -167,6 +226,7 @@ class Testbed(mod.Testbed): for node in experiment.nodes: name = node.name vm = self.vms.setdefault(name, {'vm': node, 'ports': []}) + vm['id'] = vmid fwdp = base_port + vmid fwdc = fwdp + 10000 mac = '00:0a:0a:0a:%02x:%02x' % (vmid, 99) @@ -199,7 +259,6 @@ class Testbed(mod.Testbed): '-device %(frontend)s,mac=%(mac)s,netdev=mgmt ' '-netdev user,id=mgmt,%(hostfwdstr)s ' '-vga std ' - '-pidfile rina-%(id)s.pid ' '-serial file:%(vmname)s.log ' % vars_dict ) @@ -211,23 +270,29 @@ class Testbed(mod.Testbed): mac = '00:0a:0a:0a:%02x:%02x' % (vmid, port['port_id']) port['mac'] = mac - command += ('-device %(frontend)s,mac=%(mac)s,netdev=data%(idx)s ' - '-netdev tap,ifname=%(tap)s,id=data%(idx)s,script=no,' - 'downscript=no%(vhost)s ' - % {'mac': mac, 'tap': tap_id, 'idx': port['port_id'], - 'frontend': vm_frontend, 'vhost': ',vhost=on' if self.vhost else ''} - ) + command += ( + '-device %(frontend)s,mac=%(mac)s,netdev=data%(idx)s ' + '-netdev tap,ifname=%(tap)s,id=data%(idx)s,script=no,' + 'downscript=no%(vhost)s ' + % {'mac': mac, 'tap': tap_id, 'idx': port['port_id'], + 'frontend': vm_frontend, + 'vhost': ',vhost=on' if self.vhost else ''} + ) booting_budget -= 1 if booting_budget <= 0: - print('Sleeping %s secs waiting for the VMs to boot' % boot_backoff) + + print('Sleeping %s secs waiting ' + 'for the VMs to boot' % boot_backoff) + time.sleep(boot_backoff) booting_budget = boot_batch_size - with open('%s/qemu_out_%s' % (self.qemu_logs_dir, vmid), 'w') as out_file: + with open('%s/qemu_out_%s' % (self.qemu_logs_dir, vmid), 'w')\ + as out_file: print('DEBUG: executing >> %s' % command) - self.boot_processes.append(subprocess.Popen(command.split(), stdout=out_file)) - pass + self.boot_processes.append(subprocess.Popen(command.split(), + stdout=out_file)) vmid += 1 @@ -238,6 +303,7 @@ class Testbed(mod.Testbed): print('Sleeping %s secs waiting for the last VMs to boot' % tsleep) time.sleep(tsleep) + self.recover_if_names(experiment) def swap_out(self, experiment): """ @@ -267,9 +333,10 @@ class Testbed(mod.Testbed): 'sudo ip tuntap del mode tap name %(tap)s' % {'tap': tap, 'br': shim.name} ).split('\n') - process = multiprocessing.Process(target=self._run_command_chain, - args=(commands, results_queue, error_queue), - kwargs={'ignore_errors': True}) + process = multiprocessing.Process( + target=self._run_command_chain, + args=(commands, results_queue, error_queue), + kwargs={'ignore_errors': True}) port_processes.append(process) process.start() @@ -280,14 +347,17 @@ class 
Testbed(mod.Testbed): while max_waiting_time > 0 and over_processes < total_processes: # Check for errors if not error_queue.empty(): - print('Failure while shutting down: %s' % str(error_queue.get())) + print( + 'Failure while shutting down: %s' % str(error_queue.get()) + ) over_processes += 1 try: # Check for results result = results_queue.get(timeout=1) if result == "Command chain ran correctly.": over_processes += 1 - print('DEBUG: %s of %s tear-down port processes completed.' % (over_processes, total_processes)) + print('DEBUG: %s of %s tear-down port processes completed.' + % (over_processes, total_processes)) except: max_waiting_time -= 1 @@ -302,7 +372,9 @@ class Testbed(mod.Testbed): % {'br': shim.name} ).split('\n') process = multiprocessing.Process(target=self._run_command_chain, - args=(commands, results_queue, error_queue), + args=(commands, + results_queue, + error_queue), kwargs={'ignore_errors': True}) shim_processes.append(process) process.start() @@ -314,13 +386,15 @@ class Testbed(mod.Testbed): while max_waiting_time > 0 and over_processes < total_processes: # Check for errors if not error_queue.empty(): - print('Failure while shutting down: %s' % str(error_queue.get())) + print('Failure while shutting down: %s' + % str(error_queue.get())) over_processes += 1 try: # Check for results result = results_queue.get(timeout=1) if result == "Command chain ran correctly.": over_processes += 1 - print('DEBUG: %s of %s tear-down shim processes completed.' % (over_processes, total_processes)) + print('DEBUG: %s of %s tear-down shim processes completed.' + % (over_processes, total_processes)) except: max_waiting_time -= 1 diff --git a/tools/democonf2rumba.py b/tools/democonf2rumba.py index cda112c..d9ea8e7 100755 --- a/tools/democonf2rumba.py +++ b/tools/democonf2rumba.py @@ -202,22 +202,30 @@ if __name__ == '__main__': import rumba.testbeds.emulab as emulab test_class = emulab.Testbed testbed_args = {a.dest: getattr(args, a.dest) - for a in emulab_p._actions if a.dest != 'help'} + for a in emulab_p._actions + if a.dest != 'help' + and getattr(args, a.dest) is not None} elif args.testbed == 'jfed': import rumba.testbeds.jfed as jfed test_class = jfed.Testbed testbed_args = {a.dest: getattr(args, a.dest) - for a in jfed_p._actions if a.dest != 'help'} + for a in jfed_p._actions + if a.dest != 'help' + and getattr(args, a.dest) is not None} elif args.testbed == 'qemu': import rumba.testbeds.qemu as qemu test_class = qemu.Testbed testbed_args = {a.dest: getattr(args, a.dest) - for a in qemu_p._actions if a.dest != 'help'} + for a in qemu_p._actions + if a.dest != 'help' + and getattr(args, a.dest) is not None} elif args.testbed == 'fake': import rumba.testbeds.faketestbed as fake test_class = fake.Testbed testbed_args = {a.dest: getattr(args, a.dest) - for a in fake_p._actions if a.dest != 'help'} + for a in fake_p._actions + if a.dest != 'help' + and getattr(args, a.dest) is not None} else: if args.testbed is None: print('Testbed type must be specified!') |
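The filter added above drops the argparse options the user did not pass (argparse stores None for them), so they no longer override the defaults of the testbed constructors, such as the new username='root' and password='root' defaults of the QEMU testbed. A minimal sketch of the same pattern in isolation; the parser, image paths and experiment name are hypothetical:

import argparse

import rumba.testbeds.qemu as qemu

parser = argparse.ArgumentParser()
parser.add_argument('--username')                 # None when not given
parser.add_argument('--bzimage', required=True)
parser.add_argument('--initramfs', required=True)
args = parser.parse_args(['--bzimage', 'bzImage', '--initramfs', 'rootfs.cpio'])

# Keep only the options that were actually provided on the command line.
testbed_args = {a.dest: getattr(args, a.dest)
                for a in parser._actions
                if a.dest != 'help' and getattr(args, a.dest) is not None}

tb = qemu.Testbed(exp_name='demo', **testbed_args)  # username falls back to 'root'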