author     Marco Capitani <m.capitani@nextworks.it>  2017-04-05 17:26:35 +0200
committer  Marco Capitani <m.capitani@nextworks.it>  2017-04-05 17:26:35 +0200
commit     afc508dbf2c34eedb93d46dc43d5ec284213cbcb (patch)
tree       e5056dacd8bc2449cab8e59a19b71eb312b9721d
parent     0018610178892296c23ee2ca72bf05b5b41c23b9 (diff)
Parallelization and splitting of the script
-rw-r--r--  rumba/testbeds/qemu.py  266
1 file changed, 189 insertions(+), 77 deletions(-)
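
This commit drops the single bash script that create_experiment used to assemble in command_str and instead splits the setup into per-shim and per-VM command chains, each executed in its own multiprocessing.Process and coordinated through a results queue and an error queue that the parent polls with a bounded timeout. Below is a minimal, self-contained sketch of that pattern only; the helper name run_command_chain and the placeholder command chains are illustrative and not taken verbatim from the patch.

# Sketch of the parallel command-chain pattern introduced by this commit:
# each chain runs in its own multiprocessing.Process, successes are reported
# on a results queue and the first failure on an error queue, which the
# parent polls with a bounded timeout. The command lists are placeholders.
from multiprocessing import Process, Queue
from subprocess import check_call, CalledProcessError

try:                      # Python 2
    from Queue import Empty
except ImportError:       # Python 3
    from queue import Empty


def run_command_chain(commands, results_queue, error_queue):
    """Run commands sequentially; stop on the first error or if another chain failed."""
    try:
        for command in commands:
            if not error_queue.empty():
                break
            check_call(command.split())
        results_queue.put("Command chain ran correctly.")
    except CalledProcessError as e:
        error_queue.put(str(e))


if __name__ == '__main__':
    r_queue, e_queue = Queue(), Queue()
    chains = [['true', 'echo chain-one-done'],   # placeholder command chains
              ['echo chain-two-done']]
    processes = [Process(target=run_command_chain, args=(c, r_queue, e_queue))
                 for c in chains]
    for p in processes:
        p.start()

    # Poll for results, giving up after roughly 2 seconds per chain without progress.
    max_waiting_time = 2 * len(processes)
    done = 0
    while max_waiting_time > 0 and done < len(processes):
        if not e_queue.empty():
            print('a command chain failed: {}'.format(e_queue.get()))
            break
        try:
            r_queue.get(timeout=1)
            done += 1
        except Empty:
            max_waiting_time -= 1

    for p in processes:
        p.join()

The error queue doubles as a cross-process abort flag: each chain checks it before running the next command, so one failing chain stops the others early.
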
diff --git a/rumba/testbeds/qemu.py b/rumba/testbeds/qemu.py
index 2f5491a..45e5cc0 100644
--- a/rumba/testbeds/qemu.py
+++ b/rumba/testbeds/qemu.py
@@ -17,69 +17,135 @@
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
# MA 02110-1301 USA
-import multiprocessing
+from Queue import Empty
+from multiprocessing import Process, Queue, cpu_count
+from os import geteuid
import rumba.model as mod
-from subprocess import Popen
+from subprocess import Popen, check_call, CalledProcessError
class Testbed(mod.Testbed):
- def __init__(self, exp_name, username, vm_img_folder, proj_name="ARCFIRE", password="", use_vhost=True):
+ def __init__(self, exp_name, username, vm_img_folder, proj_name="ARCFIRE", password="",
+ use_vhost=True, qemu_out_folder=""):
mod.Testbed.__init__(self, exp_name, username, password, proj_name)
self.vms = {}
self.shims = []
self.vm_img_path = vm_img_folder
self.vhost = use_vhost
+ self.qemu_folder = qemu_out_folder
+ self.boot_processes = []
+
+ @staticmethod
+ def _run_command_chain(commands, results_queue, error_queue):
+ """
+        Runs the command list sequentially.
+
+        On error, puts the error in error_queue and stops; it also stops as soon as error_queue is non-empty.
+
+ :type commands: list
+ :type results_queue: Queue
+ :type error_queue: Queue
+ :param commands: list of commands to execute
+ :param results_queue: Queue of results of parallel processes
+ :param error_queue: Queue of error(s) encountered
+ :return: None
+ """
+ try:
+ for command in commands:
+ if not error_queue.empty():
+ break
+ print('DEBUG: executing >> {}'.format(command))
+ check_call(command.split())
+
+ results_queue.put("Command chain ran correctly.")
+ except CalledProcessError as e:
+ error_queue.put(str(e))
def create_experiment(self, experiment):
"""
:type experiment mod.Experiment
- :rtype str
:param experiment: The experiment running
- :return the script to be run to create the experiment's vms
"""
+ if geteuid() != 0:
+ print("ERROR: QEMU testbed requires root access: please use sudo.")
+ return # TODO I'd rather raise an appropriate error...
+
print("[QEMU testbed] swapping in")
- command_str = ""
# Building bridges and taps
-
+ shim_processes = []
+ r_queue = Queue()
+ e_queue = Queue()
+ print(experiment.dif_ordering)
for shim in experiment.dif_ordering:
+ command_list = []
if not isinstance(shim, mod.ShimEthDIF):
# Nothing to do here
continue
self.shims.append(shim)
- command_str += 'sudo brctl addbr %(br)s\n' \
- 'sudo ip link set %(br)s up\n' \
- '\n' % {'br': shim.name}
+ command_list += ('sudo brctl addbr %(br)s\n'
+ 'sudo ip link set %(br)s up'
+ % {'br': shim.name}
+ ).split('\n')
for node in shim.members: # type:mod.Node
name = node.name
vm = self.vms.setdefault(name, {'vm': node, 'ports': []})
port_id = len(vm['ports']) + 1
tap_id = '%s.%02x' % (name, port_id)
- command_str += 'sudo ip tuntap add mode tap name %(tap)s\n' \
- 'sudo ip link set %(tap)s up\n' \
- 'sudo brctl addif %(br)s %(tap)s\n\n' \
- % {'tap': tap_id, 'br': shim.name}
+ command_list += ('sudo ip tuntap add mode tap name %(tap)s\n'
+ 'sudo ip link set %(tap)s up\n'
+ 'sudo brctl addif %(br)s %(tap)s'
+ % {'tap': tap_id, 'br': shim.name}
+ ).split('\n')
if shim.link_speed > 0:
speed = '%dmbit' % shim.link_speed
# Rate limit the traffic transmitted on the TAP interface
- command_str += 'sudo tc qdisc add dev %(tap)s handle 1: root ' \
- 'htb default 11\n' \
- 'sudo tc class add dev %(tap)s parent 1: classid ' \
- '1:1 htb rate 10gbit\n' \
- 'sudo tc class add dev %(tap)s parent 1:1 classid ' \
- '1:11 htb rate %(speed)s\n' \
- % {'tap': tap_id, 'speed': speed}
+ command_list += ('sudo tc qdisc add dev %(tap)s handle 1: root '
+ 'htb default 11\n'
+ 'sudo tc class add dev %(tap)s parent 1: classid '
+ '1:1 htb rate 10gbit\n'
+ 'sudo tc class add dev %(tap)s parent 1:1 classid '
+ '1:11 htb rate %(speed)s'
+ % {'tap': tap_id, 'speed': speed}
+ ).split('\n')
vm['ports'].append({'tap_id': tap_id, 'shim': shim, 'port_id': port_id})
+            # TODO deal with the IP address (shim UDP DIF).
+ # Avoid stacking processes if one failed before.
+ if not e_queue.empty():
+ break
+ # Launch commands asynchronously
+ process = Process(target=self._run_command_chain, args=(command_list, r_queue, e_queue))
+ shim_processes.append(process)
+ process.start()
+
+ # Wait for all processes to be over.
+ total_processes = len(shim_processes)
+ max_waiting_time = 2 * total_processes
+ over_processes = 0
+
+ while max_waiting_time > 0 and over_processes < total_processes:
+ # Check for errors
+ if not e_queue.empty():
+ print('Testbed instantiation failed: {}'.format(str(e_queue.get())))
+ return # TODO: again, I would prefer a specific exception
+ try:
+ # Check for results
+ result = r_queue.get(timeout=1)
+ if result == "Command chain ran correctly.":
+ over_processes += 1
+ print('DEBUG: %s of %s processes completed.' % (over_processes, total_processes))
+ except Empty:
+ max_waiting_time -= 1
+
# Building vms
- boot_batch_size = max(1, multiprocessing.cpu_count() // 2)
+ boot_batch_size = max(1, cpu_count() // 2)
booting_budget = boot_batch_size
boot_backoff = 12
base_port = 2222
@@ -105,22 +171,24 @@ class Testbed(mod.Testbed):
host_fwd_str = 'hostfwd=tcp::%(fwdp)s-:22' % vars_dict
vars_dict['hostfwdstr'] = host_fwd_str
- command_str += 'qemu-system-x86_64 '
+ command = 'qemu-system-x86_64 '
# TODO manage non default images
- command_str += '-kernel %(vmimgpath)s/bzImage ' \
- '-append "console=ttyS0" ' \
- '-initrd %(vmimgpath)s/rootfs.cpio ' % vars_dict
- command_str += '-nographic ' \
- '-display none ' \
- '--enable-kvm ' \
- '-smp 1 ' \
- '-m %(memory)sM ' \
- '-device %(frontend)s,mac=%(mac)s,netdev=mgmt ' \
- '-netdev user,id=mgmt,%(hostfwdstr)s ' \
- '-vga std ' \
- '-pidfile rina-%(id)s.pid ' \
- '-serial file:%(vmname)s.log ' \
- % vars_dict
+ command += ('-kernel %(vmimgpath)s/bzImage '
+ '-append "console=ttyS0" '
+ '-initrd %(vmimgpath)s/rootfs.cpio '
+ % vars_dict)
+ command += ('-nographic '
+ '-display none '
+ '--enable-kvm '
+ '-smp 1 '
+ '-m %(memory)sM '
+ '-device %(frontend)s,mac=%(mac)s,netdev=mgmt '
+ '-netdev user,id=mgmt,%(hostfwdstr)s '
+ '-vga std '
+ '-pidfile rina-%(id)s.pid '
+ '-serial file:%(vmname)s.log '
+ % vars_dict
+ )
del vars_dict
@@ -129,63 +197,107 @@ class Testbed(mod.Testbed):
mac = '00:0a:0a:0a:%02x:%02x' % (vmid, port['port_id'])
port['mac'] = mac
- command_str += '-device %(frontend)s,mac=%(mac)s,netdev=data%(idx)s ' \
- '-netdev tap,ifname=%(tap)s,id=data%(idx)s,script=no,' \
- 'downscript=no%(vhost)s ' \
- % {'mac': mac, 'tap': tap_id, 'idx': port['port_id'],
- 'frontend': vm_frontend,
- 'vhost': ',vhost=on' if self.vhost else ''}
-
- command_str += '&\n'
+ command += ('-device %(frontend)s,mac=%(mac)s,netdev=data%(idx)s '
+ '-netdev tap,ifname=%(tap)s,id=data%(idx)s,script=no,'
+ 'downscript=no%(vhost)s '
+ % {'mac': mac, 'tap': tap_id, 'idx': port['port_id'],
+ 'frontend': vm_frontend, 'vhost': ',vhost=on' if self.vhost else ''}
+ )
booting_budget -= 1
if booting_budget <= 0:
- command_str += 'sleep %s\n' % boot_backoff
- booting_budget = boot_batch_size
+ pass # TODO manage the backoff time
+ # command_str += 'sleep %s\n' % boot_backoff
+ # booting_budget = boot_batch_size
+
+ with open(self.qemu_folder + '/qemu_out{}'.format(vmid), 'w') as out_file:
+ print('DEBUG: executing >> {}'.format(command))
+ self.boot_processes.append(Popen(command.split(), stdout=out_file))
+ pass
vmid += 1
- Popen([command_str], shell=True, executable="/bin/bash").wait() # TODO something more elegant maybe?
- return command_str
+ return
- def _make_down_script(self):
+ def __del__(self):
"""
:rtype str
:return: The script to tear down the experiment
"""
- command_str = 'kill_qemu() {\n' \
- ' PIDFILE=$1\n' \
- ' PID=$(cat $PIDFILE)\n' \
- ' if [ -n $PID ]; then\n' \
- ' kill $PID\n' \
- ' while [ -n "$(ps -p $PID -o comm=)" ]; do\n' \
- ' sleep 1\n' \
- ' done\n' \
- ' fi\n' \
- '\n' \
- ' rm $PIDFILE\n' \
- '}\n\n'
+ # TERM qemu processes
+ for process in self.boot_processes:
+ process.terminate()
- for vm_name, vm in self.vms.items():
- command_str += 'kill_qemu rina-%(id)s.pid\n' % {'id': vm['id']}
-
- command_str += '\n'
+ # Wait for them to shut down
+ for process in self.boot_processes:
+ process.wait()
+ port_processes = []
+ error_queue = Queue()
+ results_queue = Queue()
for vm_name, vm in self.vms.items():
for port in vm['ports']:
tap = port['tap_id']
shim = port['shim']
- command_str += 'sudo brctl delif %(br)s %(tap)s\n' \
- 'sudo ip link set %(tap)s down\n' \
- 'sudo ip tuntap del mode tap name %(tap)s\n\n' \
- % {'tap': tap, 'br': shim.name}
+ commands = []
+
+ commands += ('sudo brctl delif %(br)s %(tap)s\n'
+ 'sudo ip link set %(tap)s down\n'
+ 'sudo ip tuntap del mode tap name %(tap)s'
+ % {'tap': tap, 'br': shim.name}
+ ).split('\n')
+ process = Process(target=self._run_command_chain, args=(commands, results_queue, error_queue))
+ port_processes.append(process)
+ process.start()
+
+ total_processes = len(port_processes)
+ max_waiting_time = 2 * total_processes
+ over_processes = 0
+
+ while max_waiting_time > 0 and over_processes < total_processes:
+ # Check for errors
+ if not error_queue.empty():
+ print('Failure while shutting down: {}'.format(str(error_queue.get())))
+ over_processes += 1
+ try:
+ # Check for results
+ result = results_queue.get(timeout=1)
+ if result == "Command chain ran correctly.":
+ over_processes += 1
+ print('DEBUG: %s of %s tear-down port processes completed.' % (over_processes, total_processes))
+ except Empty:
+ max_waiting_time -= 1
+
+ error_queue = Queue()
+ results_queue = Queue()
+ shim_processes = []
for shim in self.shims:
- command_str += 'sudo ip link set %(br)s down\n' \
- 'sudo brctl delbr %(br)s\n' \
- '\n' % {'br': shim.name}
- return command_str
+ commands = []
+ commands += ('sudo ip link set %(br)s down\n'
+ 'sudo brctl delbr %(br)s'
+ % {'br': shim.name}
+ ).split('\n')
+ process = Process(target=self._run_command_chain, args=(commands, results_queue, error_queue))
+ shim_processes.append(process)
+ process.start()
- def __del__(self):
- script = self._make_down_script()
- Popen([script], shell=True, executable="/bin/bash")
+ total_processes = len(shim_processes)
+ max_waiting_time = 2 * total_processes
+ over_processes = 0
+
+ while max_waiting_time > 0 and over_processes < total_processes:
+ # Check for errors
+ if not error_queue.empty():
+ print('Failure while shutting down: {}'.format(str(error_queue.get())))
+ over_processes += 1
+ try:
+ # Check for results
+ result = results_queue.get(timeout=1)
+ if result == "Command chain ran correctly.":
+ over_processes += 1
+ print('DEBUG: %s of %s tear-down shim processes completed.' % (over_processes, total_processes))
+ except Empty:
+ max_waiting_time -= 1
+
+ return
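
For the VM boot path, the patch switches from appending qemu-system-x86_64 invocations to a shell script to launching each VM directly with Popen, redirecting its stdout to a per-VM file and keeping the handles in self.boot_processes so that __del__ can terminate() and wait() each of them before tearing down taps and bridges. A rough sketch of that lifecycle follows, with 'sleep 60' standing in for the real QEMU command line and an illustrative /tmp log path.

# Sketch of the boot/teardown lifecycle used by the new Testbed class:
# start long-running processes with Popen, remember the handles, and on
# teardown send SIGTERM and reap each one.
from subprocess import Popen

boot_processes = []
for vmid in range(2):
    # One console/log file per VM, as the patch does with qemu_out<vmid>.
    with open('/tmp/qemu_out{}'.format(vmid), 'w') as out_file:
        # 'sleep 60' is a stand-in for the qemu-system-x86_64 command line.
        boot_processes.append(Popen(['sleep', '60'], stdout=out_file))

# ... the experiment would run here ...

for process in boot_processes:
    process.terminate()   # send SIGTERM, as __del__ does for the booted VMs
for process in boot_processes:
    process.wait()        # reap each child so no zombies are left behind

Closing the log file right after Popen is safe because the child process keeps its own copy of the file descriptor; the patch relies on the same behaviour inside its with block.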