aboutsummaryrefslogtreecommitdiff
path: root/rumba/testbeds
diff options
context:
space:
mode:
authorvmaffione <v.maffione@gmail.com>2017-04-15 07:34:21 +0000
committervmaffione <v.maffione@gmail.com>2017-04-15 07:34:21 +0000
commit28d6a8729fac3d109e68afed1bbacc27d526048b (patch)
treecacdb7db6e44712c0a4ca2dc617afb355bc2c852 /rumba/testbeds
parentca1d77df271defb08d5f73b54398491d1049c9f9 (diff)
parent6eceae4bf7ee823d6eed276935741b7c107f6105 (diff)
downloadrumba-28d6a8729fac3d109e68afed1bbacc27d526048b.tar.gz
rumba-28d6a8729fac3d109e68afed1bbacc27d526048b.zip
Merge branch 'master-marco' into 'master'
IRATI config file generation (and other) See merge request !22
Diffstat (limited to 'rumba/testbeds')
-rw-r--r--rumba/testbeds/qemu.py150
1 files changed, 112 insertions, 38 deletions
diff --git a/rumba/testbeds/qemu.py b/rumba/testbeds/qemu.py
index 3573554..174b860 100644
--- a/rumba/testbeds/qemu.py
+++ b/rumba/testbeds/qemu.py
@@ -2,6 +2,7 @@
# QEMU testbed for Rumba
#
# Vincenzo Maffione <v.maffione@nextworks.it>
+# Marco Capitani <m.capitani@nextworks.it>
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
@@ -23,10 +24,12 @@ import subprocess
import os
import rumba.model as mod
+from rumba import ssh_support
class Testbed(mod.Testbed):
- def __init__(self, exp_name, username, bzimage, initramfs, proj_name="ARCFIRE", password="",
+ def __init__(self, exp_name, bzimage, initramfs, proj_name="ARCFIRE",
+ password="root", username="root",
use_vhost=True, qemu_logs_dir=None):
mod.Testbed.__init__(self, exp_name, username, password, proj_name)
self.vms = {}
@@ -34,15 +37,18 @@ class Testbed(mod.Testbed):
self.bzimage = bzimage
self.initramfs = initramfs
self.vhost = use_vhost
- self.qemu_logs_dir = os.getcwd() if qemu_logs_dir is None else qemu_logs_dir
+ self.qemu_logs_dir = os.getcwd() if qemu_logs_dir is None \
+ else qemu_logs_dir
self.boot_processes = []
@staticmethod
- def _run_command_chain(commands, results_queue, error_queue, ignore_errors=False):
+ def _run_command_chain(commands, results_queue,
+ error_queue, ignore_errors=False):
"""
Runs (sequentially) the command list.
- On error, breaks and dumps it in error_queue, and interrupts as soon as it is non-empty.
+ On error, breaks and dumps it in error_queue, and interrupts
+ as soon as it is non-empty (unless ignore errors is True).
:type commands: list
:type results_queue: Queue
@@ -69,6 +75,37 @@ class Testbed(mod.Testbed):
else:
results_queue.put("Command chain ran with %d errors" % errors)
+ def recover_if_names(self, experiment):
+ next_vlan = 10
+ assigned_vlan = {}
+ for node in experiment.nodes:
+ for ipcp in node.ipcps:
+ if isinstance(ipcp, mod.ShimEthIPCP):
+ shim_name, node_name = ipcp.name.split('.')
+ port_set = [x for x in self.vms[node_name]['ports']
+ if x['shim'].name == shim_name]
+ port = port_set[0]
+ port_id = port['port_id']
+ vm_id = self.vms[node_name]['id']
+ mac = '00:0a:0a:0a:%02x:%02x' % (vm_id, port_id)
+ print('DEBUG: recovering ifname for port: '
+ + port['tap_id'] + '.')
+ output = ssh_support.execute_command(
+ self,
+ node.ssh_config,
+ 'mac2ifname ' + mac)
+ ipcp.ifname = output
+ try:
+ vlan = int(port['shim'].name)
+ except ValueError:
+ vlan = assigned_vlan.get(port['shim'].name, None)
+ if vlan is None:
+ vlan = next_vlan
+ next_vlan += 10
+ assigned_vlan[port['shim'].name] = vlan
+ ssh_support.setup_vlan(self, node,
+ vlan, ipcp.ifname)
+
def swap_in(self, experiment):
"""
:type experiment mod.Experiment
@@ -88,11 +125,12 @@ class Testbed(mod.Testbed):
e_queue = multiprocessing.Queue()
print(experiment.dif_ordering)
for shim in experiment.dif_ordering:
- command_list = []
if not isinstance(shim, mod.ShimEthDIF):
# Nothing to do here
continue
self.shims.append(shim)
+ ipcps = shim.ipcps
+ command_list = []
command_list += ('sudo brctl addbr %(br)s\n'
'sudo ip link set %(br)s up'
% {'br': shim.name}
@@ -113,23 +151,39 @@ class Testbed(mod.Testbed):
speed = '%dmbit' % shim.link_speed
# Rate limit the traffic transmitted on the TAP interface
- command_list += ('sudo tc qdisc add dev %(tap)s handle 1: root '
- 'htb default 11\n'
- 'sudo tc class add dev %(tap)s parent 1: classid '
- '1:1 htb rate 10gbit\n'
- 'sudo tc class add dev %(tap)s parent 1:1 classid '
- '1:11 htb rate %(speed)s'
- % {'tap': tap_id, 'speed': speed}
- ).split('\n')
-
- vm['ports'].append({'tap_id': tap_id, 'shim': shim, 'port_id': port_id})
+ command_list += (
+ 'sudo tc qdisc add dev %(tap)s handle 1: root '
+ 'htb default 11\n'
+ 'sudo tc class add dev %(tap)s parent 1: classid '
+ '1:1 htb rate 10gbit\n'
+ 'sudo tc class add dev %(tap)s parent 1:1 classid '
+ '1:11 htb rate %(speed)s'
+ % {'tap': tap_id, 'speed': speed}
+ ).split('\n')
+
+ vm['ports'].append({'tap_id': tap_id,
+ 'shim': shim,
+ 'port_id': port_id})
+ ipcp_set = [x for x in ipcps if x in node.ipcps]
+ if len(ipcp_set) > 1:
+ raise Exception("Error: more than one ipcp in common "
+ "between shim dif %s and node %s"
+ % (shim.name, node.name))
+ ipcp = ipcp_set[0] # type: mod.ShimEthIPCP
+ assert ipcp.name == '%s.%s' % (shim.name, node.name), \
+ 'Incorrect Shim Ipcp found: expected %s.%s, found %s' \
+ % (shim.name, node.name, ipcp.name)
+ ipcp.ifname = tap_id
# TODO deal with Ip address (shim UDP DIF).
# Avoid stacking processes if one failed before.
if not e_queue.empty():
break
# Launch commands asynchronously
- process = multiprocessing.Process(target=self._run_command_chain, args=(command_list, r_queue, e_queue))
+ process = multiprocessing.Process(target=self._run_command_chain,
+ args=(command_list,
+ r_queue,
+ e_queue))
shim_processes.append(process)
process.start()
@@ -149,7 +203,8 @@ class Testbed(mod.Testbed):
result = r_queue.get(timeout=1)
if result == "Command chain ran correctly.":
over_processes += 1
- print('DEBUG: %s of %s processes completed.' % (over_processes, total_processes))
+ print('DEBUG: %s of %s processes completed.'
+ % (over_processes, total_processes))
except:
max_waiting_time -= 1
@@ -157,9 +212,9 @@ class Testbed(mod.Testbed):
boot_batch_size = max(1, multiprocessing.cpu_count() // 2)
booting_budget = boot_batch_size
- boot_backoff = 12 # in seconds
+ boot_backoff = 12 # in seconds
base_port = 2222
- vm_memory = 164 # in megabytes
+ vm_memory = 164 # in megabytes
vm_frontend = 'virtio-net-pci'
vmid = 1
@@ -167,6 +222,7 @@ class Testbed(mod.Testbed):
for node in experiment.nodes:
name = node.name
vm = self.vms.setdefault(name, {'vm': node, 'ports': []})
+ vm['id'] = vmid
fwdp = base_port + vmid
fwdc = fwdp + 10000
mac = '00:0a:0a:0a:%02x:%02x' % (vmid, 99)
@@ -199,7 +255,6 @@ class Testbed(mod.Testbed):
'-device %(frontend)s,mac=%(mac)s,netdev=mgmt '
'-netdev user,id=mgmt,%(hostfwdstr)s '
'-vga std '
- '-pidfile rina-%(id)s.pid '
'-serial file:%(vmname)s.log '
% vars_dict
)
@@ -211,23 +266,29 @@ class Testbed(mod.Testbed):
mac = '00:0a:0a:0a:%02x:%02x' % (vmid, port['port_id'])
port['mac'] = mac
- command += ('-device %(frontend)s,mac=%(mac)s,netdev=data%(idx)s '
- '-netdev tap,ifname=%(tap)s,id=data%(idx)s,script=no,'
- 'downscript=no%(vhost)s '
- % {'mac': mac, 'tap': tap_id, 'idx': port['port_id'],
- 'frontend': vm_frontend, 'vhost': ',vhost=on' if self.vhost else ''}
- )
+ command += (
+ '-device %(frontend)s,mac=%(mac)s,netdev=data%(idx)s '
+ '-netdev tap,ifname=%(tap)s,id=data%(idx)s,script=no,'
+ 'downscript=no%(vhost)s '
+ % {'mac': mac, 'tap': tap_id, 'idx': port['port_id'],
+ 'frontend': vm_frontend,
+ 'vhost': ',vhost=on' if self.vhost else ''}
+ )
booting_budget -= 1
if booting_budget <= 0:
- print('Sleeping %s secs waiting for the VMs to boot' % boot_backoff)
+
+ print('Sleeping %s secs waiting '
+ 'for the VMs to boot' % boot_backoff)
+
time.sleep(boot_backoff)
booting_budget = boot_batch_size
- with open('%s/qemu_out_%s' % (self.qemu_logs_dir, vmid), 'w') as out_file:
+ with open('%s/qemu_out_%s' % (self.qemu_logs_dir, vmid), 'w')\
+ as out_file:
print('DEBUG: executing >> %s' % command)
- self.boot_processes.append(subprocess.Popen(command.split(), stdout=out_file))
- pass
+ self.boot_processes.append(subprocess.Popen(command.split(),
+ stdout=out_file))
vmid += 1
@@ -238,6 +299,11 @@ class Testbed(mod.Testbed):
print('Sleeping %s secs waiting for the last VMs to boot' % tsleep)
time.sleep(tsleep)
+ # TODO: to be removed, we should loop in the ssh part
+ print('Sleeping 5 seconds, just to be on the safe side')
+ time.sleep(5)
+
+ self.recover_if_names(experiment)
def swap_out(self, experiment):
"""
@@ -267,9 +333,10 @@ class Testbed(mod.Testbed):
'sudo ip tuntap del mode tap name %(tap)s'
% {'tap': tap, 'br': shim.name}
).split('\n')
- process = multiprocessing.Process(target=self._run_command_chain,
- args=(commands, results_queue, error_queue),
- kwargs={'ignore_errors': True})
+ process = multiprocessing.Process(
+ target=self._run_command_chain,
+ args=(commands, results_queue, error_queue),
+ kwargs={'ignore_errors': True})
port_processes.append(process)
process.start()
@@ -280,14 +347,17 @@ class Testbed(mod.Testbed):
while max_waiting_time > 0 and over_processes < total_processes:
# Check for errors
if not error_queue.empty():
- print('Failure while shutting down: %s' % str(error_queue.get()))
+ print(
+ 'Failure while shutting down: %s' % str(error_queue.get())
+ )
over_processes += 1
try:
# Check for results
result = results_queue.get(timeout=1)
if result == "Command chain ran correctly.":
over_processes += 1
- print('DEBUG: %s of %s tear-down port processes completed.' % (over_processes, total_processes))
+ print('DEBUG: %s of %s tear-down port processes completed.'
+ % (over_processes, total_processes))
except:
max_waiting_time -= 1
@@ -302,7 +372,9 @@ class Testbed(mod.Testbed):
% {'br': shim.name}
).split('\n')
process = multiprocessing.Process(target=self._run_command_chain,
- args=(commands, results_queue, error_queue),
+ args=(commands,
+ results_queue,
+ error_queue),
kwargs={'ignore_errors': True})
shim_processes.append(process)
process.start()
@@ -314,13 +386,15 @@ class Testbed(mod.Testbed):
while max_waiting_time > 0 and over_processes < total_processes:
# Check for errors
if not error_queue.empty():
- print('Failure while shutting down: %s' % str(error_queue.get()))
+ print('Failure while shutting down: %s'
+ % str(error_queue.get()))
over_processes += 1
try:
# Check for results
result = results_queue.get(timeout=1)
if result == "Command chain ran correctly.":
over_processes += 1
- print('DEBUG: %s of %s tear-down shim processes completed.' % (over_processes, total_processes))
+ print('DEBUG: %s of %s tear-down shim processes completed.'
+ % (over_processes, total_processes))
except:
max_waiting_time -= 1