aboutsummaryrefslogtreecommitdiff
path: root/rumba/testbeds/qemu.py
diff options
context:
space:
mode:
authorMarco Capitani <m.capitani@nextworks.it>2017-04-11 15:53:49 +0200
committerMarco Capitani <m.capitani@nextworks.it>2017-04-11 15:53:49 +0200
commit46310717c3293054324cc6a0271d855b638df0ff (patch)
tree69b27a68b2d97962f3bcbdee40d412453285eea2 /rumba/testbeds/qemu.py
parent7f5054816fc68bca1d9d4901d1e365b57a278542 (diff)
downloadrumba-46310717c3293054324cc6a0271d855b638df0ff.tar.gz
rumba-46310717c3293054324cc6a0271d855b638df0ff.zip
Resolving node_id issue and general cleanup
Diffstat (limited to 'rumba/testbeds/qemu.py')
-rw-r--r--rumba/testbeds/qemu.py97
1 files changed, 62 insertions, 35 deletions
diff --git a/rumba/testbeds/qemu.py b/rumba/testbeds/qemu.py
index 220becc..73a4f14 100644
--- a/rumba/testbeds/qemu.py
+++ b/rumba/testbeds/qemu.py
@@ -2,6 +2,7 @@
# QEMU testbed for Rumba
#
# Vincenzo Maffione <v.maffione@nextworks.it>
+# Marco Capitani <m.capitani@nextworks.it>
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
@@ -26,7 +27,8 @@ import rumba.model as mod
class Testbed(mod.Testbed):
- def __init__(self, exp_name, bzimage, initramfs, proj_name="ARCFIRE", password="root", username="root",
+ def __init__(self, exp_name, bzimage, initramfs, proj_name="ARCFIRE",
+ password="root", username="root",
use_vhost=True, qemu_logs_dir=None):
mod.Testbed.__init__(self, exp_name, username, password, proj_name)
self.vms = {}
@@ -34,15 +36,18 @@ class Testbed(mod.Testbed):
self.bzimage = bzimage
self.initramfs = initramfs
self.vhost = use_vhost
- self.qemu_logs_dir = os.getcwd() if qemu_logs_dir is None else qemu_logs_dir
+ self.qemu_logs_dir = os.getcwd() if qemu_logs_dir is None \
+ else qemu_logs_dir
self.boot_processes = []
@staticmethod
- def _run_command_chain(commands, results_queue, error_queue, ignore_errors=False):
+ def _run_command_chain(commands, results_queue,
+ error_queue, ignore_errors=False):
"""
Runs (sequentially) the command list.
- On error, breaks and dumps it in error_queue, and interrupts as soon as it is non-empty.
+ On error, breaks and dumps it in error_queue, and interrupts
+ as soon as it is non-empty (unless ignore errors is True).
:type commands: list
:type results_queue: Queue
@@ -114,23 +119,28 @@ class Testbed(mod.Testbed):
speed = '%dmbit' % shim.link_speed
# Rate limit the traffic transmitted on the TAP interface
- command_list += ('sudo tc qdisc add dev %(tap)s handle 1: root '
- 'htb default 11\n'
- 'sudo tc class add dev %(tap)s parent 1: classid '
- '1:1 htb rate 10gbit\n'
- 'sudo tc class add dev %(tap)s parent 1:1 classid '
- '1:11 htb rate %(speed)s'
- % {'tap': tap_id, 'speed': speed}
- ).split('\n')
-
- vm['ports'].append({'tap_id': tap_id, 'shim': shim, 'port_id': port_id})
+ command_list += (
+ 'sudo tc qdisc add dev %(tap)s handle 1: root '
+ 'htb default 11\n'
+ 'sudo tc class add dev %(tap)s parent 1: classid '
+ '1:1 htb rate 10gbit\n'
+ 'sudo tc class add dev %(tap)s parent 1:1 classid '
+ '1:11 htb rate %(speed)s'
+ % {'tap': tap_id, 'speed': speed}
+ ).split('\n')
+
+ vm['ports'].append({'tap_id': tap_id,
+ 'shim': shim,
+ 'port_id': port_id})
ipcp_set = [x for x in ipcps if x in node.ipcps]
if len(ipcp_set) > 1:
- raise Exception("Error: more than one ipcp in common between shim dif %s and node %s"
+ raise Exception("Error: more than one ipcp in common "
+ "between shim dif %s and node %s"
% (shim.name, node.name))
ipcp = ipcp_set[0] # type: mod.ShimEthIPCP
assert ipcp.name == '%s.%s' % (shim.name, node.name), \
- 'Incorrect Shim Ipcp found: expected %s.%s, found %s' % (shim.name, node.name, ipcp.name)
+ 'Incorrect Shim Ipcp found: expected %s.%s, found %s' \
+ % (shim.name, node.name, ipcp.name)
ipcp.ifname = tap_id
# TODO deal with Ip address (shim UDP DIF).
@@ -138,7 +148,10 @@ class Testbed(mod.Testbed):
if not e_queue.empty():
break
# Launch commands asynchronously
- process = multiprocessing.Process(target=self._run_command_chain, args=(command_list, r_queue, e_queue))
+ process = multiprocessing.Process(target=self._run_command_chain,
+ args=(command_list,
+ r_queue,
+ e_queue))
shim_processes.append(process)
process.start()
@@ -158,7 +171,8 @@ class Testbed(mod.Testbed):
result = r_queue.get(timeout=1)
if result == "Command chain ran correctly.":
over_processes += 1
- print('DEBUG: %s of %s processes completed.' % (over_processes, total_processes))
+ print('DEBUG: %s of %s processes completed.'
+ % (over_processes, total_processes))
except:
max_waiting_time -= 1
@@ -218,22 +232,27 @@ class Testbed(mod.Testbed):
mac = '00:0a:0a:0a:%02x:%02x' % (vmid, port['port_id'])
port['mac'] = mac
- command += ('-device %(frontend)s,mac=%(mac)s,netdev=data%(idx)s '
- '-netdev tap,ifname=%(tap)s,id=data%(idx)s,script=no,'
- 'downscript=no%(vhost)s '
- % {'mac': mac, 'tap': tap_id, 'idx': port['port_id'],
- 'frontend': vm_frontend, 'vhost': ',vhost=on' if self.vhost else ''}
- )
+ command += (
+ '-device %(frontend)s,mac=%(mac)s,netdev=data%(idx)s '
+ '-netdev tap,ifname=%(tap)s,id=data%(idx)s,script=no,'
+ 'downscript=no%(vhost)s '
+ % {'mac': mac, 'tap': tap_id, 'idx': port['port_id'],
+ 'frontend': vm_frontend,
+ 'vhost': ',vhost=on' if self.vhost else ''}
+ )
booting_budget -= 1
if booting_budget <= 0:
- print('Sleeping for %s seconds to give the machines time to boot up.' % boot_backoff)
+ print('Sleeping for %s seconds to give '
+ 'the machines time to boot up.' % boot_backoff)
time.sleep(boot_backoff)
booting_budget = boot_batch_size
- with open('%s/qemu_out_%s' % (self.qemu_logs_dir, vmid), 'w') as out_file:
+ with open('%s/qemu_out_%s' % (self.qemu_logs_dir, vmid), 'w')\
+ as out_file:
print('DEBUG: executing >> %s' % command)
- self.boot_processes.append(subprocess.Popen(command.split(), stdout=out_file))
+ self.boot_processes.append(subprocess.Popen(command.split(),
+ stdout=out_file))
vmid += 1
@@ -265,9 +284,10 @@ class Testbed(mod.Testbed):
'sudo ip tuntap del mode tap name %(tap)s'
% {'tap': tap, 'br': shim.name}
).split('\n')
- process = multiprocessing.Process(target=self._run_command_chain,
- args=(commands, results_queue, error_queue),
- kwargs={'ignore_errors': True})
+ process = multiprocessing.Process(
+ target=self._run_command_chain,
+ args=(commands, results_queue, error_queue),
+ kwargs={'ignore_errors': True})
port_processes.append(process)
process.start()
@@ -278,14 +298,17 @@ class Testbed(mod.Testbed):
while max_waiting_time > 0 and over_processes < total_processes:
# Check for errors
if not error_queue.empty():
- print('Failure while shutting down: %s' % str(error_queue.get()))
+ print(
+ 'Failure while shutting down: %s' % str(error_queue.get())
+ )
over_processes += 1
try:
# Check for results
result = results_queue.get(timeout=1)
if result == "Command chain ran correctly.":
over_processes += 1
- print('DEBUG: %s of %s tear-down port processes completed.' % (over_processes, total_processes))
+ print('DEBUG: %s of %s tear-down port processes completed.'
+ % (over_processes, total_processes))
except:
max_waiting_time -= 1
@@ -300,7 +323,9 @@ class Testbed(mod.Testbed):
% {'br': shim.name}
).split('\n')
process = multiprocessing.Process(target=self._run_command_chain,
- args=(commands, results_queue, error_queue),
+ args=(commands,
+ results_queue,
+ error_queue),
kwargs={'ignore_errors': True})
shim_processes.append(process)
process.start()
@@ -312,13 +337,15 @@ class Testbed(mod.Testbed):
while max_waiting_time > 0 and over_processes < total_processes:
# Check for errors
if not error_queue.empty():
- print('Failure while shutting down: %s' % str(error_queue.get()))
+ print('Failure while shutting down: %s'
+ % str(error_queue.get()))
over_processes += 1
try:
# Check for results
result = results_queue.get(timeout=1)
if result == "Command chain ran correctly.":
over_processes += 1
- print('DEBUG: %s of %s tear-down shim processes completed.' % (over_processes, total_processes))
+ print('DEBUG: %s of %s tear-down shim processes completed.'
+ % (over_processes, total_processes))
except:
max_waiting_time -= 1