From afc508dbf2c34eedb93d46dc43d5ec284213cbcb Mon Sep 17 00:00:00 2001
From: Marco Capitani <m.capitani@nextworks.it>
Date: Wed, 5 Apr 2017 17:26:35 +0200
Subject: Parallelization and splitting of the script

---
 rumba/testbeds/qemu.py | 266 +++++++++++++++++++++++++++++++++++--------------
 1 file changed, 189 insertions(+), 77 deletions(-)

diff --git a/rumba/testbeds/qemu.py b/rumba/testbeds/qemu.py
index 2f5491a..45e5cc0 100644
--- a/rumba/testbeds/qemu.py
+++ b/rumba/testbeds/qemu.py
@@ -17,69 +17,135 @@
 # License along with this library; if not, write to the Free Software
 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
 # MA  02110-1301  USA
-import multiprocessing
+from Queue import Empty
+from multiprocessing import Process, Queue, cpu_count
+from os import geteuid
 
 import rumba.model as mod
-from subprocess import Popen
+from subprocess import Popen, check_call, CalledProcessError
 
 
 class Testbed(mod.Testbed):
-    def __init__(self, exp_name, username, vm_img_folder, proj_name="ARCFIRE", password="", use_vhost=True):
+    def __init__(self, exp_name, username, vm_img_folder, proj_name="ARCFIRE", password="",
+                 use_vhost=True, qemu_out_folder=""):
         mod.Testbed.__init__(self, exp_name, username, password, proj_name)
         self.vms = {}
         self.shims = []
         self.vm_img_path = vm_img_folder
         self.vhost = use_vhost
+        self.qemu_folder = qemu_out_folder
+        self.boot_processes = []
+
+    @staticmethod
+    def _run_command_chain(commands, results_queue, error_queue):
+        """
+        Runs (sequentially) the command list.
+
+        On error, breaks and dumps it in error_queue, and interrupts as soon as it is non-empty.
+
+        :type commands: list
+        :type results_queue: Queue
+        :type error_queue: Queue
+        :param commands: list of commands to execute
+        :param results_queue: Queue of results of parallel processes
+        :param error_queue: Queue of error(s) encountered
+        :return: None
+        """
+        try:
+            for command in commands:
+                if not error_queue.empty():
+                    break
+                print('DEBUG: executing >> {}'.format(command))
+                check_call(command.split())
+
+            results_queue.put("Command chain ran correctly.")
+        except CalledProcessError as e:
+            error_queue.put(str(e))
 
     def create_experiment(self, experiment):
         """
         :type experiment mod.Experiment
-        :rtype str
         :param experiment: The experiment running
-        :return the script to be run to create the experiment's vms
         """
+        if geteuid() != 0:
+            print("ERROR: QEMU testbed requires root access: please use sudo.")
+            return  # TODO I'd rather raise an appropriate error...
+
         print("[QEMU testbed] swapping in")
-        command_str = ""
 
         # Building bridges and taps
-
+        shim_processes = []
+        r_queue = Queue()
+        e_queue = Queue()
+        print(experiment.dif_ordering)
         for shim in experiment.dif_ordering:
+            command_list = []
             if not isinstance(shim, mod.ShimEthDIF):
                 # Nothing to do here
                 continue
             self.shims.append(shim)
-            command_str += 'sudo brctl addbr %(br)s\n' \
-                           'sudo ip link set %(br)s up\n' \
-                           '\n' % {'br': shim.name}
+            command_list += ('sudo brctl addbr %(br)s\n'
+                             'sudo ip link set %(br)s up'
+                             % {'br': shim.name}
+                             ).split('\n')
             for node in shim.members:  # type:mod.Node
                 name = node.name
                 vm = self.vms.setdefault(name, {'vm': node, 'ports': []})
                 port_id = len(vm['ports']) + 1
                 tap_id = '%s.%02x' % (name, port_id)
 
-                command_str += 'sudo ip tuntap add mode tap name %(tap)s\n' \
-                               'sudo ip link set %(tap)s up\n' \
-                               'sudo brctl addif %(br)s %(tap)s\n\n' \
-                               % {'tap': tap_id, 'br': shim.name}
+                command_list += ('sudo ip tuntap add mode tap name %(tap)s\n'
+                                 'sudo ip link set %(tap)s up\n'
+                                 'sudo brctl addif %(br)s %(tap)s'
+                                 % {'tap': tap_id, 'br': shim.name}
+                                 ).split('\n')
 
                 if shim.link_speed > 0:
                     speed = '%dmbit' % shim.link_speed
 
                     # Rate limit the traffic transmitted on the TAP interface
-                    command_str += 'sudo tc qdisc add dev %(tap)s handle 1: root ' \
-                                   'htb default 11\n' \
-                                   'sudo tc class add dev %(tap)s parent 1: classid ' \
-                                   '1:1 htb rate 10gbit\n' \
-                                   'sudo tc class add dev %(tap)s parent 1:1 classid ' \
-                                   '1:11 htb rate %(speed)s\n' \
-                                   % {'tap': tap_id, 'speed': speed}
+                    command_list += ('sudo tc qdisc add dev %(tap)s handle 1: root '
+                                     'htb default 11\n'
+                                     'sudo tc class add dev %(tap)s parent 1: classid '
+                                     '1:1 htb rate 10gbit\n'
+                                     'sudo tc class add dev %(tap)s parent 1:1 classid '
+                                     '1:11 htb rate %(speed)s'
+                                     % {'tap': tap_id, 'speed': speed}
+                                     ).split('\n')
 
                 vm['ports'].append({'tap_id': tap_id, 'shim': shim, 'port_id': port_id})
                 # TODO deal with Ip address (shim UDP DIF).
 
+            # Avoid stacking processes if one failed before.
+            if not e_queue.empty():
+                break
+            # Launch commands asynchronously
+            process = Process(target=self._run_command_chain, args=(command_list, r_queue, e_queue))
+            shim_processes.append(process)
+            process.start()
+
+        # Wait for all processes to be over.
+        total_processes = len(shim_processes)
+        max_waiting_time = 2 * total_processes
+        over_processes = 0
+
+        while max_waiting_time > 0 and over_processes < total_processes:
+            # Check for errors
+            if not e_queue.empty():
+                print('Testbed instantiation failed: {}'.format(str(e_queue.get())))
+                return  # TODO: again, I would prefer a specific exception
+            try:
+                # Check for results
+                result = r_queue.get(timeout=1)
+                if result == "Command chain ran correctly.":
+                    over_processes += 1
+                    print('DEBUG: %s of %s processes completed.' % (over_processes, total_processes))
+            except Empty:
+                max_waiting_time -= 1
+
         # Building vms
 
-        boot_batch_size = max(1, multiprocessing.cpu_count() // 2)
+        boot_batch_size = max(1, cpu_count() // 2)
         booting_budget = boot_batch_size
         boot_backoff = 12
         base_port = 2222
@@ -105,22 +171,24 @@ class Testbed(mod.Testbed):
             host_fwd_str = 'hostfwd=tcp::%(fwdp)s-:22' % vars_dict
             vars_dict['hostfwdstr'] = host_fwd_str
 
-            command_str += 'qemu-system-x86_64 '
+            command = 'qemu-system-x86_64 '
             # TODO manage non default images
-            command_str += '-kernel %(vmimgpath)s/bzImage ' \
-                           '-append "console=ttyS0" ' \
-                           '-initrd %(vmimgpath)s/rootfs.cpio ' % vars_dict
-            command_str += '-nographic ' \
-                           '-display none ' \
-                           '--enable-kvm ' \
-                           '-smp 1 ' \
-                           '-m %(memory)sM ' \
-                           '-device %(frontend)s,mac=%(mac)s,netdev=mgmt ' \
-                           '-netdev user,id=mgmt,%(hostfwdstr)s ' \
-                           '-vga std ' \
-                           '-pidfile rina-%(id)s.pid ' \
-                           '-serial file:%(vmname)s.log ' \
-                           % vars_dict
+            command += ('-kernel %(vmimgpath)s/bzImage '
+                        '-append "console=ttyS0" '
+                        '-initrd %(vmimgpath)s/rootfs.cpio '
+                        % vars_dict)
+            command += ('-nographic '
+                        '-display none '
+                        '--enable-kvm '
+                        '-smp 1 '
+                        '-m %(memory)sM '
+                        '-device %(frontend)s,mac=%(mac)s,netdev=mgmt '
+                        '-netdev user,id=mgmt,%(hostfwdstr)s '
+                        '-vga std '
+                        '-pidfile rina-%(id)s.pid '
+                        '-serial file:%(vmname)s.log '
+                        % vars_dict
+                        )
 
             del vars_dict
 
@@ -129,63 +197,107 @@ class Testbed(mod.Testbed):
                 mac = '00:0a:0a:0a:%02x:%02x' % (vmid, port['port_id'])
                 port['mac'] = mac
 
-                command_str += '-device %(frontend)s,mac=%(mac)s,netdev=data%(idx)s '          \
-                               '-netdev tap,ifname=%(tap)s,id=data%(idx)s,script=no,'          \
-                               'downscript=no%(vhost)s '                                       \
-                    % {'mac': mac, 'tap': tap_id, 'idx': port['port_id'],
-                       'frontend': vm_frontend,
-                       'vhost': ',vhost=on' if self.vhost else ''}
-
-            command_str += '&\n'
+                command += ('-device %(frontend)s,mac=%(mac)s,netdev=data%(idx)s '
+                            '-netdev tap,ifname=%(tap)s,id=data%(idx)s,script=no,'
+                            'downscript=no%(vhost)s '
+                            % {'mac': mac, 'tap': tap_id, 'idx': port['port_id'],
+                               'frontend': vm_frontend, 'vhost': ',vhost=on' if self.vhost else ''}
+                            )
 
             booting_budget -= 1
             if booting_budget <= 0:
-                command_str += 'sleep %s\n' % boot_backoff
-                booting_budget = boot_batch_size
+                pass  # TODO manage the backoff time
+                # command_str += 'sleep %s\n' % boot_backoff
+                # booting_budget = boot_batch_size
+
+            with open(self.qemu_folder + '/qemu_out{}'.format(vmid), 'w') as out_file:
+                print('DEBUG: executing >> {}'.format(command))
+                self.boot_processes.append(Popen(command.split(), stdout=out_file))
+                pass
 
             vmid += 1
-        Popen([command_str], shell=True, executable="/bin/bash").wait()  # TODO something more elegant maybe?
-        return command_str
+        return
 
-    def _make_down_script(self):
+    def __del__(self):
         """
         :rtype str
         :return: The script to tear down the experiment
         """
-        command_str = 'kill_qemu() {\n' \
-                      '   PIDFILE=$1\n' \
-                      '   PID=$(cat $PIDFILE)\n' \
-                      '   if [ -n $PID ]; then\n' \
-                      '       kill $PID\n' \
-                      '       while [ -n "$(ps -p $PID -o comm=)" ]; do\n' \
-                      '           sleep 1\n' \
-                      '       done\n' \
-                      '   fi\n' \
-                      '\n' \
-                      '   rm $PIDFILE\n' \
-                      '}\n\n'
+        # TERM qemu processes
+        for process in self.boot_processes:
+            process.terminate()
 
-        for vm_name, vm in self.vms.items():
-            command_str += 'kill_qemu rina-%(id)s.pid\n' % {'id': vm['id']}
-
-        command_str += '\n'
+        # Wait for them to shut down
+        for process in self.boot_processes:
+            process.wait()
 
+        port_processes = []
+        error_queue = Queue()
+        results_queue = Queue()
         for vm_name, vm in self.vms.items():
             for port in vm['ports']:
                 tap = port['tap_id']
                 shim = port['shim']
 
-                command_str += 'sudo brctl delif %(br)s %(tap)s\n' \
-                               'sudo ip link set %(tap)s down\n' \
-                               'sudo ip tuntap del mode tap name %(tap)s\n\n' \
-                               % {'tap': tap, 'br': shim.name}
+                commands = []
+
+                commands += ('sudo brctl delif %(br)s %(tap)s\n'
+                             'sudo ip link set %(tap)s down\n'
+                             'sudo ip tuntap del mode tap name %(tap)s'
+                             % {'tap': tap, 'br': shim.name}
+                             ).split('\n')
+                process = Process(target=self._run_command_chain, args=(commands, results_queue, error_queue))
+                port_processes.append(process)
+                process.start()
+
+        total_processes = len(port_processes)
+        max_waiting_time = 2 * total_processes
+        over_processes = 0
+
+        while max_waiting_time > 0 and over_processes < total_processes:
+            # Check for errors
+            if not error_queue.empty():
+                print('Failure while shutting down: {}'.format(str(error_queue.get())))
+                over_processes += 1
+            try:
+                # Check for results
+                result = results_queue.get(timeout=1)
+                if result == "Command chain ran correctly.":
+                    over_processes += 1
+                    print('DEBUG: %s of %s tear-down port processes completed.' % (over_processes, total_processes))
+            except Empty:
+                max_waiting_time -= 1
+
+        error_queue = Queue()
+        results_queue = Queue()
+        shim_processes = []
 
         for shim in self.shims:
-            command_str += 'sudo ip link set %(br)s down\n' \
-                           'sudo brctl delbr %(br)s\n' \
-                           '\n' % {'br': shim.name}
-        return command_str
+            commands = []
+            commands += ('sudo ip link set %(br)s down\n'
+                         'sudo brctl delbr %(br)s'
+                         % {'br': shim.name}
+                         ).split('\n')
+            process = Process(target=self._run_command_chain, args=(commands, results_queue, error_queue))
+            shim_processes.append(process)
+            process.start()
 
-    def __del__(self):
-        script = self._make_down_script()
-        Popen([script], shell=True, executable="/bin/bash")
+        total_processes = len(shim_processes)
+        max_waiting_time = 2 * total_processes
+        over_processes = 0
+
+        while max_waiting_time > 0 and over_processes < total_processes:
+            # Check for errors
+            if not error_queue.empty():
+                print('Failure while shutting down: {}'.format(str(error_queue.get())))
+                over_processes += 1
+            try:
+                # Check for results
+                result = results_queue.get(timeout=1)
+                if result == "Command chain ran correctly.":
+                    over_processes += 1
+                    print('DEBUG: %s of %s tear-down shim processes completed.' % (over_processes, total_processes))
+            except Empty:
+                max_waiting_time -= 1
+
+        return
-- 
cgit v1.2.3


From 420802234226546208328269793d2afb8cc6e4ec Mon Sep 17 00:00:00 2001
From: Marco Capitani <m.capitani@nextworks.it>
Date: Thu, 6 Apr 2017 12:46:47 +0200
Subject: Raising exceptions, authentication through getpass

---
 rumba/testbeds/qemu.py | 27 +++++++++++++++++----------
 1 file changed, 17 insertions(+), 10 deletions(-)

diff --git a/rumba/testbeds/qemu.py b/rumba/testbeds/qemu.py
index 45e5cc0..7dae921 100644
--- a/rumba/testbeds/qemu.py
+++ b/rumba/testbeds/qemu.py
@@ -18,8 +18,10 @@
 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
 # MA  02110-1301  USA
 from Queue import Empty
+from getpass import getpass, getuser
 from multiprocessing import Process, Queue, cpu_count
 from os import geteuid
+from time import sleep
 
 import rumba.model as mod
 from subprocess import Popen, check_call, CalledProcessError
@@ -68,8 +70,15 @@ class Testbed(mod.Testbed):
         :param experiment: The experiment running
         """
         if geteuid() != 0:
-            print("ERROR: QEMU testbed requires root access: please use sudo.")
-            return  # TODO I'd rather raise an appropriate error...
+            pw = getpass('[sudo] password for {}:'.format(getuser()))
+            if '"' in pw or "'" in pw:
+                print('Illegal password: contains " or \'')
+                raise Exception('Not authenticated')
+            else:
+                try:
+                    check_call("sudo -v -p '{}'".format(pw).split())
+                except CalledProcessError:
+                    raise Exception('Not authenticated')
 
         print("[QEMU testbed] swapping in")
 
@@ -132,8 +141,9 @@ class Testbed(mod.Testbed):
         while max_waiting_time > 0 and over_processes < total_processes:
             # Check for errors
             if not e_queue.empty():
-                print('Testbed instantiation failed: {}'.format(str(e_queue.get())))
-                return  # TODO: again, I would prefer a specific exception
+                error_str = str(e_queue.get())
+                print('Testbed instantiation failed: {}'.format(error_str))
+                raise Exception('Failure: {}'.format(error_str))
             try:
                 # Check for results
                 result = r_queue.get(timeout=1)
@@ -206,9 +216,9 @@ class Testbed(mod.Testbed):
 
             booting_budget -= 1
             if booting_budget <= 0:
-                pass  # TODO manage the backoff time
-                # command_str += 'sleep %s\n' % boot_backoff
-                # booting_budget = boot_batch_size
+                print('Sleeping for {} seconds to give the machines time to boot up.'.format(boot_backoff))
+                sleep(boot_backoff)
+                booting_budget = boot_batch_size
 
             with open(self.qemu_folder + '/qemu_out{}'.format(vmid), 'w') as out_file:
                 print('DEBUG: executing >> {}'.format(command))
@@ -216,7 +226,6 @@ class Testbed(mod.Testbed):
                 pass
 
             vmid += 1
-        return
 
     def __del__(self):
         """
@@ -299,5 +308,3 @@ class Testbed(mod.Testbed):
                     print('DEBUG: %s of %s tear-down shim processes completed.' % (over_processes, total_processes))
             except Empty:
                 max_waiting_time -= 1
-
-        return
-- 
cgit v1.2.3