From c884a5b8c1e2a2f4d610cae7b9aa547b95424210 Mon Sep 17 00:00:00 2001
From: Marco Capitani
Date: Thu, 16 Nov 2017 14:31:58 +0100
Subject: testbed-qemu: refactor swapout to use call_in_parallel

minor: also cleaned up some commented old code
---
 rumba/testbeds/qemu.py | 233 +++++++++++--------------------------------------
 1 file changed, 49 insertions(+), 184 deletions(-)

(limited to 'rumba/testbeds')

diff --git a/rumba/testbeds/qemu.py b/rumba/testbeds/qemu.py
index 530b4ac..a74fae0 100644
--- a/rumba/testbeds/qemu.py
+++ b/rumba/testbeds/qemu.py
@@ -24,27 +24,22 @@
 # Foundation, Inc., http://www.fsf.org/about/contact/.
 #
 import multiprocessing as old_m
-import multiprocessing.dummy as multiprocessing
 import os
 import subprocess
 import sys
 import time
+from subprocess import CalledProcessError
 
 import rumba.model as mod
 import rumba.log as log
 import rumba.ssh_support as ssh_support
 import rumba.multiprocess as m_processing
-from rumba.multiprocess import ProcessContextManager
 
 if sys.version_info[0] >= 3:
     from urllib.request import urlretrieve
 else:
     from urllib import urlretrieve
 
-if sys.version_info[0] >= 3:
-    import contextlib
-else:
-    import contextlib2 as contextlib
 
 logger = log.get_logger(__name__)
 
@@ -248,73 +243,6 @@ class Testbed(mod.Testbed):
 
         m_processing.call_in_parallel(names, args, executors)
 
-        # self.multiproc_manager = multiprocessing.Manager()
-        #
-        # r_queue = self.multiproc_manager.Queue()
-        # e_queue = self.multiproc_manager.Queue()
-        #
-        # # This is a multi-context manager. It will close all processes
-        # # as soon as the interpreter exits the 'with' block
-        # with contextlib.ExitStack() as stack:
-        #     total_processes = 0
-        #     for script in br_tab_scripts:
-        #         # Avoid stacking processes if one failed before.
-        #         if not e_queue.empty():
-        #             # No need to do anything, we will abort after the loop
-        #             break
-        #         # Launch commands asynchronously
-        #         self.may_sudo(script)
-        #         stack.enter_context(ProcessContextManager(
-        #             target=self._run_command_chain,
-        #             args=(script, r_queue, e_queue)
-        #         ))
-        #         total_processes += 1
-        #
-        #     # Wait for all processes to be over.
-        #     max_waiting_time = 8 * total_processes
-        #     over_processes = 0
-        #
-        #     # Max waiting is for deadlocks
-        #     while over_processes < total_processes and max_waiting_time > 0:
-        #
-        #         result = None
-        #         try:
-        #             # Check the results
-        #             result = r_queue.get(timeout=1)
-        #         except:  # An error here can only be a timeout from the queue.get()
-        #             max_waiting_time -= 1
-        #
-        #         if result is not None:
-        #             if result == "Command chain ran correctly":
-        #                 over_processes += 1
-        #                 logger.debug('%s/%s processes completed',
-        #                              over_processes, total_processes)
-        #             elif result.startswith('Command chain ran with '):
-        #                 error_str = str(e_queue.get())
-        #                 logger.error('Testbed instantiation failed: %s', error_str)
-        #                 raise Exception('Failure: %s' % error_str)
-        #                 # ^ This will exit the 'with' block,
-        #                 # closing all running processes.
-        #             else:
-        #                 logger.error('Unexpected result: %s.', result)
-        #                 raise Exception('Unexpected result: %s' % result)
-        #                 # ^ This will exit the 'with' block,
-        #                 # closing all running processes.
-        #
-        #     if max_waiting_time == 0:
-        #         logger.error("Swap in is in deadlock, aborting.")
-        #         raise Exception('Swap in is in deadlock')
-        #         # ^ This will exit the 'with' block,
-        #         # closing all running processes.
-        #
-        # del r_queue
-        # del e_queue
-        # self.multiproc_manager = None
-        #
-        # ##
-        # # End of the 'with' block
-        # ##
-
         logger.info('Interfaces setup complete. '
                     'Building VMs (this might take a while).')
 
@@ -406,18 +334,14 @@ class Testbed(mod.Testbed):
             vmid += 1
 
         # Wait for the last batch of VMs to start
-        if booting_budget < boot_backoff:
+        if booting_budget < boot_batch_size:
             tsleep = boot_backoff * (boot_batch_size - booting_budget) / \
                 boot_batch_size
             logger.debug('Sleeping %s secs '
-                        'waiting for the last VMs to boot',
-                        tsleep)
+                         'waiting for the last VMs to boot',
+                         tsleep)
             time.sleep(tsleep)
 
-        # TODO: to be removed, we should loop in the ssh part
-        # logger.info('Sleeping 5 seconds, just to be on the safe side')
-        # time.sleep(5)
-
         logger.info('All VMs are running. Moving on...')
 
         self.recover_if_names(experiment)
@@ -440,119 +364,60 @@ class Testbed(mod.Testbed):
 
         logger.info('Destroying interfaces.')
 
-        self.multiproc_manager = multiprocessing.Manager()
-        results_queue = self.multiproc_manager.Queue()
-        error_queue = self.multiproc_manager.Queue()
-
-        # See previous ExitStack usage for explanation
-        with contextlib.ExitStack() as stack:
-            total_processes = 0
-            for vm_name, vm in self.vms.items():
-                for port in vm['ports']:
-                    tap = port['tap_id']
-                    shim = port['shim']
-
-                    commands = []
-                    commands += ('brctl delif %(br)s %(tap)s\n'
-                                 'ip link set dev %(tap)s down\n'
-                                 'ip tuntap del mode tap name %(tap)s'
-                                 % {'tap': tap, 'br': shim.name}
-                                 ).split('\n')
-                    self.may_sudo(commands)
-                    stack.enter_context(ProcessContextManager(
-                        target=self._run_command_chain,
-                        args=(commands, results_queue, error_queue),
-                        kwargs={'ignore_errors': True}
-                    ))
-                    total_processes += 1
-
-            max_waiting_time = 4 * total_processes
-            over_processes = 0
-
-            while max_waiting_time > 0 and over_processes < total_processes:
+        names = []
+        args = []
+        executors = []
 
-                result = None
+        def executor(list_of_commands):
+            for cmd in list_of_commands:
+                logger.debug('executing >> %s', cmd)
                 try:
-                    # Check for results
-                    result = results_queue.get(timeout=1)
-                except:
-                    max_waiting_time -= 1
-
-                if result == "Command chain ran correctly":
-                    over_processes += 1
-                elif result.startswith('Command chain ran with '):
-                    error_str = str(error_queue.get())
-                    logger.warning('Testbed tear-down failed: %s', error_str)
-                    over_processes += 1
-                else:
-                    logger.warning('Testbed tear-down failed, '
-                                   'Unexpected result %s.', result)
-                    over_processes += 1
-
-                logger.debug('%s/%s tear-down port '
-                             'processes completed',
-                             over_processes, total_processes)
-        ##
-        # End of the port 'with' block
-        ##
-
-        del error_queue
-        del results_queue
-        self.multiproc_manager = None
-
-        logger.info('Port tear-down complete.')
+                    subprocess.check_call(cmd.split())
+                except CalledProcessError as e:
+                    logger.warning('Error during cleanup: %s', str(e))
 
-        self.multiproc_manager = multiprocessing.Manager()
-        error_queue_2 = self.multiproc_manager.Queue()
-        results_queue_2 = self.multiproc_manager.Queue()
+        index = 0
+        for vm_name, vm in self.vms.items():
+            for port in vm['ports']:
+                tap = port['tap_id']
+                shim = port['shim']
 
-        # See previous ExitStack usage for explanation
-        with contextlib.ExitStack() as stack:
-            total_processes = 0
-            for shim in self.shims:
                 commands = []
-                commands += ('ip link set dev %(br)s down\n'
-                             'brctl delbr %(br)s'
-                             % {'br': shim.name}
+                commands += ('brctl delif %(br)s %(tap)s\n'
+                             'ip link set dev %(tap)s down\n'
+                             'ip tuntap del mode tap name %(tap)s'
+                             % {'tap': tap, 'br': shim.name}
                              ).split('\n')
                 self.may_sudo(commands)
-                stack.enter_context(ProcessContextManager(
-                    target=self._run_command_chain,
-                    args=(commands,
-                          results_queue_2,
-                          error_queue_2),
-                    kwargs={'ignore_errors': True}
-                ))
-                total_processes += 1
 
-            max_waiting_time = 4 * total_processes
-            over_processes = 0
+                names.append(index)
+                index += 1
+                args.append(commands)
+                executors.append(executor)
 
-            while max_waiting_time > 0 and over_processes < total_processes:
+        m_processing.call_in_parallel(names, args, executors)
 
-                result = None
-                try:
-                    # Check for results
-                    result = results_queue_2.get(timeout=1)
-                except:
-                    max_waiting_time -= 1
-
-                if result == "Command chain ran correctly":
-                    over_processes += 1
-                elif result.startswith('Command chain ran with '):
-                    error_str = str(error_queue_2.get())
-                    logger.warning('Testbed tear-down failed: %s', error_str)
-                    over_processes += 1
-                else:
-                    logger.warning('Testbed tear-down failed, '
-                                   'Unexpected result %s.', result)
-                    over_processes += 1
-
-                logger.debug('%s/%s tear-down bridge '
-                             'processes completed',
-                             over_processes, total_processes)
-        ##
-        # End of the bridge 'with' block
-        ##
+        logger.info('Port tear-down complete. Destroying bridges.')
+
+        names = []
+        args = []
+        executors = []
+
+        index = 0
+
+        for shim in self.shims:
+            commands = []
+            commands += ('ip link set dev %(br)s down\n'
+                         'brctl delbr %(br)s'
+                         % {'br': shim.name}
+                         ).split('\n')
+            self.may_sudo(commands)
+
+            names.append(index)
+            index += 1
+            args.append(commands)
+            executors.append(executor)
+
+        m_processing.call_in_parallel(names, args, executors)
 
         logger.info('Experiment has been swapped out.')
--
cgit v1.2.3
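
The pattern the patch adopts is a parallel fan-out: one (name, command list, executor) triple per tap or bridge, followed by a single call_in_parallel over the three lists. The following stand-in is only a minimal sketch of that calling convention using the standard library; it assumes call_in_parallel runs executors[i](args[i]) concurrently with names[i] used only for identification, and it is not rumba's actual implementation (the thread-pool choice, the sketch's function names, and the demo commands are all illustrative).

    # Illustrative stand-in for rumba.multiprocess.call_in_parallel.
    # Assumption: the real helper runs executors[i](args[i]) concurrently,
    # using names[i] only to identify the task.
    import subprocess
    from subprocess import CalledProcessError
    from concurrent.futures import ThreadPoolExecutor


    def call_in_parallel_sketch(names, args, executors):
        with ThreadPoolExecutor(max_workers=max(len(names), 1)) as pool:
            futures = {pool.submit(ex, arg): name
                       for name, arg, ex in zip(names, args, executors)}
            for future, name in futures.items():
                try:
                    future.result()  # re-raise anything the executor leaked
                except Exception as e:
                    print('task %s failed: %s' % (name, e))


    def executor(list_of_commands):
        # Same shape as the executor in the patch: run each command and
        # warn (here: print) on failure instead of aborting the tear-down.
        for cmd in list_of_commands:
            try:
                subprocess.check_call(cmd.split())
            except CalledProcessError as e:
                print('Error during cleanup: %s' % e)


    if __name__ == '__main__':
        taps = ['tap0', 'tap1']  # hypothetical interface names
        names = list(range(len(taps)))
        args = [['echo tearing down %s' % t] for t in taps]
        executors = [executor] * len(taps)
        call_in_parallel_sketch(names, args, executors)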