From 2da15caf24a8b2da70d755e065a5dc3d770c6454 Mon Sep 17 00:00:00 2001 From: Marco Capitani Date: Tue, 7 Nov 2017 11:40:54 +0100 Subject: prototypes: Add parallel installation for IRATI This adds parallel installation for IRATI, it also adds support for multithread/multiprocess logging. Furthermore prototype-agnostic utilities for multiprocessing have been added. Caching of clients has been re-enabled for the ssh connections. --- rumba/log.py | 142 +++++++++++++++- rumba/model.py | 27 +-- rumba/multiprocessing_utils.py | 106 ++++++++++++ rumba/prototypes/irati.py | 25 ++- rumba/ssh_support.py | 34 +++- rumba/testbeds/jfed.py | 2 + rumba/testbeds/qemu.py | 373 +++++++++++++++++++++++++---------------- setup.py | 2 +- 8 files changed, 531 insertions(+), 180 deletions(-) create mode 100644 rumba/multiprocessing_utils.py diff --git a/rumba/log.py b/rumba/log.py index c7afbd4..6eb2137 100644 --- a/rumba/log.py +++ b/rumba/log.py @@ -24,10 +24,9 @@ # import logging - -import sys - +import logging.handlers import multiprocessing +import sys DEBUG = logging.DEBUG @@ -40,6 +39,130 @@ CRITICAL = logging.CRITICAL loggers_set = set() +mq = multiprocessing.Queue() + + +try: + from logging.handlers import QueueHandler +except ImportError: + # We are in python2 code + class QueueHandler(logging.Handler): + """ + This handler sends events to a queue. Typically, it would be used + together with a multiprocessing Queue to centralise logging to file + in one process (in a multi-process application), so as to avoid file + write contention between processes. + + This code is new in Python 3.2, but this class can be copy pasted into + user code for use with earlier Python versions. + """ + + # Copy-pasted as per above docstring from logging + + def __init__(self, queue): + logging.Handler.__init__(self) + self.queue = queue + + def enqueue(self, record): + self.queue.put_nowait(record) + + def prepare(self, record): + self.format(record) + record.msg = record.message + record.args = None + record.exc_info = None + return record + + def emit(self, record): + try: + self.enqueue(self.prepare(record)) + except Exception: + self.handleError(record) + +try: + from logging.handlers import QueueListener +except ImportError: + # We are in python2 code + import threading + try: + import Queue + except ImportError: + # Make it pythonX with 3.0 <= X <3.2 + import queue as Queue + + class QueueListener(object): + """ + This class implements an internal threaded listener which watches for + LogRecords being added to a queue, removes them and passes them to a + list of handlers for processing. 
+ """ + + # Also copy-pasted + _sentinel = None + + def __init__(self, queue, respect_handler_level=False, *handlers): + self.queue = queue + self.handlers = handlers + self._stop = threading.Event() + self._thread = None + self.respect_handler_level = respect_handler_level + + def dequeue(self, block): + return self.queue.get(block) + + def start(self): + self._thread = t = threading.Thread(target=self._monitor) + t.setDaemon(True) + t.start() + + def prepare(self , record): + return record + + def handle(self, record): + record = self.prepare(record) + for handler in self.handlers: + if not self.respect_handler_level: + process = True + else: + process = record.levelno >= handler.level + if process: + handler.handle(record) + + def _monitor(self): + q = self.queue + has_task_done = hasattr(q, 'task_done') + while not self._stop.isSet(): + try: + record = self.dequeue(True) + if record is self._sentinel: + break + self.handle(record) + if has_task_done: + q.task_done() + except Queue.Empty: + pass + # There might still be records in the queue. + while True: + try: + record = self.dequeue(False) + if record is self._sentinel: + break + self.handle(record) + if has_task_done: + q.task_done() + except Queue.Empty: + break + + def enqueue_sentinel(self): + self.queue.put_nowait(self._sentinel) + + def stop(self): + self._stop.set() + self.enqueue_sentinel() + self._thread.join() + self._thread = None + + class RumbaFormatter(logging.Formatter): """The logging.Formatter subclass used by Rumba""" @@ -65,14 +188,19 @@ class RumbaFormatter(logging.Formatter): def setup(): """Configures the logging framework with default values.""" + global mq + queue_handler = QueueHandler(mq) + queue_handler.setLevel(logging.DEBUG) + logging.basicConfig(handlers=[queue_handler], level=logging.DEBUG) + logging.getLogger('').setLevel(logging.ERROR) + logging.getLogger('rumba').setLevel(logging.INFO) + handler = logging.StreamHandler(sys.stdout) - handler.lock = multiprocessing.RLock() handler.setLevel(logging.DEBUG) formatter = RumbaFormatter() handler.setFormatter(formatter) - logging.basicConfig(handlers=[handler], level=logging.DEBUG) - logging.getLogger('').setLevel(logging.ERROR) - logging.getLogger('rumba').setLevel(logging.INFO) + listener = QueueListener(mq, handler) + listener.start() # Used for the first call, in order to configure logging diff --git a/rumba/model.py b/rumba/model.py index 4e2591a..6ba93b0 100644 --- a/rumba/model.py +++ b/rumba/model.py @@ -200,6 +200,7 @@ class SSHConfig: self.proxy_server = proxy_server self.client = None self.proxy_client = None + self.http_proxy = None def set_username(self, username): self.username = username @@ -207,6 +208,9 @@ class SSHConfig: def set_password(self, password): self.password = password + def set_http_proxy(self, proxy): + self.http_proxy = proxy + # A node in the experiment # @@ -534,8 +538,8 @@ class Experiment: self.dt_strategy = 'full-mesh' # 'minimal', 'manual' self.dif_ordering = [] self.enrollments = [] # a list of per-DIF lists of enrollments - self.dt_flows = [] # a list of per-DIF lists of data transfer flows - self.mgmt_flows = [] # a list of per-DIF lists of management flows + self.dt_flows = [] # a list of per-DIF lists of data transfer flows + self.mgmt_flows = [] # a list of per-DIF lists of management flows # Generate missing information self.generate() @@ -881,8 +885,8 @@ class ClientProcess(Client): try: self.pid = self.node.execute_command(cmd) except SSHException: - logger.warn('Could not start client %s on node %s.', - self.ap, 
node.name) + logger.warning('Could not start client %s on node %s.', + self.ap, node.name) logger.debug('Client app %s on node %s got pid %s.', self.ap, self.node.name, self.pid) @@ -894,8 +898,8 @@ class ClientProcess(Client): try: self.node.execute_command("kill %s" % self.pid) except SSHException: - logger.warn('Could not kill client %s on node %s.', - self.ap, self.node.name) + logger.warning('Could not kill client %s on node %s.', + self.ap, self.node.name) def kill_check(self): """Check if the process should keep running, stop it if not, @@ -957,8 +961,7 @@ class Server: """ number = poisson(self.arrival_rate * interval) number = int(min(number, self.max_clients)) - l = [self.make_client_process() for _ in range(number)] - return l + return [self.make_client_process() for _ in range(number)] def make_client_process(self): """Returns a client of this server""" @@ -983,8 +986,8 @@ class Server: try: self.pids[node] = (node.execute_commands(cmds)) except SSHException: - logger.warn('Could not start server %s on node %s.', - self.ap, node.name) + logger.warning('Could not start server %s on node %s.', + self.ap, node.name) def stop(self): for node, pid in self.pids.items(): @@ -995,8 +998,8 @@ class Server: try: node.execute_command("kill %s" % pid) except SSHException: - logger.warn('Could not kill server %s on node %s.', - self.ap, node.name) + logger.warning('Could not kill server %s on node %s.', + self.ap, node.name) # Base class for ARCFIRE storyboards diff --git a/rumba/multiprocessing_utils.py b/rumba/multiprocessing_utils.py new file mode 100644 index 0000000..ce5dd5c --- /dev/null +++ b/rumba/multiprocessing_utils.py @@ -0,0 +1,106 @@ +import multiprocessing.dummy as multiprocessing +import sys + +import rumba.log as log + +if sys.version_info[0] >= 3: + import contextlib +else: + import contextlib2 as contextlib + + +logger = log.get_logger(__name__) + + +def call_in_parallel(name_list, argument_list, executor_list): + """ + Calls each executor in executor_list with the corresponding + argument in argument_list + + Assumes that the three lists are the same length, will fail otherwise. + Is equivalent to + for i, e in enumerate(executor_list): + e(argument_list[i]) + but all calls will be executed in parallel. + + If successful, no output will be given. Otherwise, this will raise + the exception raised by one failed call at random. + + :param name_list: list of names of the executors (used for logging) + :param argument_list: list of arguments to the executors + :param executor_list: list of executors (as functions) + """ + if len(executor_list) != len(name_list) \ + or len(executor_list) != len(argument_list): + raise ValueError("Names, arguments and executors lists " + "must have the same length") + + def job(executor, name, m_queue, argument): + try: + # m_queue.cancel_join_thread() + logger.debug('Starting process "%s".' + % (name,)) + executor(argument) + m_queue.put("DONE") + except BaseException as e: + logger.error('Setup failed. %s: %s', + type(e).__name__, + str(e)) + m_queue.put(e) + + logger.debug('About to start spawning processes.') + queue = multiprocessing.Queue() + with contextlib.ExitStack() as stack: + # This is a composite context manager. + # After exiting the 'with' block, the __exit__ method of all + # processes that have been registered will be called. 
+ msg_to_be_read = 0 + for i, e in enumerate(executor_list): + stack.enter_context(ProcessContextManager( + target=job, + args=(e, name_list[i], queue, argument_list[i]) + )) + msg_to_be_read += 1 + results = [] + for _ in range(len(executor_list)): + result = queue.get() # This blocks until results are available + msg_to_be_read -= 1 + results.append(result) + for result in results: + if result != "DONE": + raise result + # If we get here, we got a success for every node, hence we are done. + + +class ProcessContextManager(object): + + def __init__(self, target, args=None, kwargs=None): + if args is None: + args = () + if kwargs is None: + kwargs = {} + self.process = multiprocessing.Process( + target=target, + args=tuple(args), + kwargs=kwargs + ) + + def __enter__(self): + self.process.start() + return self.process + + def __exit__(self, exc_type, exc_val, exc_tb): + if exc_tb is not None or exc_val is not None or exc_tb is not None: + logger.error('Subprocess error: %s.' % (type(exc_val).__name__,)) + try: + self.process.terminate() + self.process.join() + except AttributeError: + # We are using multiprocessing.dummy, so no termination. + # We trust the threads will die with the application + # (since we are shutting down anyway) + pass + return False + else: + self.process.join() + return True diff --git a/rumba/prototypes/irati.py b/rumba/prototypes/irati.py index 3a64ede..0f2d69e 100644 --- a/rumba/prototypes/irati.py +++ b/rumba/prototypes/irati.py @@ -22,25 +22,30 @@ # License along with this library; if not, write to the Free Software # Foundation, Inc., http://www.fsf.org/about/contact/. # - import copy import json - import os import time import rumba.ssh_support as ssh import rumba.model as mod +import rumba.multiprocessing_utils as m_processing import rumba.prototypes.irati_templates as irati_templates import rumba.log as log - logger = log.get_logger(__name__) # An experiment over the IRATI implementation class Experiment(mod.Experiment): + @staticmethod + def make_executor(node, packages, testbed): + def executor(commands): + ssh.aptitude_install(testbed, node, packages) + node.execute_commands(commands, time_out=None, use_proxy=True) + return executor + def prototype_name(self): return 'irati' @@ -90,11 +95,17 @@ class Experiment(mod.Experiment): "cd ~; git clone -b arcfire https://github.com/IRATI/stack", "cd ~/stack && " + self.sudo("./configure && ") + self.sudo("make install")] - + names = [] + executors = [] + args = [] for node in self.nodes: - ssh.aptitude_install(self.testbed, node, packages) - ssh.execute_proxy_commands(self.testbed, node.ssh_config, - cmds, time_out=None) + + executor = self.make_executor(node, packages, self.testbed) + + names.append(node.name) + executors.append(executor) + args.append(cmds) + m_processing.call_in_parallel(names, args, executors) def bootstrap_network(self): """Creates the network by enrolling and configuring the nodes""" diff --git a/rumba/ssh_support.py b/rumba/ssh_support.py index 60a3767..3103811 100644 --- a/rumba/ssh_support.py +++ b/rumba/ssh_support.py @@ -58,6 +58,7 @@ def _print_stream(stream): return o.rstrip() def ssh_connect(hostname, port, username, password, time_out, proxy_server): + logger.debug('Trying to open a connection towards node %s.' 
% hostname) retry = 0 max_retries = 10 while retry < max_retries: @@ -180,17 +181,33 @@ def execute_commands(testbed, ssh_config, commands, time_out=3): raise SSHException('Failed to execute command') o = _print_stream(stdout) if chan.recv_exit_status() != 0: + # Get ready for printing stdout and stderr + if o != "": + list_print = ['**** STDOUT:'] + list_print += o.split('\\n') + else: + list_print = [] e = _print_stream(stderr) - o_e = o + (('\n' + e) if e != "" else "") - raise SSHException('A remote command returned an error.' + - (('\n' + o_e) if o_e != "" else "")) + if e != "": + list_print.append('**** STDERR:') + list_print += e.split('\\n') + raise SSHException('A remote command returned an error. ' + 'Output:\n\n\t' + + '\n\t'.join(list_print) + '\n') return o finally: - ssh_config.client.close() - if ssh_config.proxy_client is not None: - ssh_config.proxy_client.close() - ssh_config.client = None - ssh_config.proxy_client = None + ## + # The following lines are a fix to make this work under + # true multiprocessing. Right now we are using + # dummy multiprocessing (i.e. threading), so not needed. + # They have been kept here in case we want to get back to it. + ## + # ssh_config.client.close() + # if ssh_config.proxy_client is not None: + # ssh_config.proxy_client.close() + # ssh_config.client = None + # ssh_config.proxy_client = None + pass def execute_command(testbed, ssh_config, command, time_out=3): """ @@ -329,6 +346,7 @@ def setup_vlans(testbed, node, vlans): execute_commands(testbed, node.ssh_config, cmds) + def aptitude_install(testbed, node, packages): """ Installs packages through aptitude diff --git a/rumba/testbeds/jfed.py b/rumba/testbeds/jfed.py index da93110..d6eb458 100644 --- a/rumba/testbeds/jfed.py +++ b/rumba/testbeds/jfed.py @@ -200,6 +200,8 @@ class Testbed(mod.Testbed): "-P", self.password]) def swap_in(self, experiment): + for node in experiment.nodes: + node.ssh_config.set_http_proxy(self.http_proxy) self.create_rspec(experiment) auth_name_r = self.auth_name.replace(".", "-") diff --git a/rumba/testbeds/qemu.py b/rumba/testbeds/qemu.py index e4a35ab..75a564c 100644 --- a/rumba/testbeds/qemu.py +++ b/rumba/testbeds/qemu.py @@ -22,21 +22,28 @@ # License along with this library; if not, write to the Free Software # Foundation, Inc., http://www.fsf.org/about/contact/. 
# - -import multiprocessing -import time -import subprocess +import multiprocessing as old_m +import multiprocessing.dummy as multiprocessing import os +import subprocess import sys +import time + +import rumba.model as mod +import rumba.log as log +import rumba.ssh_support as ssh_support +import rumba.multiprocessing_utils as m_processing +from rumba.multiprocessing_utils import ProcessContextManager if sys.version_info[0] >= 3: from urllib.request import urlretrieve else: from urllib import urlretrieve -import rumba.model as mod -import rumba.log as log -import rumba.ssh_support as ssh_support +if sys.version_info[0] >= 3: + import contextlib +else: + import contextlib2 as contextlib logger = log.get_logger(__name__) @@ -52,12 +59,13 @@ class Testbed(mod.Testbed): self.boot_processes = [] self.bzimage_path = bzimage_path self.initramfs_path = initramfs_path + self.multiproc_manager = None # Prepend sudo to all commands if the user is not 'root' def may_sudo(self, cmds): if os.geteuid() != 0: - for i in range(len(cmds)): - cmds[i] = "sudo %s" % cmds[i] + for i, cmd in enumerate(cmds): + cmds[i] = "sudo %s" % cmd @staticmethod def _run_command_chain(commands, results_queue, @@ -77,9 +85,11 @@ class Testbed(mod.Testbed): :return: None """ errors = 0 + # results_queue.cancel_join_thread() + # error_queue.cancel_join_thread() for command in commands: - if not error_queue.empty() and not ignore_errors: - break + # if not error_queue.empty() and not ignore_errors: + # break logger.debug('executing >> %s', command) try: subprocess.check_call(command.split()) @@ -88,12 +98,12 @@ class Testbed(mod.Testbed): errors += 1 if not ignore_errors: break - except KeyboardInterrupt as e: + except KeyboardInterrupt: error_queue.put('Interrupted') if errors == 0: results_queue.put("Command chain ran correctly") else: - results_queue.put("Command chain ran with %d errors" % errors) + results_queue.put("Command chain ran with %d error(s)" % errors) def recover_if_names(self, experiment): for node in experiment.nodes: @@ -161,10 +171,8 @@ class Testbed(mod.Testbed): logger.info('Setting up interfaces.') - # Building bridges and taps - shim_processes = [] - r_queue = multiprocessing.Queue() - e_queue = multiprocessing.Queue() + # Building bridges and taps initialization scripts + br_tab_scripts = [] for shim in experiment.dif_ordering: if not isinstance(shim, mod.ShimEthDIF): # Nothing to do here @@ -201,6 +209,7 @@ class Testbed(mod.Testbed): % {'tap': tap_id, 'speed': speed} ).split('\n') + # While we're at it, build vm ports table and ipcp table vm['ports'].append({'tap_id': tap_id, 'shim': shim, 'port_id': port_id}) @@ -216,63 +225,101 @@ class Testbed(mod.Testbed): ipcp.ifname = tap_id # TODO deal with Ip address (shim UDP DIF). - # Avoid stacking processes if one failed before. - if not e_queue.empty(): - break - # Launch commands asynchronously - self.may_sudo(command_list) - process = multiprocessing.Process(target=self._run_command_chain, - args=(command_list, - r_queue, - e_queue)) - shim_processes.append(process) - process.start() - - # Wait for all processes to be over. 
- total_processes = len(shim_processes) - max_waiting_time = 4 * total_processes - over_processes = 0 - - while over_processes < total_processes and max_waiting_time > 0: - # Check for deadlock - - # Check for errors - if not e_queue.empty(): - error_str = str(e_queue.get()) - logger.error('Testbed instantiation failed: %s', error_str) - - # Wait for the running processes to quit before swapping out - for process in shim_processes: - process.join() - - raise Exception('Failure: %s' % error_str) - try: - # Check for results - result = r_queue.get(timeout=1) - if result == "Command chain ran correctly": - over_processes += 1 - logger.debug('%s/%s processes completed', - over_processes, total_processes) - except: - max_waiting_time -= 1 - - if max_waiting_time == 0: - logger.error("Swap in is in deadlock, aborting.") - for process in shim_processes: - process.terminate() - time.sleep(0.1) - process.join() - raise Exception('Swap in is in deadlock') - - for process in shim_processes: - process.join() + br_tab_scripts.append(command_list) + ## + # End of shim/node parsing block + ## + # + + def executor(list_of_commands): + for cmd in list_of_commands: + logger.debug('executing >> %s', cmd) + subprocess.check_call(cmd.split()) + + names = [] + args = [] + executors = [] + for i, script in enumerate(br_tab_scripts): + names.append(i) + self.may_sudo(script) + args.append(script) + executors.append(executor) + + m_processing.call_in_parallel(names, args, executors) + + # self.multiproc_manager = multiprocessing.Manager() + # + # r_queue = self.multiproc_manager.Queue() + # e_queue = self.multiproc_manager.Queue() + # + # # This is a multi-context manager. It will close all processes + # # as soon as the interpreter exits the 'with' block + # with contextlib.ExitStack() as stack: + # total_processes = 0 + # for script in br_tab_scripts: + # # Avoid stacking processes if one failed before. + # if not e_queue.empty(): + # # No need to do anything, we will abort after the loop + # break + # # Launch commands asynchronously + # self.may_sudo(script) + # stack.enter_context(ProcessContextManager( + # target=self._run_command_chain, + # args=(script, r_queue, e_queue) + # )) + # total_processes += 1 + # + # # Wait for all processes to be over. + # max_waiting_time = 8 * total_processes + # over_processes = 0 + # + # # Max waiting is for deadlocks + # while over_processes < total_processes and max_waiting_time > 0: + # + # result = None + # try: + # # Check the results + # result = r_queue.get(timeout=1) + # except: # An error here can only be a timeout from the queue.get() + # max_waiting_time -= 1 + # + # if result is not None: + # if result == "Command chain ran correctly": + # over_processes += 1 + # logger.debug('%s/%s processes completed', + # over_processes, total_processes) + # elif result.startswith('Command chain ran with '): + # error_str = str(e_queue.get()) + # logger.error('Testbed instantiation failed: %s', error_str) + # raise Exception('Failure: %s' % error_str) + # # ^ This will exit the 'with' block, + # # closing all running processes. + # else: + # logger.error('Unexpected result: %s.', result) + # raise Exception('Unexpected result: %s' % result) + # # ^ This will exit the 'with' block, + # # closing all running processes. + # + # if max_waiting_time == 0: + # logger.error("Swap in is in deadlock, aborting.") + # raise Exception('Swap in is in deadlock') + # # ^ This will exit the 'with' block, + # # closing all running processes. 
+ # + # del r_queue + # del e_queue + # self.multiproc_manager = None + # + # ## + # # End of the 'with' block + # ## logger.info('Interfaces setup complete. ' 'Building VMs (this might take a while).') # Building vms - boot_batch_size = max(1, multiprocessing.cpu_count() // 2) + boot_batch_size = max(1, old_m.cpu_count() // 2) booting_budget = boot_batch_size boot_backoff = 12 # in seconds base_port = 2222 @@ -361,14 +408,16 @@ class Testbed(mod.Testbed): if booting_budget < boot_backoff: tsleep = boot_backoff * (boot_batch_size - booting_budget) / \ boot_batch_size - logger.info('Sleeping %s secs ' + logger.debug('Sleeping %s secs ' 'waiting for the last VMs to boot', tsleep) time.sleep(tsleep) # TODO: to be removed, we should loop in the ssh part - logger.info('Sleeping 5 seconds, just to be on the safe side') - time.sleep(5) + # logger.info('Sleeping 5 seconds, just to be on the safe side') + # time.sleep(5) + + logger.info('All VMs are running. Moving on...') self.recover_if_names(experiment) @@ -389,86 +438,120 @@ class Testbed(mod.Testbed): process.wait() logger.info('Destroying interfaces.') - port_processes = [] - error_queue = multiprocessing.Queue() - results_queue = multiprocessing.Queue() - for vm_name, vm in self.vms.items(): - for port in vm['ports']: - tap = port['tap_id'] - shim = port['shim'] + self.multiproc_manager = multiprocessing.Manager() + results_queue = self.multiproc_manager.Queue() + error_queue = self.multiproc_manager.Queue() + + # See previous ExitStack usage for explanation + with contextlib.ExitStack() as stack: + total_processes = 0 + for vm_name, vm in self.vms.items(): + for port in vm['ports']: + tap = port['tap_id'] + shim = port['shim'] + + commands = [] + commands += ('brctl delif %(br)s %(tap)s\n' + 'ip link set dev %(tap)s down\n' + 'ip tuntap del mode tap name %(tap)s' + % {'tap': tap, 'br': shim.name} + ).split('\n') + self.may_sudo(commands) + stack.enter_context(ProcessContextManager( + target=self._run_command_chain, + args=(commands, results_queue, error_queue), + kwargs={'ignore_errors': True} + )) + total_processes += 1 + + max_waiting_time = 4 * total_processes + over_processes = 0 + + while max_waiting_time > 0 and over_processes < total_processes: + + result = None + try: + # Check for results + result = results_queue.get(timeout=1) + except: + max_waiting_time -= 1 + + if result == "Command chain ran correctly": + over_processes += 1 + elif result.startswith('Command chain ran with '): + error_str = str(error_queue.get()) + logger.warning('Testbed tear-down failed: %s', error_str) + over_processes += 1 + else: + logger.warning('Testbed tear-down failed, ' + 'Unexpected result %s.', result) + over_processes += 1 + + logger.debug('%s/%s tear-down port ' + 'processes completed', + over_processes, total_processes) + ## + # End of the port 'with' block + ## + + del error_queue + del results_queue + self.multiproc_manager = None + + logger.info('Port tear-down complete.') + + self.multiproc_manager = multiprocessing.Manager() + error_queue_2 = self.multiproc_manager.Queue() + results_queue_2 = self.multiproc_manager.Queue() + + # See previous ExitStack usage for explanation + with contextlib.ExitStack() as stack: + total_processes = 0 + for shim in self.shims: commands = [] - commands += ('brctl delif %(br)s %(tap)s\n' - 'ip link set dev %(tap)s down\n' - 'ip tuntap del mode tap name %(tap)s' - % {'tap': tap, 'br': shim.name} + commands += ('ip link set dev %(br)s down\n' + 'brctl delbr %(br)s' + % {'br': shim.name} ).split('\n') 
self.may_sudo(commands) - process = multiprocessing.Process( + stack.enter_context(ProcessContextManager( target=self._run_command_chain, - args=(commands, results_queue, error_queue), - kwargs={'ignore_errors': True}) - port_processes.append(process) - process.start() - - total_processes = len(port_processes) - max_waiting_time = 2 * total_processes - over_processes = 0 - - while max_waiting_time > 0 and over_processes < total_processes: - # Check for errors - if not error_queue.empty(): - logger.warning('Failure while shutting down: %s', - str(error_queue.get())) - over_processes += 1 - try: - # Check for results - result = results_queue.get(timeout=1) + args=(commands, + results_queue_2, + error_queue_2), + kwargs={'ignore_errors': True} + )) + total_processes += 1 + + max_waiting_time = 4 * total_processes + over_processes = 0 + + while max_waiting_time > 0 and over_processes < total_processes: + + result = None + try: + # Check for results + result = results_queue_2.get(timeout=1) + except: + max_waiting_time -= 1 + if result == "Command chain ran correctly": over_processes += 1 - logger.debug('%s/%s tear-down port ' - 'processes completed', - over_processes, total_processes) - except: - max_waiting_time -= 1 - - error_queue = multiprocessing.Queue() - results_queue = multiprocessing.Queue() - shim_processes = [] - - for shim in self.shims: - commands = [] - commands += ('ip link set dev %(br)s down\n' - 'brctl delbr %(br)s' - % {'br': shim.name} - ).split('\n') - self.may_sudo(commands) - process = multiprocessing.Process(target=self._run_command_chain, - args=(commands, - results_queue, - error_queue), - kwargs={'ignore_errors': True}) - shim_processes.append(process) - process.start() - - total_processes = len(shim_processes) - max_waiting_time = 2 * total_processes - over_processes = 0 - - while max_waiting_time > 0 and over_processes < total_processes: - # Check for errors - if not error_queue.empty(): - logger.warning('Failure while shutting down: %s' - % str(error_queue.get())) - over_processes += 1 - try: - # Check for results - result = results_queue.get(timeout=1) - if result == "Command chain ran correctly": + elif result.startswith('Command chain ran with '): + error_str = str(error_queue_2.get()) + logger.warning('Testbed tear-down failed: %s', error_str) + over_processes += 1 + else: + logger.warning('Testbed tear-down failed, ' + 'Unexpected result %s.', result) over_processes += 1 - logger.debug('%s/%s tear-down shim ' - 'processes completed' - % (over_processes, total_processes)) - except: - max_waiting_time -= 1 + + logger.debug('%s/%s tear-down bridge ' + 'processes completed', + over_processes, total_processes) + ## + # End of the bridge 'with' block + ## + logger.info('Experiment has been swapped out.') diff --git a/setup.py b/setup.py index 8fe1e93..ab10913 100755 --- a/setup.py +++ b/setup.py @@ -13,7 +13,7 @@ setuptools.setup( license='LGPL', description='Rumba measurement framework for RINA', packages=['rumba', 'rumba.testbeds', 'rumba.prototypes'], - install_requires=['paramiko', 'repoze.lru; python_version<"3.2"'], + install_requires=['paramiko', 'repoze.lru; python_version<"3.2"', 'contextlib2; python_version<"3.0"'], extras_require={'NumpyAcceleration': ['numpy']}, scripts=['tools/rumba-access'] ) -- cgit v1.2.3
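
The log.py changes above funnel every log record through a single multiprocessing.Queue: each producer only holds a QueueHandler, while one QueueListener thread owns the real StreamHandler, so formatting and writing to stdout happen in exactly one place and worker processes never contend for the stream. A minimal sketch of that pattern using the standard-library classes (Python 3.2+; the patch ships copy-pasted fallbacks for Python 2, and the format string below is illustrative rather than Rumba's RumbaFormatter):

    import logging
    import logging.handlers
    import multiprocessing
    import sys

    # One shared queue: producers enqueue raw records, a single listener
    # thread drains the queue and writes through the real handler.
    log_queue = multiprocessing.Queue()

    def start_listener():
        """Create the only handler that actually writes, and start the listener."""
        stream_handler = logging.StreamHandler(sys.stdout)
        stream_handler.setFormatter(logging.Formatter(
            '%(asctime)s | %(levelname)s | %(name)s | %(message)s'))
        listener = logging.handlers.QueueListener(log_queue, stream_handler)
        listener.start()
        return listener           # keep a reference so it can be stopped at exit

    def attach_producer():
        """Point the current process's root logger at the shared queue."""
        root = logging.getLogger()
        root.addHandler(logging.handlers.QueueHandler(log_queue))
        root.setLevel(logging.INFO)

    if __name__ == '__main__':
        listener = start_listener()
        attach_producer()
        logging.getLogger('demo').info('logged through the queue')
        listener.stop()           # flushes records still sitting in the queue

Stopping the listener matters: QueueListener.stop() enqueues a sentinel and joins its thread, which is why the Python 2 fallback in the patch also reimplements enqueue_sentinel().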
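rumba/multiprocessing_utils.py boils down to a fan-out/fan-in helper: one worker per node (threads, since it imports multiprocessing.dummy), each worker reports either a "DONE" sentinel or the exception it hit through a shared queue, and the caller re-raises one failure only after every worker has reported. A condensed sketch of that logic; the real helper also uses the names for per-worker log messages and wraps each worker in a ProcessContextManager:

    import multiprocessing.dummy as mp   # thread-backed workers, multiprocessing API

    def call_in_parallel(names, arguments, executors):
        """Run executors[i](arguments[i]) concurrently; re-raise one failure, if any."""
        if not (len(names) == len(arguments) == len(executors)):
            raise ValueError("names, arguments and executors must have the same length")

        results = mp.Queue()             # every worker reports exactly once

        def job(executor, argument):
            try:
                executor(argument)
                results.put("DONE")
            except BaseException as exc:     # hand the failure to the caller
                results.put(exc)

        workers = [mp.Process(target=job, args=(e, a))
                   for e, a in zip(executors, arguments)]
        for w in workers:
            w.start()
        outcomes = [results.get() for _ in workers]  # blocks until all have reported
        for w in workers:
            w.join()
        for outcome in outcomes:
            if outcome != "DONE":
                raise outcome

This is how the IRATI install step becomes parallel: irati.py builds one closure per node (make_executor wraps aptitude_install plus the build commands for that node) and hands the node names, the shared command list and the closures to call_in_parallel, so a slow or failing node no longer serialises the whole installation.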
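The qemu.py swap-out path replaces the hand-rolled process bookkeeping with contextlib.ExitStack (contextlib2 on Python 2, hence the new setup.py dependency): every tap/bridge teardown worker is registered as a context manager, so leaving the with block, normally or via an exception, is what guarantees the workers get joined. A stripped-down sketch of that shape; the patch's ProcessContextManager additionally logs the error and attempts terminate() when real processes are in use:

    import contextlib                       # contextlib2 provides ExitStack on Python 2
    import multiprocessing.dummy as mp

    class ProcessContextManager(object):
        """Start a worker on __enter__, make sure it is joined on __exit__."""

        def __init__(self, target, args=()):
            self.process = mp.Process(target=target, args=tuple(args))

        def __enter__(self):
            self.process.start()
            return self.process

        def __exit__(self, exc_type, exc_val, exc_tb):
            self.process.join()             # dummy (thread) workers cannot be terminated
            return False                    # never swallow the caller's exception

    def run_all(scripts, runner):
        # Every registered worker is joined when the block exits, even if
        # registering a later worker (or collecting results) raises.
        with contextlib.ExitStack() as stack:
            for script in scripts:
                stack.enter_context(ProcessContextManager(target=runner,
                                                          args=(script,)))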
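The setup.py change relies on PEP 508 environment markers so the backports are pulled in only where they are needed: repoze.lru below Python 3.2 and now contextlib2 below Python 3.0 (for ExitStack). A minimal, illustrative declaration of the same idea (the package metadata here is made up, not Rumba's):

    import setuptools

    setuptools.setup(
        name='example-package',
        version='0.1',
        install_requires=[
            'paramiko',
            'repoze.lru; python_version<"3.2"',    # LRU cache backport
            'contextlib2; python_version<"3.0"',   # ExitStack backport for Python 2
        ],
    )

pip evaluates the marker at install time, so Python 3 environments never download the Python 2-only backport.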