From c884a5b8c1e2a2f4d610cae7b9aa547b95424210 Mon Sep 17 00:00:00 2001 From: Marco Capitani Date: Thu, 16 Nov 2017 14:31:58 +0100 Subject: testbed-qemu: refactor swapout to use call_in_parallel minor: also cleaned up some commented old code --- rumba/prototypes/irati.py | 6 +- rumba/ssh_support.py | 74 ++++++--------- rumba/testbeds/qemu.py | 233 ++++++++++------------------------------------ 3 files changed, 80 insertions(+), 233 deletions(-) diff --git a/rumba/prototypes/irati.py b/rumba/prototypes/irati.py index c9faf36..82e56b9 100644 --- a/rumba/prototypes/irati.py +++ b/rumba/prototypes/irati.py @@ -194,7 +194,7 @@ class Experiment(mod.Experiment): def enroll_nodes(self): """Runs the enrollments one by one, respecting dependencies""" - logger.info("Waiting 5 seconds for the ipcm to start.") + logger.info("Starting enrollment phase.") time.sleep(5) for enrollment_list in self.enrollments: for e in enrollment_list: @@ -268,10 +268,6 @@ class Experiment(mod.Experiment): "difName": "%s" % (apm['dif']) }) - # TODO ask: I guess this will need to be added, - # and in that case we should add it to the qemu plugin too... - # Where should we take it in input? - if self.manager: # Add MAD/Manager configuration irati_templates.get_ipcmconf_base()["addons"] = { diff --git a/rumba/ssh_support.py b/rumba/ssh_support.py index 9990fc9..39dbf5f 100644 --- a/rumba/ssh_support.py +++ b/rumba/ssh_support.py @@ -165,50 +165,36 @@ def execute_commands(testbed, ssh_config, commands, time_out=3): ssh_config.proxy_client = proxy_client o = "" - try: - for command in commands: - logger.debug("%s@%s:%s >> %s" % (testbed.username, - ssh_config.hostname, - ssh_config.port, - command)) - envars = '. /etc/profile;' - command = envars + ' ' + command - chan = ssh_config.client.get_transport().open_session() - stdout = chan.makefile() - stderr = chan.makefile_stderr() - try: - chan.exec_command(command) - except paramiko.ssh_exception.SSHException as e: - raise SSHException('Failed to execute command') - o = _print_stream(stdout) - if chan.recv_exit_status() != 0: - # Get ready for printing stdout and stderr - if o != "": - list_print = ['**** STDOUT:'] - list_print += o.split('\\n') - else: - list_print = [] - e = _print_stream(stderr) - if e != "": - list_print.append('**** STDERR:') - list_print += e.split('\\n') - raise SSHException('A remote command returned an error. ' - 'Output:\n\n\t' + - '\n\t'.join(list_print) + '\n') - return o - finally: - ## - # The following lines are a fix to make this work under - # true multiprocessing. Right now we are using - # dummy multiprocessing (i.e. threading), so not needed. - # They have been kept here in case we want to get back to it. - ## - # ssh_config.client.close() - # if ssh_config.proxy_client is not None: - # ssh_config.proxy_client.close() - # ssh_config.client = None - # ssh_config.proxy_client = None - pass + for command in commands: + logger.debug("%s@%s:%s >> %s" % (testbed.username, + ssh_config.hostname, + ssh_config.port, + command)) + envars = '. /etc/profile;' + command = envars + ' ' + command + chan = ssh_config.client.get_transport().open_session() + stdout = chan.makefile() + stderr = chan.makefile_stderr() + try: + chan.exec_command(command) + except paramiko.ssh_exception.SSHException as e: + raise SSHException('Failed to execute command') + o = _print_stream(stdout) + if chan.recv_exit_status() != 0: + # Get ready for printing stdout and stderr + if o != "": + list_print = ['**** STDOUT:'] + list_print += o.split('\\n') + else: + list_print = [] + e = _print_stream(stderr) + if e != "": + list_print.append('**** STDERR:') + list_print += e.split('\\n') + raise SSHException('A remote command returned an error. ' + 'Output:\n\n\t' + + '\n\t'.join(list_print) + '\n') + return o def execute_command(testbed, ssh_config, command, time_out=3): """ diff --git a/rumba/testbeds/qemu.py b/rumba/testbeds/qemu.py index 530b4ac..a74fae0 100644 --- a/rumba/testbeds/qemu.py +++ b/rumba/testbeds/qemu.py @@ -24,27 +24,22 @@ # Foundation, Inc., http://www.fsf.org/about/contact/. # import multiprocessing as old_m -import multiprocessing.dummy as multiprocessing import os import subprocess import sys import time +from subprocess import CalledProcessError import rumba.model as mod import rumba.log as log import rumba.ssh_support as ssh_support import rumba.multiprocess as m_processing -from rumba.multiprocess import ProcessContextManager if sys.version_info[0] >= 3: from urllib.request import urlretrieve else: from urllib import urlretrieve -if sys.version_info[0] >= 3: - import contextlib -else: - import contextlib2 as contextlib logger = log.get_logger(__name__) @@ -248,73 +243,6 @@ class Testbed(mod.Testbed): m_processing.call_in_parallel(names, args, executors) - # self.multiproc_manager = multiprocessing.Manager() - # - # r_queue = self.multiproc_manager.Queue() - # e_queue = self.multiproc_manager.Queue() - # - # # This is a multi-context manager. It will close all processes - # # as soon as the interpreter exits the 'with' block - # with contextlib.ExitStack() as stack: - # total_processes = 0 - # for script in br_tab_scripts: - # # Avoid stacking processes if one failed before. - # if not e_queue.empty(): - # # No need to do anything, we will abort after the loop - # break - # # Launch commands asynchronously - # self.may_sudo(script) - # stack.enter_context(ProcessContextManager( - # target=self._run_command_chain, - # args=(script, r_queue, e_queue) - # )) - # total_processes += 1 - # - # # Wait for all processes to be over. - # max_waiting_time = 8 * total_processes - # over_processes = 0 - # - # # Max waiting is for deadlocks - # while over_processes < total_processes and max_waiting_time > 0: - # - # result = None - # try: - # # Check the results - # result = r_queue.get(timeout=1) - # except: # An error here can only be a timeout from the queue.get() - # max_waiting_time -= 1 - # - # if result is not None: - # if result == "Command chain ran correctly": - # over_processes += 1 - # logger.debug('%s/%s processes completed', - # over_processes, total_processes) - # elif result.startswith('Command chain ran with '): - # error_str = str(e_queue.get()) - # logger.error('Testbed instantiation failed: %s', error_str) - # raise Exception('Failure: %s' % error_str) - # # ^ This will exit the 'with' block, - # # closing all running processes. - # else: - # logger.error('Unexpected result: %s.', result) - # raise Exception('Unexpected result: %s' % result) - # # ^ This will exit the 'with' block, - # # closing all running processes. - # - # if max_waiting_time == 0: - # logger.error("Swap in is in deadlock, aborting.") - # raise Exception('Swap in is in deadlock') - # # ^ This will exit the 'with' block, - # # closing all running processes. - # - # del r_queue - # del e_queue - # self.multiproc_manager = None - # - # ## - # # End of the 'with' block - # ## - logger.info('Interfaces setup complete. ' 'Building VMs (this might take a while).') @@ -406,18 +334,14 @@ class Testbed(mod.Testbed): vmid += 1 # Wait for the last batch of VMs to start - if booting_budget < boot_backoff: + if booting_budget < boot_batch_size: tsleep = boot_backoff * (boot_batch_size - booting_budget) / \ boot_batch_size logger.debug('Sleeping %s secs ' - 'waiting for the last VMs to boot', - tsleep) + 'waiting for the last VMs to boot', + tsleep) time.sleep(tsleep) - # TODO: to be removed, we should loop in the ssh part - # logger.info('Sleeping 5 seconds, just to be on the safe side') - # time.sleep(5) - logger.info('All VMs are running. Moving on...') self.recover_if_names(experiment) @@ -440,119 +364,60 @@ class Testbed(mod.Testbed): logger.info('Destroying interfaces.') - self.multiproc_manager = multiprocessing.Manager() - results_queue = self.multiproc_manager.Queue() - error_queue = self.multiproc_manager.Queue() - - # See previous ExitStack usage for explanation - with contextlib.ExitStack() as stack: - total_processes = 0 - for vm_name, vm in self.vms.items(): - for port in vm['ports']: - tap = port['tap_id'] - shim = port['shim'] - - commands = [] - commands += ('brctl delif %(br)s %(tap)s\n' - 'ip link set dev %(tap)s down\n' - 'ip tuntap del mode tap name %(tap)s' - % {'tap': tap, 'br': shim.name} - ).split('\n') - self.may_sudo(commands) - stack.enter_context(ProcessContextManager( - target=self._run_command_chain, - args=(commands, results_queue, error_queue), - kwargs={'ignore_errors': True} - )) - total_processes += 1 - - max_waiting_time = 4 * total_processes - over_processes = 0 - - while max_waiting_time > 0 and over_processes < total_processes: + names = [] + args = [] + executors = [] - result = None + def executor(list_of_commands): + for cmd in list_of_commands: + logger.debug('executing >> %s', cmd) try: - # Check for results - result = results_queue.get(timeout=1) - except: - max_waiting_time -= 1 - - if result == "Command chain ran correctly": - over_processes += 1 - elif result.startswith('Command chain ran with '): - error_str = str(error_queue.get()) - logger.warning('Testbed tear-down failed: %s', error_str) - over_processes += 1 - else: - logger.warning('Testbed tear-down failed, ' - 'Unexpected result %s.', result) - over_processes += 1 - - logger.debug('%s/%s tear-down port ' - 'processes completed', - over_processes, total_processes) - ## - # End of the port 'with' block - ## - - del error_queue - del results_queue - self.multiproc_manager = None - - logger.info('Port tear-down complete.') + subprocess.check_call(cmd.split()) + except CalledProcessError as e: + logger.warning('Error during cleanup: %s', str(e)) - self.multiproc_manager = multiprocessing.Manager() - error_queue_2 = self.multiproc_manager.Queue() - results_queue_2 = self.multiproc_manager.Queue() + index = 0 + for vm_name, vm in self.vms.items(): + for port in vm['ports']: + tap = port['tap_id'] + shim = port['shim'] - # See previous ExitStack usage for explanation - with contextlib.ExitStack() as stack: - total_processes = 0 - for shim in self.shims: commands = [] - commands += ('ip link set dev %(br)s down\n' - 'brctl delbr %(br)s' - % {'br': shim.name} + commands += ('brctl delif %(br)s %(tap)s\n' + 'ip link set dev %(tap)s down\n' + 'ip tuntap del mode tap name %(tap)s' + % {'tap': tap, 'br': shim.name} ).split('\n') self.may_sudo(commands) - stack.enter_context(ProcessContextManager( - target=self._run_command_chain, - args=(commands, - results_queue_2, - error_queue_2), - kwargs={'ignore_errors': True} - )) - total_processes += 1 - max_waiting_time = 4 * total_processes - over_processes = 0 + names.append(index) + index += 1 + args.append(commands) + executors.append(executor) - while max_waiting_time > 0 and over_processes < total_processes: + m_processing.call_in_parallel(names, args, executors) - result = None - try: - # Check for results - result = results_queue_2.get(timeout=1) - except: - max_waiting_time -= 1 - - if result == "Command chain ran correctly": - over_processes += 1 - elif result.startswith('Command chain ran with '): - error_str = str(error_queue_2.get()) - logger.warning('Testbed tear-down failed: %s', error_str) - over_processes += 1 - else: - logger.warning('Testbed tear-down failed, ' - 'Unexpected result %s.', result) - over_processes += 1 - - logger.debug('%s/%s tear-down bridge ' - 'processes completed', - over_processes, total_processes) - ## - # End of the bridge 'with' block - ## + logger.info('Port tear-down complete. Destroying bridges.') + + names = [] + args = [] + executors = [] + + index = 0 + + for shim in self.shims: + commands = [] + commands += ('ip link set dev %(br)s down\n' + 'brctl delbr %(br)s' + % {'br': shim.name} + ).split('\n') + self.may_sudo(commands) + + names.append(index) + index += 1 + args.append(commands) + executors.append(executor) + + m_processing.call_in_parallel(names, args, executors) logger.info('Experiment has been swapped out.') -- cgit v1.2.3