aboutsummaryrefslogtreecommitdiff
path: root/rumba/testbeds
diff options
context:
space:
mode:
Diffstat (limited to 'rumba/testbeds')
-rw-r--r--rumba/testbeds/jfed.py5
-rw-r--r--rumba/testbeds/qemu.py160
2 files changed, 121 insertions, 44 deletions
diff --git a/rumba/testbeds/jfed.py b/rumba/testbeds/jfed.py
index 33d89f8..8ad173b 100644
--- a/rumba/testbeds/jfed.py
+++ b/rumba/testbeds/jfed.py
@@ -128,6 +128,11 @@ class Testbed(mod.Testbed):
auth_name_r = self.auth_name.replace(".", "-")
node.ssh_config.hostname = node.name + "." + self.exp_name + "." + \
auth_name_r + "." + self.auth_name
+ node.ssh_config.proxycommand = "ssh -i '" + self.cert_file + \
+ "' -o StrictHostKeyChecking=no " + \
+ self.username + \
+ "@bastion.test.iminds.be nc " + \
+ node.ssh_config.hostname + " 22"
subprocess.call(["java", "-jar", self.jfed_jar, "create", "-S", \
self.proj_name, "--rspec", \
diff --git a/rumba/testbeds/qemu.py b/rumba/testbeds/qemu.py
index 3573554..d998625 100644
--- a/rumba/testbeds/qemu.py
+++ b/rumba/testbeds/qemu.py
@@ -2,6 +2,7 @@
# QEMU testbed for Rumba
#
# Vincenzo Maffione <v.maffione@nextworks.it>
+# Marco Capitani <m.capitani@nextworks.it>
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
@@ -23,10 +24,12 @@ import subprocess
import os
import rumba.model as mod
+from rumba import ssh_support
class Testbed(mod.Testbed):
- def __init__(self, exp_name, username, bzimage, initramfs, proj_name="ARCFIRE", password="",
+ def __init__(self, exp_name, bzimage, initramfs, proj_name="ARCFIRE",
+ password="root", username="root",
use_vhost=True, qemu_logs_dir=None):
mod.Testbed.__init__(self, exp_name, username, password, proj_name)
self.vms = {}
@@ -34,15 +37,18 @@ class Testbed(mod.Testbed):
self.bzimage = bzimage
self.initramfs = initramfs
self.vhost = use_vhost
- self.qemu_logs_dir = os.getcwd() if qemu_logs_dir is None else qemu_logs_dir
+ self.qemu_logs_dir = os.getcwd() if qemu_logs_dir is None \
+ else qemu_logs_dir
self.boot_processes = []
@staticmethod
- def _run_command_chain(commands, results_queue, error_queue, ignore_errors=False):
+ def _run_command_chain(commands, results_queue,
+ error_queue, ignore_errors=False):
"""
Runs (sequentially) the command list.
- On error, breaks and dumps it in error_queue, and interrupts as soon as it is non-empty.
+ On error, breaks and dumps it in error_queue, and interrupts
+ as soon as it is non-empty (unless ignore errors is True).
:type commands: list
:type results_queue: Queue
@@ -56,7 +62,7 @@ class Testbed(mod.Testbed):
for command in commands:
if not error_queue.empty() and not ignore_errors:
break
- print('DEBUG: executing >> %s' % command)
+ print('qemu: executing >> %s' % command)
try:
subprocess.check_call(command.split())
except subprocess.CalledProcessError as e:
@@ -69,6 +75,37 @@ class Testbed(mod.Testbed):
else:
results_queue.put("Command chain ran with %d errors" % errors)
+ def recover_if_names(self, experiment):
+ next_vlan = 10
+ assigned_vlan = {}
+ for node in experiment.nodes:
+ for ipcp in node.ipcps:
+ if isinstance(ipcp, mod.ShimEthIPCP):
+ shim_name, node_name = ipcp.name.split('.')
+ port_set = [x for x in self.vms[node_name]['ports']
+ if x['shim'].name == shim_name]
+ port = port_set[0]
+ port_id = port['port_id']
+ vm_id = self.vms[node_name]['id']
+ mac = '00:0a:0a:0a:%02x:%02x' % (vm_id, port_id)
+ print('qemu: recovering ifname for port: '
+ + port['tap_id'] + '.')
+ output = ssh_support.execute_command(
+ self,
+ node.ssh_config,
+ 'mac2ifname ' + mac)
+ ipcp.ifname = output
+ try:
+ vlan = int(port['shim'].name)
+ except ValueError:
+ vlan = assigned_vlan.get(port['shim'].name, None)
+ if vlan is None:
+ vlan = next_vlan
+ next_vlan += 10
+ assigned_vlan[port['shim'].name] = vlan
+ ssh_support.setup_vlan(self, node,
+ vlan, ipcp.ifname)
+
def swap_in(self, experiment):
"""
:type experiment mod.Experiment
@@ -80,19 +117,19 @@ class Testbed(mod.Testbed):
except subprocess.CalledProcessError:
raise Exception('Not authenticated')
- print("[QEMU testbed] swapping in")
+ print("qemu: swapping in")
# Building bridges and taps
shim_processes = []
r_queue = multiprocessing.Queue()
e_queue = multiprocessing.Queue()
- print(experiment.dif_ordering)
for shim in experiment.dif_ordering:
- command_list = []
if not isinstance(shim, mod.ShimEthDIF):
# Nothing to do here
continue
self.shims.append(shim)
+ ipcps = shim.ipcps
+ command_list = []
command_list += ('sudo brctl addbr %(br)s\n'
'sudo ip link set %(br)s up'
% {'br': shim.name}
@@ -113,23 +150,39 @@ class Testbed(mod.Testbed):
speed = '%dmbit' % shim.link_speed
# Rate limit the traffic transmitted on the TAP interface
- command_list += ('sudo tc qdisc add dev %(tap)s handle 1: root '
- 'htb default 11\n'
- 'sudo tc class add dev %(tap)s parent 1: classid '
- '1:1 htb rate 10gbit\n'
- 'sudo tc class add dev %(tap)s parent 1:1 classid '
- '1:11 htb rate %(speed)s'
- % {'tap': tap_id, 'speed': speed}
- ).split('\n')
-
- vm['ports'].append({'tap_id': tap_id, 'shim': shim, 'port_id': port_id})
+ command_list += (
+ 'sudo tc qdisc add dev %(tap)s handle 1: root '
+ 'htb default 11\n'
+ 'sudo tc class add dev %(tap)s parent 1: classid '
+ '1:1 htb rate 10gbit\n'
+ 'sudo tc class add dev %(tap)s parent 1:1 classid '
+ '1:11 htb rate %(speed)s'
+ % {'tap': tap_id, 'speed': speed}
+ ).split('\n')
+
+ vm['ports'].append({'tap_id': tap_id,
+ 'shim': shim,
+ 'port_id': port_id})
+ ipcp_set = [x for x in ipcps if x in node.ipcps]
+ if len(ipcp_set) > 1:
+ raise Exception("Error: more than one ipcp in common "
+ "between shim dif %s and node %s"
+ % (shim.name, node.name))
+ ipcp = ipcp_set[0] # type: mod.ShimEthIPCP
+ assert ipcp.name == '%s.%s' % (shim.name, node.name), \
+ 'Incorrect Shim Ipcp found: expected %s.%s, found %s' \
+ % (shim.name, node.name, ipcp.name)
+ ipcp.ifname = tap_id
# TODO deal with Ip address (shim UDP DIF).
# Avoid stacking processes if one failed before.
if not e_queue.empty():
break
# Launch commands asynchronously
- process = multiprocessing.Process(target=self._run_command_chain, args=(command_list, r_queue, e_queue))
+ process = multiprocessing.Process(target=self._run_command_chain,
+ args=(command_list,
+ r_queue,
+ e_queue))
shim_processes.append(process)
process.start()
@@ -142,14 +195,15 @@ class Testbed(mod.Testbed):
# Check for errors
if not e_queue.empty():
error_str = str(e_queue.get())
- print('Testbed instantiation failed: %s' % error_str)
+ print('qemu: Testbed instantiation failed: %s' % error_str)
raise Exception('Failure: %s' % error_str)
try:
# Check for results
result = r_queue.get(timeout=1)
if result == "Command chain ran correctly.":
over_processes += 1
- print('DEBUG: %s of %s processes completed.' % (over_processes, total_processes))
+ print('qemu: %s of %s processes completed.'
+ % (over_processes, total_processes))
except:
max_waiting_time -= 1
@@ -157,9 +211,9 @@ class Testbed(mod.Testbed):
boot_batch_size = max(1, multiprocessing.cpu_count() // 2)
booting_budget = boot_batch_size
- boot_backoff = 12 # in seconds
+ boot_backoff = 12 # in seconds
base_port = 2222
- vm_memory = 164 # in megabytes
+ vm_memory = 164 # in megabytes
vm_frontend = 'virtio-net-pci'
vmid = 1
@@ -167,6 +221,7 @@ class Testbed(mod.Testbed):
for node in experiment.nodes:
name = node.name
vm = self.vms.setdefault(name, {'vm': node, 'ports': []})
+ vm['id'] = vmid
fwdp = base_port + vmid
fwdc = fwdp + 10000
mac = '00:0a:0a:0a:%02x:%02x' % (vmid, 99)
@@ -199,7 +254,6 @@ class Testbed(mod.Testbed):
'-device %(frontend)s,mac=%(mac)s,netdev=mgmt '
'-netdev user,id=mgmt,%(hostfwdstr)s '
'-vga std '
- '-pidfile rina-%(id)s.pid '
'-serial file:%(vmname)s.log '
% vars_dict
)
@@ -211,23 +265,29 @@ class Testbed(mod.Testbed):
mac = '00:0a:0a:0a:%02x:%02x' % (vmid, port['port_id'])
port['mac'] = mac
- command += ('-device %(frontend)s,mac=%(mac)s,netdev=data%(idx)s '
- '-netdev tap,ifname=%(tap)s,id=data%(idx)s,script=no,'
- 'downscript=no%(vhost)s '
- % {'mac': mac, 'tap': tap_id, 'idx': port['port_id'],
- 'frontend': vm_frontend, 'vhost': ',vhost=on' if self.vhost else ''}
- )
+ command += (
+ '-device %(frontend)s,mac=%(mac)s,netdev=data%(idx)s '
+ '-netdev tap,ifname=%(tap)s,id=data%(idx)s,script=no,'
+ 'downscript=no%(vhost)s '
+ % {'mac': mac, 'tap': tap_id, 'idx': port['port_id'],
+ 'frontend': vm_frontend,
+ 'vhost': ',vhost=on' if self.vhost else ''}
+ )
booting_budget -= 1
if booting_budget <= 0:
- print('Sleeping %s secs waiting for the VMs to boot' % boot_backoff)
+
+ print('qemu: Sleeping %s secs waiting '
+ 'for the VMs to boot' % boot_backoff)
+
time.sleep(boot_backoff)
booting_budget = boot_batch_size
- with open('%s/qemu_out_%s' % (self.qemu_logs_dir, vmid), 'w') as out_file:
- print('DEBUG: executing >> %s' % command)
- self.boot_processes.append(subprocess.Popen(command.split(), stdout=out_file))
- pass
+ with open('%s/qemu_out_%s' % (self.qemu_logs_dir, vmid), 'w')\
+ as out_file:
+ print('qemu: executing >> %s' % command)
+ self.boot_processes.append(subprocess.Popen(command.split(),
+ stdout=out_file))
vmid += 1
@@ -235,9 +295,14 @@ class Testbed(mod.Testbed):
if booting_budget < boot_backoff:
tsleep = boot_backoff * (boot_batch_size - booting_budget) / \
boot_batch_size
- print('Sleeping %s secs waiting for the last VMs to boot' % tsleep)
+ print('qemu: Sleeping %s secs waiting for the last VMs to boot' % tsleep)
time.sleep(tsleep)
+ # TODO: to be removed, we should loop in the ssh part
+ print('qemu: Sleeping 5 seconds, just to be on the safe side')
+ time.sleep(5)
+
+ self.recover_if_names(experiment)
def swap_out(self, experiment):
"""
@@ -267,9 +332,10 @@ class Testbed(mod.Testbed):
'sudo ip tuntap del mode tap name %(tap)s'
% {'tap': tap, 'br': shim.name}
).split('\n')
- process = multiprocessing.Process(target=self._run_command_chain,
- args=(commands, results_queue, error_queue),
- kwargs={'ignore_errors': True})
+ process = multiprocessing.Process(
+ target=self._run_command_chain,
+ args=(commands, results_queue, error_queue),
+ kwargs={'ignore_errors': True})
port_processes.append(process)
process.start()
@@ -280,14 +346,16 @@ class Testbed(mod.Testbed):
while max_waiting_time > 0 and over_processes < total_processes:
# Check for errors
if not error_queue.empty():
- print('Failure while shutting down: %s' % str(error_queue.get()))
+ print('qemu:Failure while shutting down: %s'\
+ % str(error_queue.get()))
over_processes += 1
try:
# Check for results
result = results_queue.get(timeout=1)
if result == "Command chain ran correctly.":
over_processes += 1
- print('DEBUG: %s of %s tear-down port processes completed.' % (over_processes, total_processes))
+ print('qemu: %s of %s tear-down port processes completed.'
+ % (over_processes, total_processes))
except:
max_waiting_time -= 1
@@ -302,7 +370,9 @@ class Testbed(mod.Testbed):
% {'br': shim.name}
).split('\n')
process = multiprocessing.Process(target=self._run_command_chain,
- args=(commands, results_queue, error_queue),
+ args=(commands,
+ results_queue,
+ error_queue),
kwargs={'ignore_errors': True})
shim_processes.append(process)
process.start()
@@ -314,13 +384,15 @@ class Testbed(mod.Testbed):
while max_waiting_time > 0 and over_processes < total_processes:
# Check for errors
if not error_queue.empty():
- print('Failure while shutting down: %s' % str(error_queue.get()))
+ print('qemu: Failure while shutting down: %s'
+ % str(error_queue.get()))
over_processes += 1
try:
# Check for results
result = results_queue.get(timeout=1)
if result == "Command chain ran correctly.":
over_processes += 1
- print('DEBUG: %s of %s tear-down shim processes completed.' % (over_processes, total_processes))
+ print('qemu: %s of %s tear-down shim processes completed.'
+ % (over_processes, total_processes))
except:
max_waiting_time -= 1