author     Marco Capitani <m.capitani@nextworks.it>    2017-11-07 11:40:54 +0100
committer  Sander Vrijders <sander.vrijders@ugent.be>  2017-11-15 18:59:11 +0100
commit     2da15caf24a8b2da70d755e065a5dc3d770c6454 (patch)
tree       292456db4acc4ab8f4afffad50113cc7e219b6cd
parent     2e91ca33f90f7c74887013e08c95bb00cdd4fc00 (diff)
download   rumba-2da15caf24a8b2da70d755e065a5dc3d770c6454.tar.gz
           rumba-2da15caf24a8b2da70d755e065a5dc3d770c6454.zip
prototypes: Add parallel installation for IRATI
This adds parallel installation for IRATI. It also adds support for multithread/multiprocess logging, together with prototype-agnostic utilities for multiprocessing. Caching of clients has been re-enabled for SSH connections.
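At its core, the change routes per-node work through the new rumba.multiprocessing_utils.call_in_parallel(name_list, argument_list, executor_list) helper: each prototype builds one executor callable per node, the helper runs them concurrently, and if any executor throws it re-raises one of the failures. A minimal sketch of that pattern (hypothetical node names, commands and a simplified make_executor stand in for the real experiment model):

    import rumba.multiprocessing_utils as m_processing

    # Hypothetical per-node data; real values come from the Experiment model.
    nodes = ['node-a', 'node-b', 'node-c']
    install_cmds = ['./configure', 'make install']

    def make_executor(node):
        # One executor per node; the node is captured in the closure.
        def executor(commands):
            for cmd in commands:
                print('[%s] would run: %s' % (node, cmd))
        return executor

    names = list(nodes)
    executors = [make_executor(n) for n in nodes]
    args = [install_cmds for _ in nodes]

    # All three executors run in parallel; on success nothing is returned.
    m_processing.call_in_parallel(names, args, executors)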
-rw-r--r--   rumba/log.py                      142
-rw-r--r--   rumba/model.py                     27
-rw-r--r--   rumba/multiprocessing_utils.py    106
-rw-r--r--   rumba/prototypes/irati.py          25
-rw-r--r--   rumba/ssh_support.py               34
-rw-r--r--   rumba/testbeds/jfed.py              2
-rw-r--r--   rumba/testbeds/qemu.py            373
-rwxr-xr-x   setup.py                            2
8 files changed, 531 insertions, 180 deletions
diff --git a/rumba/log.py b/rumba/log.py
index c7afbd4..6eb2137 100644
--- a/rumba/log.py
+++ b/rumba/log.py
@@ -24,10 +24,9 @@
#
import logging
-
-import sys
-
+import logging.handlers
import multiprocessing
+import sys
DEBUG = logging.DEBUG
@@ -40,6 +39,130 @@ CRITICAL = logging.CRITICAL
loggers_set = set()
+mq = multiprocessing.Queue()
+
+
+try:
+ from logging.handlers import QueueHandler
+except ImportError:
+ # We are in python2 code
+ class QueueHandler(logging.Handler):
+ """
+ This handler sends events to a queue. Typically, it would be used
+ together with a multiprocessing Queue to centralise logging to file
+ in one process (in a multi-process application), so as to avoid file
+ write contention between processes.
+
+ This code is new in Python 3.2, but this class can be copy pasted into
+ user code for use with earlier Python versions.
+ """
+
+ # Copy-pasted as per above docstring from logging
+
+ def __init__(self, queue):
+ logging.Handler.__init__(self)
+ self.queue = queue
+
+ def enqueue(self, record):
+ self.queue.put_nowait(record)
+
+ def prepare(self, record):
+ self.format(record)
+ record.msg = record.message
+ record.args = None
+ record.exc_info = None
+ return record
+
+ def emit(self, record):
+ try:
+ self.enqueue(self.prepare(record))
+ except Exception:
+ self.handleError(record)
+
+try:
+ from logging.handlers import QueueListener
+except ImportError:
+ # We are in python2 code
+ import threading
+ try:
+ import Queue
+ except ImportError:
+ # We are in Python 3.0 <= version < 3.2
+ import queue as Queue
+
+ class QueueListener(object):
+ """
+ This class implements an internal threaded listener which watches for
+ LogRecords being added to a queue, removes them and passes them to a
+ list of handlers for processing.
+ """
+
+ # Also copy-pasted
+ _sentinel = None
+
+ def __init__(self, queue, respect_handler_level=False, *handlers):
+ self.queue = queue
+ self.handlers = handlers
+ self._stop = threading.Event()
+ self._thread = None
+ self.respect_handler_level = respect_handler_level
+
+ def dequeue(self, block):
+ return self.queue.get(block)
+
+ def start(self):
+ self._thread = t = threading.Thread(target=self._monitor)
+ t.setDaemon(True)
+ t.start()
+
+ def prepare(self, record):
+ return record
+
+ def handle(self, record):
+ record = self.prepare(record)
+ for handler in self.handlers:
+ if not self.respect_handler_level:
+ process = True
+ else:
+ process = record.levelno >= handler.level
+ if process:
+ handler.handle(record)
+
+ def _monitor(self):
+ q = self.queue
+ has_task_done = hasattr(q, 'task_done')
+ while not self._stop.isSet():
+ try:
+ record = self.dequeue(True)
+ if record is self._sentinel:
+ break
+ self.handle(record)
+ if has_task_done:
+ q.task_done()
+ except Queue.Empty:
+ pass
+ # There might still be records in the queue.
+ while True:
+ try:
+ record = self.dequeue(False)
+ if record is self._sentinel:
+ break
+ self.handle(record)
+ if has_task_done:
+ q.task_done()
+ except Queue.Empty:
+ break
+
+ def enqueue_sentinel(self):
+ self.queue.put_nowait(self._sentinel)
+
+ def stop(self):
+ self._stop.set()
+ self.enqueue_sentinel()
+ self._thread.join()
+ self._thread = None
+
+
class RumbaFormatter(logging.Formatter):
"""The logging.Formatter subclass used by Rumba"""
@@ -65,14 +188,19 @@ class RumbaFormatter(logging.Formatter):
def setup():
"""Configures the logging framework with default values."""
+ global mq
+ queue_handler = QueueHandler(mq)
+ queue_handler.setLevel(logging.DEBUG)
+ logging.basicConfig(handlers=[queue_handler], level=logging.DEBUG)
+ logging.getLogger('').setLevel(logging.ERROR)
+ logging.getLogger('rumba').setLevel(logging.INFO)
+
handler = logging.StreamHandler(sys.stdout)
- handler.lock = multiprocessing.RLock()
handler.setLevel(logging.DEBUG)
formatter = RumbaFormatter()
handler.setFormatter(formatter)
- logging.basicConfig(handlers=[handler], level=logging.DEBUG)
- logging.getLogger('').setLevel(logging.ERROR)
- logging.getLogger('rumba').setLevel(logging.INFO)
+ listener = QueueListener(mq, handler)
+ listener.start()
# Used for the first call, in order to configure logging
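The new log.py setup follows the standard queue-based logging pattern: every thread or process logs through a QueueHandler into a shared queue, while a single QueueListener owns the real stream handler and drains the queue, so records from concurrent workers never interleave mid-line. A self-contained sketch of the same pattern using only the Python 3 standard library (independent of rumba.log, which additionally back-ports the two classes for Python 2):

    import logging
    import logging.handlers
    import multiprocessing.dummy as multiprocessing  # threads, as rumba uses

    mq = multiprocessing.Queue()

    # Workers only ever see the QueueHandler...
    root = logging.getLogger()
    root.addHandler(logging.handlers.QueueHandler(mq))
    root.setLevel(logging.INFO)

    # ...while one listener owns the actual output handler.
    stream = logging.StreamHandler()
    stream.setFormatter(logging.Formatter('%(levelname)s %(message)s'))
    listener = logging.handlers.QueueListener(mq, stream)
    listener.start()

    def worker(i):
        logging.getLogger('demo').info('hello from worker %d', i)

    procs = [multiprocessing.Process(target=worker, args=(i,)) for i in range(3)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()

    listener.stop()  # flushes any queued records before exit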
diff --git a/rumba/model.py b/rumba/model.py
index 4e2591a..6ba93b0 100644
--- a/rumba/model.py
+++ b/rumba/model.py
@@ -200,6 +200,7 @@ class SSHConfig:
self.proxy_server = proxy_server
self.client = None
self.proxy_client = None
+ self.http_proxy = None
def set_username(self, username):
self.username = username
@@ -207,6 +208,9 @@ class SSHConfig:
def set_password(self, password):
self.password = password
+ def set_http_proxy(self, proxy):
+ self.http_proxy = proxy
+
# A node in the experiment
#
@@ -534,8 +538,8 @@ class Experiment:
self.dt_strategy = 'full-mesh' # 'minimal', 'manual'
self.dif_ordering = []
self.enrollments = [] # a list of per-DIF lists of enrollments
- self.dt_flows = [] # a list of per-DIF lists of data transfer flows
- self.mgmt_flows = [] # a list of per-DIF lists of management flows
+ self.dt_flows = [] # a list of per-DIF lists of data transfer flows
+ self.mgmt_flows = [] # a list of per-DIF lists of management flows
# Generate missing information
self.generate()
@@ -881,8 +885,8 @@ class ClientProcess(Client):
try:
self.pid = self.node.execute_command(cmd)
except SSHException:
- logger.warn('Could not start client %s on node %s.',
- self.ap, node.name)
+ logger.warning('Could not start client %s on node %s.',
+ self.ap, node.name)
logger.debug('Client app %s on node %s got pid %s.',
self.ap, self.node.name, self.pid)
@@ -894,8 +898,8 @@ class ClientProcess(Client):
try:
self.node.execute_command("kill %s" % self.pid)
except SSHException:
- logger.warn('Could not kill client %s on node %s.',
- self.ap, self.node.name)
+ logger.warning('Could not kill client %s on node %s.',
+ self.ap, self.node.name)
def kill_check(self):
"""Check if the process should keep running, stop it if not,
@@ -957,8 +961,7 @@ class Server:
"""
number = poisson(self.arrival_rate * interval)
number = int(min(number, self.max_clients))
- l = [self.make_client_process() for _ in range(number)]
- return l
+ return [self.make_client_process() for _ in range(number)]
def make_client_process(self):
"""Returns a client of this server"""
@@ -983,8 +986,8 @@ class Server:
try:
self.pids[node] = (node.execute_commands(cmds))
except SSHException:
- logger.warn('Could not start server %s on node %s.',
- self.ap, node.name)
+ logger.warning('Could not start server %s on node %s.',
+ self.ap, node.name)
def stop(self):
for node, pid in self.pids.items():
@@ -995,8 +998,8 @@ class Server:
try:
node.execute_command("kill %s" % pid)
except SSHException:
- logger.warn('Could not kill server %s on node %s.',
- self.ap, node.name)
+ logger.warning('Could not kill server %s on node %s.',
+ self.ap, node.name)
# Base class for ARCFIRE storyboards
diff --git a/rumba/multiprocessing_utils.py b/rumba/multiprocessing_utils.py
new file mode 100644
index 0000000..ce5dd5c
--- /dev/null
+++ b/rumba/multiprocessing_utils.py
@@ -0,0 +1,106 @@
+import multiprocessing.dummy as multiprocessing
+import sys
+
+import rumba.log as log
+
+if sys.version_info[0] >= 3:
+ import contextlib
+else:
+ import contextlib2 as contextlib
+
+
+logger = log.get_logger(__name__)
+
+
+def call_in_parallel(name_list, argument_list, executor_list):
+ """
+ Calls each executor in executor_list with the corresponding
+ argument in argument_list
+
+ Assumes that the three lists are the same length, will fail otherwise.
+ Is equivalent to
+ for i, e in enumerate(executor_list):
+ e(argument_list[i])
+ but all calls will be executed in parallel.
+
+ If successful, no output will be given. Otherwise, this will raise
+ the exception raised by one failed call at random.
+
+ :param name_list: list of names of the executors (used for logging)
+ :param argument_list: list of arguments to the executors
+ :param executor_list: list of executors (as functions)
+ """
+ if len(executor_list) != len(name_list) \
+ or len(executor_list) != len(argument_list):
+ raise ValueError("Names, arguments and executors lists "
+ "must have the same length")
+
+ def job(executor, name, m_queue, argument):
+ try:
+ # m_queue.cancel_join_thread()
+ logger.debug('Starting process "%s".'
+ % (name,))
+ executor(argument)
+ m_queue.put("DONE")
+ except BaseException as e:
+ logger.error('Setup failed. %s: %s',
+ type(e).__name__,
+ str(e))
+ m_queue.put(e)
+
+ logger.debug('About to start spawning processes.')
+ queue = multiprocessing.Queue()
+ with contextlib.ExitStack() as stack:
+ # This is a composite context manager.
+ # After exiting the 'with' block, the __exit__ method of all
+ # processes that have been registered will be called.
+ msg_to_be_read = 0
+ for i, e in enumerate(executor_list):
+ stack.enter_context(ProcessContextManager(
+ target=job,
+ args=(e, name_list[i], queue, argument_list[i])
+ ))
+ msg_to_be_read += 1
+ results = []
+ for _ in range(len(executor_list)):
+ result = queue.get() # This blocks until results are available
+ msg_to_be_read -= 1
+ results.append(result)
+ for result in results:
+ if result != "DONE":
+ raise result
+ # If we get here, we got a success for every node, hence we are done.
+
+
+class ProcessContextManager(object):
+
+ def __init__(self, target, args=None, kwargs=None):
+ if args is None:
+ args = ()
+ if kwargs is None:
+ kwargs = {}
+ self.process = multiprocessing.Process(
+ target=target,
+ args=tuple(args),
+ kwargs=kwargs
+ )
+
+ def __enter__(self):
+ self.process.start()
+ return self.process
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ if exc_type is not None or exc_val is not None or exc_tb is not None:
+ logger.error('Subprocess error: %s.' % (type(exc_val).__name__,))
+ try:
+ self.process.terminate()
+ self.process.join()
+ except AttributeError:
+ # We are using multiprocessing.dummy, so no termination.
+ # We trust the threads will die with the application
+ # (since we are shutting down anyway)
+ pass
+ return False
+ else:
+ self.process.join()
+ return True
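ProcessContextManager exists so that a batch of processes (threads, with multiprocessing.dummy) can be registered on a contextlib.ExitStack: each one starts on __enter__, and leaving the with block joins them all, terminating them first on error where the backend supports it. A rough usage sketch, assuming a trivial worker function:

    import contextlib  # contextlib2 on Python 2, as declared in setup.py

    from rumba.multiprocessing_utils import ProcessContextManager

    def worker(tag):
        print('setting up %s' % tag)

    with contextlib.ExitStack() as stack:
        for tag in ('br0', 'br1', 'br2'):
            # Each worker starts immediately and is joined when the
            # 'with' block exits (terminated first on error, if supported).
            stack.enter_context(ProcessContextManager(target=worker,
                                                      args=(tag,)))
    # All three workers have completed at this point.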
diff --git a/rumba/prototypes/irati.py b/rumba/prototypes/irati.py
index 3a64ede..0f2d69e 100644
--- a/rumba/prototypes/irati.py
+++ b/rumba/prototypes/irati.py
@@ -22,25 +22,30 @@
# License along with this library; if not, write to the Free Software
# Foundation, Inc., http://www.fsf.org/about/contact/.
#
-
import copy
import json
-
import os
import time
import rumba.ssh_support as ssh
import rumba.model as mod
+import rumba.multiprocessing_utils as m_processing
import rumba.prototypes.irati_templates as irati_templates
import rumba.log as log
-
logger = log.get_logger(__name__)
# An experiment over the IRATI implementation
class Experiment(mod.Experiment):
+ @staticmethod
+ def make_executor(node, packages, testbed):
+ def executor(commands):
+ ssh.aptitude_install(testbed, node, packages)
+ node.execute_commands(commands, time_out=None, use_proxy=True)
+ return executor
+
def prototype_name(self):
return 'irati'
@@ -90,11 +95,17 @@ class Experiment(mod.Experiment):
"cd ~; git clone -b arcfire https://github.com/IRATI/stack",
"cd ~/stack && "
+ self.sudo("./configure && ") + self.sudo("make install")]
-
+ names = []
+ executors = []
+ args = []
for node in self.nodes:
- ssh.aptitude_install(self.testbed, node, packages)
- ssh.execute_proxy_commands(self.testbed, node.ssh_config,
- cmds, time_out=None)
+
+ executor = self.make_executor(node, packages, self.testbed)
+
+ names.append(node.name)
+ executors.append(executor)
+ args.append(cmds)
+ m_processing.call_in_parallel(names, args, executors)
def bootstrap_network(self):
"""Creates the network by enrolling and configuring the nodes"""
diff --git a/rumba/ssh_support.py b/rumba/ssh_support.py
index 60a3767..3103811 100644
--- a/rumba/ssh_support.py
+++ b/rumba/ssh_support.py
@@ -58,6 +58,7 @@ def _print_stream(stream):
return o.rstrip()
def ssh_connect(hostname, port, username, password, time_out, proxy_server):
+ logger.debug('Trying to open a connection towards node %s.' % hostname)
retry = 0
max_retries = 10
while retry < max_retries:
@@ -180,17 +181,33 @@ def execute_commands(testbed, ssh_config, commands, time_out=3):
raise SSHException('Failed to execute command')
o = _print_stream(stdout)
if chan.recv_exit_status() != 0:
+ # Get ready for printing stdout and stderr
+ if o != "":
+ list_print = ['**** STDOUT:']
+ list_print += o.split('\\n')
+ else:
+ list_print = []
e = _print_stream(stderr)
- o_e = o + (('\n' + e) if e != "" else "")
- raise SSHException('A remote command returned an error.' +
- (('\n' + o_e) if o_e != "" else ""))
+ if e != "":
+ list_print.append('**** STDERR:')
+ list_print += e.split('\\n')
+ raise SSHException('A remote command returned an error. '
+ 'Output:\n\n\t' +
+ '\n\t'.join(list_print) + '\n')
return o
finally:
- ssh_config.client.close()
- if ssh_config.proxy_client is not None:
- ssh_config.proxy_client.close()
- ssh_config.client = None
- ssh_config.proxy_client = None
+ ##
+ # The following lines are a fix to make this work under
+ # true multiprocessing. Right now we are using
+ # dummy multiprocessing (i.e. threading), so not needed.
+ # They have been kept here in case we want to get back to it.
+ ##
+ # ssh_config.client.close()
+ # if ssh_config.proxy_client is not None:
+ # ssh_config.proxy_client.close()
+ # ssh_config.client = None
+ # ssh_config.proxy_client = None
+ pass
def execute_command(testbed, ssh_config, command, time_out=3):
"""
@@ -329,6 +346,7 @@ def setup_vlans(testbed, node, vlans):
execute_commands(testbed, node.ssh_config, cmds)
+
def aptitude_install(testbed, node, packages):
"""
Installs packages through aptitude
diff --git a/rumba/testbeds/jfed.py b/rumba/testbeds/jfed.py
index da93110..d6eb458 100644
--- a/rumba/testbeds/jfed.py
+++ b/rumba/testbeds/jfed.py
@@ -200,6 +200,8 @@ class Testbed(mod.Testbed):
"-P", self.password])
def swap_in(self, experiment):
+ for node in experiment.nodes:
+ node.ssh_config.set_http_proxy(self.http_proxy)
self.create_rspec(experiment)
auth_name_r = self.auth_name.replace(".", "-")
diff --git a/rumba/testbeds/qemu.py b/rumba/testbeds/qemu.py
index e4a35ab..75a564c 100644
--- a/rumba/testbeds/qemu.py
+++ b/rumba/testbeds/qemu.py
@@ -22,21 +22,28 @@
# License along with this library; if not, write to the Free Software
# Foundation, Inc., http://www.fsf.org/about/contact/.
#
-
-import multiprocessing
-import time
-import subprocess
+import multiprocessing as old_m
+import multiprocessing.dummy as multiprocessing
import os
+import subprocess
import sys
+import time
+
+import rumba.model as mod
+import rumba.log as log
+import rumba.ssh_support as ssh_support
+import rumba.multiprocessing_utils as m_processing
+from rumba.multiprocessing_utils import ProcessContextManager
if sys.version_info[0] >= 3:
from urllib.request import urlretrieve
else:
from urllib import urlretrieve
-import rumba.model as mod
-import rumba.log as log
-import rumba.ssh_support as ssh_support
+if sys.version_info[0] >= 3:
+ import contextlib
+else:
+ import contextlib2 as contextlib
logger = log.get_logger(__name__)
@@ -52,12 +59,13 @@ class Testbed(mod.Testbed):
self.boot_processes = []
self.bzimage_path = bzimage_path
self.initramfs_path = initramfs_path
+ self.multiproc_manager = None
# Prepend sudo to all commands if the user is not 'root'
def may_sudo(self, cmds):
if os.geteuid() != 0:
- for i in range(len(cmds)):
- cmds[i] = "sudo %s" % cmds[i]
+ for i, cmd in enumerate(cmds):
+ cmds[i] = "sudo %s" % cmd
@staticmethod
def _run_command_chain(commands, results_queue,
@@ -77,9 +85,11 @@ class Testbed(mod.Testbed):
:return: None
"""
errors = 0
+ # results_queue.cancel_join_thread()
+ # error_queue.cancel_join_thread()
for command in commands:
- if not error_queue.empty() and not ignore_errors:
- break
+ # if not error_queue.empty() and not ignore_errors:
+ # break
logger.debug('executing >> %s', command)
try:
subprocess.check_call(command.split())
@@ -88,12 +98,12 @@ class Testbed(mod.Testbed):
errors += 1
if not ignore_errors:
break
- except KeyboardInterrupt as e:
+ except KeyboardInterrupt:
error_queue.put('Interrupted')
if errors == 0:
results_queue.put("Command chain ran correctly")
else:
- results_queue.put("Command chain ran with %d errors" % errors)
+ results_queue.put("Command chain ran with %d error(s)" % errors)
def recover_if_names(self, experiment):
for node in experiment.nodes:
@@ -161,10 +171,8 @@ class Testbed(mod.Testbed):
logger.info('Setting up interfaces.')
- # Building bridges and taps
- shim_processes = []
- r_queue = multiprocessing.Queue()
- e_queue = multiprocessing.Queue()
+ # Building bridges and taps initialization scripts
+ br_tab_scripts = []
for shim in experiment.dif_ordering:
if not isinstance(shim, mod.ShimEthDIF):
# Nothing to do here
@@ -201,6 +209,7 @@ class Testbed(mod.Testbed):
% {'tap': tap_id, 'speed': speed}
).split('\n')
+ # While we're at it, build vm ports table and ipcp table
vm['ports'].append({'tap_id': tap_id,
'shim': shim,
'port_id': port_id})
@@ -216,63 +225,101 @@ class Testbed(mod.Testbed):
ipcp.ifname = tap_id
# TODO deal with Ip address (shim UDP DIF).
- # Avoid stacking processes if one failed before.
- if not e_queue.empty():
- break
- # Launch commands asynchronously
- self.may_sudo(command_list)
- process = multiprocessing.Process(target=self._run_command_chain,
- args=(command_list,
- r_queue,
- e_queue))
- shim_processes.append(process)
- process.start()
-
- # Wait for all processes to be over.
- total_processes = len(shim_processes)
- max_waiting_time = 4 * total_processes
- over_processes = 0
-
- while over_processes < total_processes and max_waiting_time > 0:
- # Check for deadlock
-
- # Check for errors
- if not e_queue.empty():
- error_str = str(e_queue.get())
- logger.error('Testbed instantiation failed: %s', error_str)
-
- # Wait for the running processes to quit before swapping out
- for process in shim_processes:
- process.join()
-
- raise Exception('Failure: %s' % error_str)
- try:
- # Check for results
- result = r_queue.get(timeout=1)
- if result == "Command chain ran correctly":
- over_processes += 1
- logger.debug('%s/%s processes completed',
- over_processes, total_processes)
- except:
- max_waiting_time -= 1
-
- if max_waiting_time == 0:
- logger.error("Swap in is in deadlock, aborting.")
- for process in shim_processes:
- process.terminate()
- time.sleep(0.1)
- process.join()
- raise Exception('Swap in is in deadlock')
-
- for process in shim_processes:
- process.join()
+ br_tab_scripts.append(command_list)
+ ##
+ # End of shim/node parsing block
+ ##
+ #
+
+ def executor(list_of_commands):
+ for cmd in list_of_commands:
+ logger.debug('executing >> %s', cmd)
+ subprocess.check_call(cmd.split())
+
+ names = []
+ args = []
+ executors = []
+ for i, script in enumerate(br_tab_scripts):
+ names.append(i)
+ self.may_sudo(script)
+ args.append(script)
+ executors.append(executor)
+
+ m_processing.call_in_parallel(names, args, executors)
+
+ # self.multiproc_manager = multiprocessing.Manager()
+ #
+ # r_queue = self.multiproc_manager.Queue()
+ # e_queue = self.multiproc_manager.Queue()
+ #
+ # # This is a multi-context manager. It will close all processes
+ # # as soon as the interpreter exits the 'with' block
+ # with contextlib.ExitStack() as stack:
+ # total_processes = 0
+ # for script in br_tab_scripts:
+ # # Avoid stacking processes if one failed before.
+ # if not e_queue.empty():
+ # # No need to do anything, we will abort after the loop
+ # break
+ # # Launch commands asynchronously
+ # self.may_sudo(script)
+ # stack.enter_context(ProcessContextManager(
+ # target=self._run_command_chain,
+ # args=(script, r_queue, e_queue)
+ # ))
+ # total_processes += 1
+ #
+ # # Wait for all processes to be over.
+ # max_waiting_time = 8 * total_processes
+ # over_processes = 0
+ #
+ # # Max waiting is for deadlocks
+ # while over_processes < total_processes and max_waiting_time > 0:
+ #
+ # result = None
+ # try:
+ # # Check the results
+ # result = r_queue.get(timeout=1)
+ # except: # An error here can only be a timeout from the queue.get()
+ # max_waiting_time -= 1
+ #
+ # if result is not None:
+ # if result == "Command chain ran correctly":
+ # over_processes += 1
+ # logger.debug('%s/%s processes completed',
+ # over_processes, total_processes)
+ # elif result.startswith('Command chain ran with '):
+ # error_str = str(e_queue.get())
+ # logger.error('Testbed instantiation failed: %s', error_str)
+ # raise Exception('Failure: %s' % error_str)
+ # # ^ This will exit the 'with' block,
+ # # closing all running processes.
+ # else:
+ # logger.error('Unexpected result: %s.', result)
+ # raise Exception('Unexpected result: %s' % result)
+ # # ^ This will exit the 'with' block,
+ # # closing all running processes.
+ #
+ # if max_waiting_time == 0:
+ # logger.error("Swap in is in deadlock, aborting.")
+ # raise Exception('Swap in is in deadlock')
+ # # ^ This will exit the 'with' block,
+ # # closing all running processes.
+ #
+ # del r_queue
+ # del e_queue
+ # self.multiproc_manager = None
+ #
+ # ##
+ # # End of the 'with' block
+ # ##
logger.info('Interfaces setup complete. '
'Building VMs (this might take a while).')
# Building vms
- boot_batch_size = max(1, multiprocessing.cpu_count() // 2)
+ boot_batch_size = max(1, old_m.cpu_count() // 2)
booting_budget = boot_batch_size
boot_backoff = 12 # in seconds
base_port = 2222
@@ -361,14 +408,16 @@ class Testbed(mod.Testbed):
if booting_budget < boot_backoff:
tsleep = boot_backoff * (boot_batch_size - booting_budget) / \
boot_batch_size
- logger.info('Sleeping %s secs '
+ logger.debug('Sleeping %s secs '
'waiting for the last VMs to boot',
tsleep)
time.sleep(tsleep)
# TODO: to be removed, we should loop in the ssh part
- logger.info('Sleeping 5 seconds, just to be on the safe side')
- time.sleep(5)
+ # logger.info('Sleeping 5 seconds, just to be on the safe side')
+ # time.sleep(5)
+
+ logger.info('All VMs are running. Moving on...')
self.recover_if_names(experiment)
@@ -389,86 +438,120 @@ class Testbed(mod.Testbed):
process.wait()
logger.info('Destroying interfaces.')
- port_processes = []
- error_queue = multiprocessing.Queue()
- results_queue = multiprocessing.Queue()
- for vm_name, vm in self.vms.items():
- for port in vm['ports']:
- tap = port['tap_id']
- shim = port['shim']
+ self.multiproc_manager = multiprocessing.Manager()
+ results_queue = self.multiproc_manager.Queue()
+ error_queue = self.multiproc_manager.Queue()
+
+ # See previous ExitStack usage for explanation
+ with contextlib.ExitStack() as stack:
+ total_processes = 0
+ for vm_name, vm in self.vms.items():
+ for port in vm['ports']:
+ tap = port['tap_id']
+ shim = port['shim']
+
+ commands = []
+ commands += ('brctl delif %(br)s %(tap)s\n'
+ 'ip link set dev %(tap)s down\n'
+ 'ip tuntap del mode tap name %(tap)s'
+ % {'tap': tap, 'br': shim.name}
+ ).split('\n')
+ self.may_sudo(commands)
+ stack.enter_context(ProcessContextManager(
+ target=self._run_command_chain,
+ args=(commands, results_queue, error_queue),
+ kwargs={'ignore_errors': True}
+ ))
+ total_processes += 1
+
+ max_waiting_time = 4 * total_processes
+ over_processes = 0
+
+ while max_waiting_time > 0 and over_processes < total_processes:
+
+ result = None
+ try:
+ # Check for results
+ result = results_queue.get(timeout=1)
+ except:
+ max_waiting_time -= 1
+
+ if result == "Command chain ran correctly":
+ over_processes += 1
+ elif result.startswith('Command chain ran with '):
+ error_str = str(error_queue.get())
+ logger.warning('Testbed tear-down failed: %s', error_str)
+ over_processes += 1
+ else:
+ logger.warning('Testbed tear-down failed, '
+ 'Unexpected result %s.', result)
+ over_processes += 1
+
+ logger.debug('%s/%s tear-down port '
+ 'processes completed',
+ over_processes, total_processes)
+ ##
+ # End of the port 'with' block
+ ##
+
+ del error_queue
+ del results_queue
+ self.multiproc_manager = None
+
+ logger.info('Port tear-down complete.')
+
+ self.multiproc_manager = multiprocessing.Manager()
+ error_queue_2 = self.multiproc_manager.Queue()
+ results_queue_2 = self.multiproc_manager.Queue()
+
+ # See previous ExitStack usage for explanation
+ with contextlib.ExitStack() as stack:
+ total_processes = 0
+ for shim in self.shims:
commands = []
- commands += ('brctl delif %(br)s %(tap)s\n'
- 'ip link set dev %(tap)s down\n'
- 'ip tuntap del mode tap name %(tap)s'
- % {'tap': tap, 'br': shim.name}
+ commands += ('ip link set dev %(br)s down\n'
+ 'brctl delbr %(br)s'
+ % {'br': shim.name}
).split('\n')
self.may_sudo(commands)
- process = multiprocessing.Process(
+ stack.enter_context(ProcessContextManager(
target=self._run_command_chain,
- args=(commands, results_queue, error_queue),
- kwargs={'ignore_errors': True})
- port_processes.append(process)
- process.start()
-
- total_processes = len(port_processes)
- max_waiting_time = 2 * total_processes
- over_processes = 0
-
- while max_waiting_time > 0 and over_processes < total_processes:
- # Check for errors
- if not error_queue.empty():
- logger.warning('Failure while shutting down: %s',
- str(error_queue.get()))
- over_processes += 1
- try:
- # Check for results
- result = results_queue.get(timeout=1)
+ args=(commands,
+ results_queue_2,
+ error_queue_2),
+ kwargs={'ignore_errors': True}
+ ))
+ total_processes += 1
+
+ max_waiting_time = 4 * total_processes
+ over_processes = 0
+
+ while max_waiting_time > 0 and over_processes < total_processes:
+
+ result = None
+ try:
+ # Check for results
+ result = results_queue_2.get(timeout=1)
+ except:
+ max_waiting_time -= 1
+
if result == "Command chain ran correctly":
over_processes += 1
- logger.debug('%s/%s tear-down port '
- 'processes completed',
- over_processes, total_processes)
- except:
- max_waiting_time -= 1
-
- error_queue = multiprocessing.Queue()
- results_queue = multiprocessing.Queue()
- shim_processes = []
-
- for shim in self.shims:
- commands = []
- commands += ('ip link set dev %(br)s down\n'
- 'brctl delbr %(br)s'
- % {'br': shim.name}
- ).split('\n')
- self.may_sudo(commands)
- process = multiprocessing.Process(target=self._run_command_chain,
- args=(commands,
- results_queue,
- error_queue),
- kwargs={'ignore_errors': True})
- shim_processes.append(process)
- process.start()
-
- total_processes = len(shim_processes)
- max_waiting_time = 2 * total_processes
- over_processes = 0
-
- while max_waiting_time > 0 and over_processes < total_processes:
- # Check for errors
- if not error_queue.empty():
- logger.warning('Failure while shutting down: %s'
- % str(error_queue.get()))
- over_processes += 1
- try:
- # Check for results
- result = results_queue.get(timeout=1)
- if result == "Command chain ran correctly":
+ elif result.startswith('Command chain ran with '):
+ error_str = str(error_queue_2.get())
+ logger.warning('Testbed tear-down failed: %s', error_str)
+ over_processes += 1
+ else:
+ logger.warning('Testbed tear-down failed, '
+ 'Unexpected result %s.', result)
over_processes += 1
- logger.debug('%s/%s tear-down shim '
- 'processes completed'
- % (over_processes, total_processes))
- except:
- max_waiting_time -= 1
+
+ logger.debug('%s/%s tear-down bridge '
+ 'processes completed',
+ over_processes, total_processes)
+ ##
+ # End of the bridge 'with' block
+ ##
+
logger.info('Experiment has been swapped out.')
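The tear-down loops above all follow the same collection pattern: every worker pushes a final status string onto a results queue, and the parent drains that queue with a one-second timeout while spending a bounded waiting budget, so a hung worker degrades into a reported deadlock rather than blocking forever. A condensed sketch of that loop, assuming the workers report the same strings as _run_command_chain:

    import queue  # 'Queue' on Python 2

    def collect_results(results_queue, total_processes, budget_per_process=4):
        max_waiting_time = budget_per_process * total_processes
        over_processes = 0
        while over_processes < total_processes and max_waiting_time > 0:
            try:
                result = results_queue.get(timeout=1)
            except queue.Empty:
                max_waiting_time -= 1  # burn budget instead of blocking forever
                continue
            if result == "Command chain ran correctly":
                over_processes += 1
            else:
                # e.g. "Command chain ran with 2 error(s)"
                print('worker reported a failure: %s' % result)
                over_processes += 1
        if over_processes < total_processes:
            raise Exception('Tear-down appears deadlocked, aborting')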
diff --git a/setup.py b/setup.py
index 8fe1e93..ab10913 100755
--- a/setup.py
+++ b/setup.py
@@ -13,7 +13,7 @@ setuptools.setup(
license='LGPL',
description='Rumba measurement framework for RINA',
packages=['rumba', 'rumba.testbeds', 'rumba.prototypes'],
- install_requires=['paramiko', 'repoze.lru; python_version<"3.2"'],
+ install_requires=['paramiko', 'repoze.lru; python_version<"3.2"', 'contextlib2; python_version<"3.0"'],
extras_require={'NumpyAcceleration': ['numpy']},
scripts=['tools/rumba-access']
)