diff options
author | Marco Capitani <m.capitani@nextworks.it> | 2017-11-07 11:40:54 +0100 |
---|---|---|
committer | Sander Vrijders <sander.vrijders@ugent.be> | 2017-11-15 18:59:11 +0100 |
commit | 2da15caf24a8b2da70d755e065a5dc3d770c6454 (patch) | |
tree | 292456db4acc4ab8f4afffad50113cc7e219b6cd /rumba/testbeds | |
parent | 2e91ca33f90f7c74887013e08c95bb00cdd4fc00 (diff) | |
download | rumba-2da15caf24a8b2da70d755e065a5dc3d770c6454.tar.gz rumba-2da15caf24a8b2da70d755e065a5dc3d770c6454.zip |
prototypes: Add parallel installation for IRATI
This adds parallel installation for IRATI. It also adds support for
multithread/multiprocess logging. Furthermore, prototype-agnostic
utilities for multiprocessing have been added. Caching of clients has
been re-enabled for the SSH connections.
Diffstat (limited to 'rumba/testbeds')
-rw-r--r-- | rumba/testbeds/jfed.py | 2 | ||||
-rw-r--r-- | rumba/testbeds/qemu.py | 373 |
2 files changed, 230 insertions, 145 deletions
diff --git a/rumba/testbeds/jfed.py b/rumba/testbeds/jfed.py index da93110..d6eb458 100644 --- a/rumba/testbeds/jfed.py +++ b/rumba/testbeds/jfed.py @@ -200,6 +200,8 @@ class Testbed(mod.Testbed): "-P", self.password]) def swap_in(self, experiment): + for node in experiment.nodes: + node.ssh_config.set_http_proxy(self.http_proxy) self.create_rspec(experiment) auth_name_r = self.auth_name.replace(".", "-") diff --git a/rumba/testbeds/qemu.py b/rumba/testbeds/qemu.py index e4a35ab..75a564c 100644 --- a/rumba/testbeds/qemu.py +++ b/rumba/testbeds/qemu.py @@ -22,21 +22,28 @@ # License along with this library; if not, write to the Free Software # Foundation, Inc., http://www.fsf.org/about/contact/. # - -import multiprocessing -import time -import subprocess +import multiprocessing as old_m +import multiprocessing.dummy as multiprocessing import os +import subprocess import sys +import time + +import rumba.model as mod +import rumba.log as log +import rumba.ssh_support as ssh_support +import rumba.multiprocessing_utils as m_processing +from rumba.multiprocessing_utils import ProcessContextManager if sys.version_info[0] >= 3: from urllib.request import urlretrieve else: from urllib import urlretrieve -import rumba.model as mod -import rumba.log as log -import rumba.ssh_support as ssh_support +if sys.version_info[0] >= 3: + import contextlib +else: + import contextlib2 as contextlib logger = log.get_logger(__name__) @@ -52,12 +59,13 @@ class Testbed(mod.Testbed): self.boot_processes = [] self.bzimage_path = bzimage_path self.initramfs_path = initramfs_path + self.multiproc_manager = None # Prepend sudo to all commands if the user is not 'root' def may_sudo(self, cmds): if os.geteuid() != 0: - for i in range(len(cmds)): - cmds[i] = "sudo %s" % cmds[i] + for i, cmd in enumerate(cmds): + cmds[i] = "sudo %s" % cmd @staticmethod def _run_command_chain(commands, results_queue, @@ -77,9 +85,11 @@ class Testbed(mod.Testbed): :return: None """ errors = 0 + # 
results_queue.cancel_join_thread() + # error_queue.cancel_join_thread() for command in commands: - if not error_queue.empty() and not ignore_errors: - break + # if not error_queue.empty() and not ignore_errors: + # break logger.debug('executing >> %s', command) try: subprocess.check_call(command.split()) @@ -88,12 +98,12 @@ class Testbed(mod.Testbed): errors += 1 if not ignore_errors: break - except KeyboardInterrupt as e: + except KeyboardInterrupt: error_queue.put('Interrupted') if errors == 0: results_queue.put("Command chain ran correctly") else: - results_queue.put("Command chain ran with %d errors" % errors) + results_queue.put("Command chain ran with %d error(s)" % errors) def recover_if_names(self, experiment): for node in experiment.nodes: @@ -161,10 +171,8 @@ class Testbed(mod.Testbed): logger.info('Setting up interfaces.') - # Building bridges and taps - shim_processes = [] - r_queue = multiprocessing.Queue() - e_queue = multiprocessing.Queue() + # Building bridges and taps initialization scripts + br_tab_scripts = [] for shim in experiment.dif_ordering: if not isinstance(shim, mod.ShimEthDIF): # Nothing to do here @@ -201,6 +209,7 @@ class Testbed(mod.Testbed): % {'tap': tap_id, 'speed': speed} ).split('\n') + # While we're at it, build vm ports table and ipcp table vm['ports'].append({'tap_id': tap_id, 'shim': shim, 'port_id': port_id}) @@ -216,63 +225,101 @@ class Testbed(mod.Testbed): ipcp.ifname = tap_id # TODO deal with Ip address (shim UDP DIF). - # Avoid stacking processes if one failed before. - if not e_queue.empty(): - break - # Launch commands asynchronously - self.may_sudo(command_list) - process = multiprocessing.Process(target=self._run_command_chain, - args=(command_list, - r_queue, - e_queue)) - shim_processes.append(process) - process.start() - - # Wait for all processes to be over. 
- total_processes = len(shim_processes) - max_waiting_time = 4 * total_processes - over_processes = 0 - - while over_processes < total_processes and max_waiting_time > 0: - # Check for deadlock - - # Check for errors - if not e_queue.empty(): - error_str = str(e_queue.get()) - logger.error('Testbed instantiation failed: %s', error_str) - - # Wait for the running processes to quit before swapping out - for process in shim_processes: - process.join() - - raise Exception('Failure: %s' % error_str) - try: - # Check for results - result = r_queue.get(timeout=1) - if result == "Command chain ran correctly": - over_processes += 1 - logger.debug('%s/%s processes completed', - over_processes, total_processes) - except: - max_waiting_time -= 1 - - if max_waiting_time == 0: - logger.error("Swap in is in deadlock, aborting.") - for process in shim_processes: - process.terminate() - time.sleep(0.1) - process.join() - raise Exception('Swap in is in deadlock') - - for process in shim_processes: - process.join() + br_tab_scripts.append(command_list) + ## + # End of shim/node parsing block + ## + # + + def executor(list_of_commands): + for cmd in list_of_commands: + logger.debug('executing >> %s', cmd) + subprocess.check_call(cmd.split()) + + names = [] + args = [] + executors = [] + for i, script in enumerate(br_tab_scripts): + names.append(i) + self.may_sudo(script) + args.append(script) + executors.append(executor) + + m_processing.call_in_parallel(names, args, executors) + + # self.multiproc_manager = multiprocessing.Manager() + # + # r_queue = self.multiproc_manager.Queue() + # e_queue = self.multiproc_manager.Queue() + # + # # This is a multi-context manager. It will close all processes + # # as soon as the interpreter exits the 'with' block + # with contextlib.ExitStack() as stack: + # total_processes = 0 + # for script in br_tab_scripts: + # # Avoid stacking processes if one failed before. 
+ # if not e_queue.empty(): + # # No need to do anything, we will abort after the loop + # break + # # Launch commands asynchronously + # self.may_sudo(script) + # stack.enter_context(ProcessContextManager( + # target=self._run_command_chain, + # args=(script, r_queue, e_queue) + # )) + # total_processes += 1 + # + # # Wait for all processes to be over. + # max_waiting_time = 8 * total_processes + # over_processes = 0 + # + # # Max waiting is for deadlocks + # while over_processes < total_processes and max_waiting_time > 0: + # + # result = None + # try: + # # Check the results + # result = r_queue.get(timeout=1) + # except: # An error here can only be a timeout from the queue.get() + # max_waiting_time -= 1 + # + # if result is not None: + # if result == "Command chain ran correctly": + # over_processes += 1 + # logger.debug('%s/%s processes completed', + # over_processes, total_processes) + # elif result.startswith('Command chain ran with '): + # error_str = str(e_queue.get()) + # logger.error('Testbed instantiation failed: %s', error_str) + # raise Exception('Failure: %s' % error_str) + # # ^ This will exit the 'with' block, + # # closing all running processes. + # else: + # logger.error('Unexpected result: %s.', result) + # raise Exception('Unexpected result: %s' % result) + # # ^ This will exit the 'with' block, + # # closing all running processes. + # + # if max_waiting_time == 0: + # logger.error("Swap in is in deadlock, aborting.") + # raise Exception('Swap in is in deadlock') + # # ^ This will exit the 'with' block, + # # closing all running processes. + # + # del r_queue + # del e_queue + # self.multiproc_manager = None + # + # ## + # # End of the 'with' block + # ## logger.info('Interfaces setup complete. 
' 'Building VMs (this might take a while).') # Building vms - boot_batch_size = max(1, multiprocessing.cpu_count() // 2) + boot_batch_size = max(1, old_m.cpu_count() // 2) booting_budget = boot_batch_size boot_backoff = 12 # in seconds base_port = 2222 @@ -361,14 +408,16 @@ class Testbed(mod.Testbed): if booting_budget < boot_backoff: tsleep = boot_backoff * (boot_batch_size - booting_budget) / \ boot_batch_size - logger.info('Sleeping %s secs ' + logger.debug('Sleeping %s secs ' 'waiting for the last VMs to boot', tsleep) time.sleep(tsleep) # TODO: to be removed, we should loop in the ssh part - logger.info('Sleeping 5 seconds, just to be on the safe side') - time.sleep(5) + # logger.info('Sleeping 5 seconds, just to be on the safe side') + # time.sleep(5) + + logger.info('All VMs are running. Moving on...') self.recover_if_names(experiment) @@ -389,86 +438,120 @@ class Testbed(mod.Testbed): process.wait() logger.info('Destroying interfaces.') - port_processes = [] - error_queue = multiprocessing.Queue() - results_queue = multiprocessing.Queue() - for vm_name, vm in self.vms.items(): - for port in vm['ports']: - tap = port['tap_id'] - shim = port['shim'] + self.multiproc_manager = multiprocessing.Manager() + results_queue = self.multiproc_manager.Queue() + error_queue = self.multiproc_manager.Queue() + + # See previous ExitStack usage for explanation + with contextlib.ExitStack() as stack: + total_processes = 0 + for vm_name, vm in self.vms.items(): + for port in vm['ports']: + tap = port['tap_id'] + shim = port['shim'] + + commands = [] + commands += ('brctl delif %(br)s %(tap)s\n' + 'ip link set dev %(tap)s down\n' + 'ip tuntap del mode tap name %(tap)s' + % {'tap': tap, 'br': shim.name} + ).split('\n') + self.may_sudo(commands) + stack.enter_context(ProcessContextManager( + target=self._run_command_chain, + args=(commands, results_queue, error_queue), + kwargs={'ignore_errors': True} + )) + total_processes += 1 + + max_waiting_time = 4 * total_processes + 
over_processes = 0 + + while max_waiting_time > 0 and over_processes < total_processes: + + result = None + try: + # Check for results + result = results_queue.get(timeout=1) + except: + max_waiting_time -= 1 + + if result == "Command chain ran correctly": + over_processes += 1 + elif result.startswith('Command chain ran with '): + error_str = str(error_queue.get()) + logger.warning('Testbed tear-down failed: %s', error_str) + over_processes += 1 + else: + logger.warning('Testbed tear-down failed, ' + 'Unexpected result %s.', result) + over_processes += 1 + + logger.debug('%s/%s tear-down port ' + 'processes completed', + over_processes, total_processes) + ## + # End of the port 'with' block + ## + + del error_queue + del results_queue + self.multiproc_manager = None + + logger.info('Port tear-down complete.') + + self.multiproc_manager = multiprocessing.Manager() + error_queue_2 = self.multiproc_manager.Queue() + results_queue_2 = self.multiproc_manager.Queue() + + # See previous ExitStack usage for explanation + with contextlib.ExitStack() as stack: + total_processes = 0 + for shim in self.shims: commands = [] - commands += ('brctl delif %(br)s %(tap)s\n' - 'ip link set dev %(tap)s down\n' - 'ip tuntap del mode tap name %(tap)s' - % {'tap': tap, 'br': shim.name} + commands += ('ip link set dev %(br)s down\n' + 'brctl delbr %(br)s' + % {'br': shim.name} ).split('\n') self.may_sudo(commands) - process = multiprocessing.Process( + stack.enter_context(ProcessContextManager( target=self._run_command_chain, - args=(commands, results_queue, error_queue), - kwargs={'ignore_errors': True}) - port_processes.append(process) - process.start() - - total_processes = len(port_processes) - max_waiting_time = 2 * total_processes - over_processes = 0 - - while max_waiting_time > 0 and over_processes < total_processes: - # Check for errors - if not error_queue.empty(): - logger.warning('Failure while shutting down: %s', - str(error_queue.get())) - over_processes += 1 - try: - # 
Check for results - result = results_queue.get(timeout=1) + args=(commands, + results_queue_2, + error_queue_2), + kwargs={'ignore_errors': True} + )) + total_processes += 1 + + max_waiting_time = 4 * total_processes + over_processes = 0 + + while max_waiting_time > 0 and over_processes < total_processes: + + result = None + try: + # Check for results + result = results_queue_2.get(timeout=1) + except: + max_waiting_time -= 1 + if result == "Command chain ran correctly": over_processes += 1 - logger.debug('%s/%s tear-down port ' - 'processes completed', - over_processes, total_processes) - except: - max_waiting_time -= 1 - - error_queue = multiprocessing.Queue() - results_queue = multiprocessing.Queue() - shim_processes = [] - - for shim in self.shims: - commands = [] - commands += ('ip link set dev %(br)s down\n' - 'brctl delbr %(br)s' - % {'br': shim.name} - ).split('\n') - self.may_sudo(commands) - process = multiprocessing.Process(target=self._run_command_chain, - args=(commands, - results_queue, - error_queue), - kwargs={'ignore_errors': True}) - shim_processes.append(process) - process.start() - - total_processes = len(shim_processes) - max_waiting_time = 2 * total_processes - over_processes = 0 - - while max_waiting_time > 0 and over_processes < total_processes: - # Check for errors - if not error_queue.empty(): - logger.warning('Failure while shutting down: %s' - % str(error_queue.get())) - over_processes += 1 - try: - # Check for results - result = results_queue.get(timeout=1) - if result == "Command chain ran correctly": + elif result.startswith('Command chain ran with '): + error_str = str(error_queue_2.get()) + logger.warning('Testbed tear-down failed: %s', error_str) + over_processes += 1 + else: + logger.warning('Testbed tear-down failed, ' + 'Unexpected result %s.', result) over_processes += 1 - logger.debug('%s/%s tear-down shim ' - 'processes completed' - % (over_processes, total_processes)) - except: - max_waiting_time -= 1 + + logger.debug('%s/%s 
tear-down bridge ' + 'processes completed', + over_processes, total_processes) + ## + # End of the bridge 'with' block + ## + logger.info('Experiment has been swapped out.') |