aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMarco Capitani <m.capitani@nextworks.it>2017-04-11 15:53:49 +0200
committerMarco Capitani <m.capitani@nextworks.it>2017-04-11 15:53:49 +0200
commit46310717c3293054324cc6a0271d855b638df0ff (patch)
tree69b27a68b2d97962f3bcbdee40d412453285eea2
parent7f5054816fc68bca1d9d4901d1e365b57a278542 (diff)
downloadrumba-46310717c3293054324cc6a0271d855b638df0ff.tar.gz
rumba-46310717c3293054324cc6a0271d855b638df0ff.zip
Resolving node_id issue and general cleanup
-rw-r--r--rumba/model.py31
-rw-r--r--rumba/prototypes/irati.py98
-rw-r--r--rumba/prototypes/irati_templates.py39
-rw-r--r--rumba/testbeds/qemu.py97
4 files changed, 157 insertions, 108 deletions
diff --git a/rumba/model.py b/rumba/model.py
index bab92bf..9f6d2c2 100644
--- a/rumba/model.py
+++ b/rumba/model.py
@@ -316,7 +316,7 @@ class Experiment:
:type testbed: Testbed
:param testbed: the testbed for the experiment
:param filename: name of the .conf file
- :return: the nodes list for the configuration file
+ :return: the Experiment
"""
shims = {}
@@ -337,7 +337,8 @@ class Experiment:
if line.startswith('#') or line == "":
continue
- m = re.match(r'\s*eth\s+([\w-]+)\s+(\d+)([GMK])bps\s+(\w.*)$', line)
+ m = re.match(r'\s*eth\s+([\w-]+)\s+(\d+)([GMK])bps\s+(\w.*)$',
+ line)
if m:
shim = m.group(1)
speed = int(m.group(2))
@@ -357,7 +358,10 @@ class Experiment:
shims[shim] = {'name': shim, 'speed': speed, 'type': 'eth'}
for vm in vm_list:
- nodes.setdefault(vm, {'name': vm, 'difs': [], 'dif_registrations': {}, 'registrations': {}})
+ nodes.setdefault(vm, {'name': vm,
+ 'difs': [],
+ 'dif_registrations': {},
+ 'registrations': {}})
nodes[vm]['difs'].append(shim)
continue
@@ -372,16 +376,21 @@ class Experiment:
% (line_cnt, dif))
continue
- difs.setdefault(dif, {'name': dif}) # Other dict contents might be policies.
+ difs.setdefault(dif, {'name': dif})
if vm in nodes and dif in nodes[vm]['dif_registrations']:
- print('Error: Line %d: vm %s in dif %s already specified'
- % (line_cnt, vm, dif))
+ print(
+ 'Error: Line %d: vm %s in dif %s already specified'
+ % (line_cnt, vm, dif))
continue
- nodes.setdefault(vm, {'name': vm, 'difs': [], 'dif_registrations': {}, 'registrations': {}})
+ nodes.setdefault(vm, {'name': vm,
+ 'difs': [],
+ 'dif_registrations': {},
+ 'registrations': {}})
nodes[vm]['difs'].append(dif)
- nodes[vm]['dif_registrations'][dif] = dif_list # It is not defined yet, per check above.
+ nodes[vm]['dif_registrations'][dif] = dif_list
+ # It is not defined yet, per check above.
continue
@@ -393,7 +402,8 @@ class Experiment:
parsed_difs = {}
for shim_name, shim in shims.items():
- parsed_difs[shim_name] = (ShimEthDIF(shim_name, link_speed=shim['speed']))
+ parsed_difs[shim_name] = (ShimEthDIF(shim_name,
+ link_speed=shim['speed']))
for dif_name, dif in difs.items():
parsed_difs[dif_name] = (NormalDIF(dif_name))
@@ -403,7 +413,8 @@ class Experiment:
name = node_data['name']
difs = [parsed_difs[x] for x in node_data['difs']]
dif_registrations = {parsed_difs[x]: [parsed_difs[y] for y in l]
- for x, l in node_data['dif_registrations'].items()}
+ for x, l
+ in node_data['dif_registrations'].items()}
parsed_nodes.append(Node(name, difs, dif_registrations))
return cls(testbed=testbed, nodes=parsed_nodes)
diff --git a/rumba/prototypes/irati.py b/rumba/prototypes/irati.py
index c4c30c7..9c8b004 100644
--- a/rumba/prototypes/irati.py
+++ b/rumba/prototypes/irati.py
@@ -2,6 +2,7 @@
# Commands to setup and instruct IRATI
#
# Vincenzo Maffione <v.maffione@nextworks.it>
+# Marco Capitani <m.capitani@nextworks.it>
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
@@ -62,10 +63,17 @@ class ConfBuilder(object):
:type experiment: Experiment
:param experiment: the experiment to be configured
"""
+ # Constants and initializations
ipcmconfs = dict()
+ difconfs = dict()
+ ipcp2shim_map = {}
+ node2id_map = {}
+ manager = False
+ mgmt_dif_name = 'NMS'
# TODO ask: what are these, and how are they represented in experiment?
- # Are these bindings or registration (in node)? In gen.py these are given on a per-dif basis...
+ # Are these bindings or registration (in node)?
+ # In gen.py these are given on a per-dif basis...
app_mappings = []
# If some app directives were specified, use those to build da.map.
@@ -73,20 +81,21 @@ class ConfBuilder(object):
# the DIF with the highest rank.
if len(app_mappings) == 0:
if len(experiment.dif_ordering) > 0:
- for adm in irati_templates.da_map_base["applicationToDIFMappings"]:
+ for adm in \
+ irati_templates.da_map_base["applicationToDIFMappings"]:
adm["difName"] = "%s" % (experiment.dif_ordering[-1],)
# else:
# irati_templates.da_map_base["applicationToDIFMappings"] = []
# for apm in app_mappings:
- # irati_templates.da_map_base["applicationToDIFMappings"].append({
+ # irati_templates.da_map_base["applicationToDIFMappings"]\
+ # .append({
# "encodedAppName": apm['name'],
# "difName": "%s.DIF" % (apm['dif'])
# })
- # TODO ask: I guess this will need to be added, and in that case we should add it to the qemu plugin too...
+ # TODO ask: I guess this will need to be added,
+ # and in that case we should add it to the qemu plugin too...
# Where should we take it in input?
- manager = False
- mgmt_dif_name = 'NMS'
if manager:
# Add MAD/Manager configuration
@@ -101,20 +110,24 @@ class ConfBuilder(object):
}
}
+ node_number = 1
for node in experiment.nodes: # type: mod.Node
+ node2id_map[node.name] = node_number
+ node_number += 1
ipcmconfs[node.name] = copy.deepcopy(irati_templates.ipcmconf_base)
if manager:
- ipcmconfs[node.name]["addons"]["mad"]["managerAppName"] = "%s.mad-1--" % (node.name)
+ ipcmconfs[node.name]["addons"]["mad"]["managerAppName"] \
+ = "%s.mad-1--" % (node.name,)
- difconfs = dict()
- ipcp2shim_map = {} # We will need it in a sec
for dif in experiment.dif_ordering: # type: mod.DIF
if isinstance(dif, mod.ShimEthDIF):
ipcp2shim_map.update({ipcp.name: dif for ipcp in dif.ipcps})
elif isinstance(dif, mod.NormalDIF):
difconfs[dif.name] = dict()
for node in dif.members:
- difconfs[dif.name][node.name] = copy.deepcopy(irati_templates.normal_dif_base)
+ difconfs[dif.name][node.name] = copy.deepcopy(
+ irati_templates.normal_dif_base
+ )
for node in experiment.nodes: # type: mod.Node
ipcmconf = ipcmconfs[node.name]
@@ -129,27 +142,29 @@ class ConfBuilder(object):
"difName": shim.name
})
- template_file_name = 'shimeth.%s.%s.dif' % (node_name, shim.name)
+ template_file_name = 'shimeth.%s.%s.dif' \
+ % (node_name, shim.name)
ipcmconf["difConfigurations"].append({
"name": shim.name,
"template": template_file_name
})
fout = open(template_file_name, 'w')
- fout.write(json.dumps({"difType": "shim-eth-vlan",
- "configParameters": {
- "interface-name": "ifc%d" % (int(port_id),)
- }
- },
- indent=4, sort_keys=True))
+ fout.write(json.dumps(
+ {"difType": "shim-eth-vlan",
+ "configParameters": {
+ "interface-name": "ifc%d" % (int(port_id),)
+ }
+ },
+ indent=4, sort_keys=True))
fout.close()
# Run over dif_ordering array, to make sure each IPCM config has
# the correct ordering for the ipcProcessesToCreate list of operations.
# If we iterated over the difs map, the order would be randomic, and so
- # some IPCP registrations in lower DIFs may fail. This would happen because
- # at the moment of registration, it may be that the IPCP of the lower DIF
- # has not been created yet.
+ # some IPCP registrations in lower DIFs may fail.
+ # This would happen because at the moment of registration,
+ # it may be that the IPCP of the lower DIF has not been created yet.
shims = ipcp2shim_map.values()
for dif in experiment.dif_ordering: # type: mod.NormalDIF
@@ -159,10 +174,8 @@ class ConfBuilder(object):
for node in dif.members: # type: mod.Node
node_name = node.name
- node_id = int(node.full_name.split(':')[1]) - 2222
ipcmconf = ipcmconfs[node_name]
- # TODO ask: here was vm['id']. Does the name work or does it have to be the id (sequential from 1)?
normal_ipcp = {"apName": "%s.%s.IPCP" % (dif.name, node_name),
"apInstance": "1",
"difName": "%s" % (dif.name,),
@@ -178,35 +191,44 @@ class ConfBuilder(object):
"template": "normal.%s.%s.dif" % (node_name, dif.name,)
})
- # Fill in the map of IPCP addresses. This could be moved at difconfs
- # deepcopy-time
- # TODO what to do for id? Get it from full_name (i.e.: address:port)? Ugly, but might work
+ # Fill in the map of IPCP addresses.
+ # This could be moved at difconfs
for other_node in dif.members: # type: mod.Node
- difconfs[dif.name][other_node.name]["knownIPCProcessAddresses"].append({
- "apName": "%s.%s.IPCP" % (dif.name, node_name),
- "apInstance": "1",
- "address": 16 + node_id
- })
+ difconfs[dif.name][other_node.name] \
+ ["knownIPCProcessAddresses"].append({
+ "apName": "%s.%s.IPCP" % (dif.name, node_name),
+ "apInstance": "1",
+ "address": 16 + node2id_map[node_name]})
for path, ps in dif.policies.items():
# if policy['nodes'] == [] or vmname in policy['nodes']:
- # TODO: policies can be applied per-node (and not just per-dif)? With what syntax?
- irati_templates.translate_policy(difconfs[dif.name][node_name], path,
- ps, parms=[])
- # TODO: what is the syntax for the policy parameters?
+ # TODO: policies can be applied per-node
+ # (and not just per-dif) in rumba? With what syntax?
+ irati_templates.translate_policy(
+ difconfs[dif.name][node_name], path, ps, parms=[])
# Dump the DIF Allocator map
with open('da.map', 'w') as da_map_file:
- json.dump(irati_templates.da_map_base, da_map_file, indent=4, sort_keys=True)
+ json.dump(irati_templates.da_map_base,
+ da_map_file,
+ indent=4,
+ sort_keys=True)
for node in experiment.nodes:
# Dump the IPCM configuration files
with open('%s.ipcm.conf' % (node.name,), 'w') as node_file:
- json.dump(ipcmconfs[node.name], node_file, indent=4, sort_keys=True)
+ json.dump(ipcmconfs[node.name],
+ node_file,
+ indent=4,
+ sort_keys=True)
for dif in experiment.dif_ordering: # type: mod.DIF
dif_conf = difconfs.get(dif.name, None)
if dif_conf:
# Dump the normal DIF configuration files
for node in dif.members:
- with open('normal.%s.%s.dif' % (node.name, dif.name), 'w') as dif_conf_file:
- json.dump(dif_conf[node.name], dif_conf_file, indent=4, sort_keys=True)
+ with open('normal.%s.%s.dif' % (node.name, dif.name), 'w') \
+ as dif_conf_file:
+ json.dump(dif_conf[node.name],
+ dif_conf_file,
+ indent=4,
+ sort_keys=True)
diff --git a/rumba/prototypes/irati_templates.py b/rumba/prototypes/irati_templates.py
index 9b03abb..0f3ef05 100644
--- a/rumba/prototypes/irati_templates.py
+++ b/rumba/prototypes/irati_templates.py
@@ -1,21 +1,3 @@
-#
-# Copyright (C) 2014-2017 Nextworks
-# Author: Vincenzo Maffione <v.maffione@nextworks.it>
-#
-# This library is free software; you can redistribute it and/or
-# modify it under the terms of the GNU Lesser General Public
-# License as published by the Free Software Foundation; either
-# version 2.1 of the License, or (at your option) any later version.
-#
-# This library is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-# Lesser General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public
-# License along with this library; if not, write to the Free Software
-# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
-
# Environment setup for VMs. Standard linux approach
env_dict = {'installpath': '/usr', 'varpath': ''}
@@ -274,14 +256,20 @@ def dtcp_ps_set(d, v, parms):
policy_translator = {
- 'rmt.pff': lambda d, v, p: ps_set(d["rmtConfiguration"]["pffConfiguration"], "policySet", v, p),
+ 'rmt.pff': lambda d, v, p: ps_set(d["rmtConfiguration"]["pffConfiguration"],
+ "policySet", v, p),
'rmt': lambda d, v, p: ps_set(d["rmtConfiguration"], "policySet", v, p),
- 'enrollment-task': lambda d, v, p: ps_set(d["enrollmentTaskConfiguration"], "policySet", v, p),
- 'flow-allocator': lambda d, v, p: ps_set(d["flowAllocatorConfiguration"], "policySet", v, p),
- 'namespace-manager': lambda d, v, p: ps_set(d["namespaceManagerConfiguration"], "policySet", v, p),
- 'security-manager': lambda d, v, p: ps_set(d["securityManagerConfiguration"], "policySet", v, p),
+ 'enrollment-task': lambda d, v, p: ps_set(d["enrollmentTaskConfiguration"],
+ "policySet", v, p),
+ 'flow-allocator': lambda d, v, p: ps_set(d["flowAllocatorConfiguration"],
+ "policySet", v, p),
+ 'namespace-manager': lambda d, v, p: ps_set(
+ d["namespaceManagerConfiguration"], "policySet", v, p),
+ 'security-manager': lambda d, v, p: ps_set(
+ d["securityManagerConfiguration"], "policySet", v, p),
'routing': lambda d, v, p: ps_set(d["routingConfiguration"], "policySet", v, p),
- 'resource-allocator.pduftg': lambda d, v, p: ps_set(d["resourceAllocatorConfiguration"], "policySet", v, p),
+ 'resource-allocator.pduftg': lambda d, v, p: ps_set(
+ d["resourceAllocatorConfiguration"], "policySet", v, p),
'efcp.*.dtcp': None,
'efcp.*.dtp': None,
}
@@ -289,7 +277,8 @@ policy_translator = {
def is_security_path(path):
sp = path.split('.')
- return (len(sp) == 3) and (sp[0] == 'security-manager') and (sp[1] in ['auth', 'encrypt', 'ttl', 'errorcheck'])
+ return (len(sp) == 3) and (sp[0] == 'security-manager') \
+ and (sp[1] in ['auth', 'encrypt', 'ttl', 'errorcheck'])
# Do we know this path ?
diff --git a/rumba/testbeds/qemu.py b/rumba/testbeds/qemu.py
index 220becc..73a4f14 100644
--- a/rumba/testbeds/qemu.py
+++ b/rumba/testbeds/qemu.py
@@ -2,6 +2,7 @@
# QEMU testbed for Rumba
#
# Vincenzo Maffione <v.maffione@nextworks.it>
+# Marco Capitani <m.capitani@nextworks.it>
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
@@ -26,7 +27,8 @@ import rumba.model as mod
class Testbed(mod.Testbed):
- def __init__(self, exp_name, bzimage, initramfs, proj_name="ARCFIRE", password="root", username="root",
+ def __init__(self, exp_name, bzimage, initramfs, proj_name="ARCFIRE",
+ password="root", username="root",
use_vhost=True, qemu_logs_dir=None):
mod.Testbed.__init__(self, exp_name, username, password, proj_name)
self.vms = {}
@@ -34,15 +36,18 @@ class Testbed(mod.Testbed):
self.bzimage = bzimage
self.initramfs = initramfs
self.vhost = use_vhost
- self.qemu_logs_dir = os.getcwd() if qemu_logs_dir is None else qemu_logs_dir
+ self.qemu_logs_dir = os.getcwd() if qemu_logs_dir is None \
+ else qemu_logs_dir
self.boot_processes = []
@staticmethod
- def _run_command_chain(commands, results_queue, error_queue, ignore_errors=False):
+ def _run_command_chain(commands, results_queue,
+ error_queue, ignore_errors=False):
"""
Runs (sequentially) the command list.
- On error, breaks and dumps it in error_queue, and interrupts as soon as it is non-empty.
+ On error, breaks and dumps it in error_queue, and interrupts
+ as soon as it is non-empty (unless ignore errors is True).
:type commands: list
:type results_queue: Queue
@@ -114,23 +119,28 @@ class Testbed(mod.Testbed):
speed = '%dmbit' % shim.link_speed
# Rate limit the traffic transmitted on the TAP interface
- command_list += ('sudo tc qdisc add dev %(tap)s handle 1: root '
- 'htb default 11\n'
- 'sudo tc class add dev %(tap)s parent 1: classid '
- '1:1 htb rate 10gbit\n'
- 'sudo tc class add dev %(tap)s parent 1:1 classid '
- '1:11 htb rate %(speed)s'
- % {'tap': tap_id, 'speed': speed}
- ).split('\n')
-
- vm['ports'].append({'tap_id': tap_id, 'shim': shim, 'port_id': port_id})
+ command_list += (
+ 'sudo tc qdisc add dev %(tap)s handle 1: root '
+ 'htb default 11\n'
+ 'sudo tc class add dev %(tap)s parent 1: classid '
+ '1:1 htb rate 10gbit\n'
+ 'sudo tc class add dev %(tap)s parent 1:1 classid '
+ '1:11 htb rate %(speed)s'
+ % {'tap': tap_id, 'speed': speed}
+ ).split('\n')
+
+ vm['ports'].append({'tap_id': tap_id,
+ 'shim': shim,
+ 'port_id': port_id})
ipcp_set = [x for x in ipcps if x in node.ipcps]
if len(ipcp_set) > 1:
- raise Exception("Error: more than one ipcp in common between shim dif %s and node %s"
+ raise Exception("Error: more than one ipcp in common "
+ "between shim dif %s and node %s"
% (shim.name, node.name))
ipcp = ipcp_set[0] # type: mod.ShimEthIPCP
assert ipcp.name == '%s.%s' % (shim.name, node.name), \
- 'Incorrect Shim Ipcp found: expected %s.%s, found %s' % (shim.name, node.name, ipcp.name)
+ 'Incorrect Shim Ipcp found: expected %s.%s, found %s' \
+ % (shim.name, node.name, ipcp.name)
ipcp.ifname = tap_id
# TODO deal with Ip address (shim UDP DIF).
@@ -138,7 +148,10 @@ class Testbed(mod.Testbed):
if not e_queue.empty():
break
# Launch commands asynchronously
- process = multiprocessing.Process(target=self._run_command_chain, args=(command_list, r_queue, e_queue))
+ process = multiprocessing.Process(target=self._run_command_chain,
+ args=(command_list,
+ r_queue,
+ e_queue))
shim_processes.append(process)
process.start()
@@ -158,7 +171,8 @@ class Testbed(mod.Testbed):
result = r_queue.get(timeout=1)
if result == "Command chain ran correctly.":
over_processes += 1
- print('DEBUG: %s of %s processes completed.' % (over_processes, total_processes))
+ print('DEBUG: %s of %s processes completed.'
+ % (over_processes, total_processes))
except:
max_waiting_time -= 1
@@ -218,22 +232,27 @@ class Testbed(mod.Testbed):
mac = '00:0a:0a:0a:%02x:%02x' % (vmid, port['port_id'])
port['mac'] = mac
- command += ('-device %(frontend)s,mac=%(mac)s,netdev=data%(idx)s '
- '-netdev tap,ifname=%(tap)s,id=data%(idx)s,script=no,'
- 'downscript=no%(vhost)s '
- % {'mac': mac, 'tap': tap_id, 'idx': port['port_id'],
- 'frontend': vm_frontend, 'vhost': ',vhost=on' if self.vhost else ''}
- )
+ command += (
+ '-device %(frontend)s,mac=%(mac)s,netdev=data%(idx)s '
+ '-netdev tap,ifname=%(tap)s,id=data%(idx)s,script=no,'
+ 'downscript=no%(vhost)s '
+ % {'mac': mac, 'tap': tap_id, 'idx': port['port_id'],
+ 'frontend': vm_frontend,
+ 'vhost': ',vhost=on' if self.vhost else ''}
+ )
booting_budget -= 1
if booting_budget <= 0:
- print('Sleeping for %s seconds to give the machines time to boot up.' % boot_backoff)
+ print('Sleeping for %s seconds to give '
+ 'the machines time to boot up.' % boot_backoff)
time.sleep(boot_backoff)
booting_budget = boot_batch_size
- with open('%s/qemu_out_%s' % (self.qemu_logs_dir, vmid), 'w') as out_file:
+ with open('%s/qemu_out_%s' % (self.qemu_logs_dir, vmid), 'w')\
+ as out_file:
print('DEBUG: executing >> %s' % command)
- self.boot_processes.append(subprocess.Popen(command.split(), stdout=out_file))
+ self.boot_processes.append(subprocess.Popen(command.split(),
+ stdout=out_file))
vmid += 1
@@ -265,9 +284,10 @@ class Testbed(mod.Testbed):
'sudo ip tuntap del mode tap name %(tap)s'
% {'tap': tap, 'br': shim.name}
).split('\n')
- process = multiprocessing.Process(target=self._run_command_chain,
- args=(commands, results_queue, error_queue),
- kwargs={'ignore_errors': True})
+ process = multiprocessing.Process(
+ target=self._run_command_chain,
+ args=(commands, results_queue, error_queue),
+ kwargs={'ignore_errors': True})
port_processes.append(process)
process.start()
@@ -278,14 +298,17 @@ class Testbed(mod.Testbed):
while max_waiting_time > 0 and over_processes < total_processes:
# Check for errors
if not error_queue.empty():
- print('Failure while shutting down: %s' % str(error_queue.get()))
+ print(
+ 'Failure while shutting down: %s' % str(error_queue.get())
+ )
over_processes += 1
try:
# Check for results
result = results_queue.get(timeout=1)
if result == "Command chain ran correctly.":
over_processes += 1
- print('DEBUG: %s of %s tear-down port processes completed.' % (over_processes, total_processes))
+ print('DEBUG: %s of %s tear-down port processes completed.'
+ % (over_processes, total_processes))
except:
max_waiting_time -= 1
@@ -300,7 +323,9 @@ class Testbed(mod.Testbed):
% {'br': shim.name}
).split('\n')
process = multiprocessing.Process(target=self._run_command_chain,
- args=(commands, results_queue, error_queue),
+ args=(commands,
+ results_queue,
+ error_queue),
kwargs={'ignore_errors': True})
shim_processes.append(process)
process.start()
@@ -312,13 +337,15 @@ class Testbed(mod.Testbed):
while max_waiting_time > 0 and over_processes < total_processes:
# Check for errors
if not error_queue.empty():
- print('Failure while shutting down: %s' % str(error_queue.get()))
+ print('Failure while shutting down: %s'
+ % str(error_queue.get()))
over_processes += 1
try:
# Check for results
result = results_queue.get(timeout=1)
if result == "Command chain ran correctly.":
over_processes += 1
- print('DEBUG: %s of %s tear-down shim processes completed.' % (over_processes, total_processes))
+ print('DEBUG: %s of %s tear-down shim processes completed.'
+ % (over_processes, total_processes))
except:
max_waiting_time -= 1