author     Marco Capitani <m.capitani@nextworks.it>      2017-11-07 11:40:54 +0100
committer  Sander Vrijders <sander.vrijders@ugent.be>    2017-11-15 18:59:11 +0100
commit     2da15caf24a8b2da70d755e065a5dc3d770c6454 (patch)
tree       292456db4acc4ab8f4afffad50113cc7e219b6cd /rumba/testbeds
parent     2e91ca33f90f7c74887013e08c95bb00cdd4fc00 (diff)
download   rumba-2da15caf24a8b2da70d755e065a5dc3d770c6454.tar.gz
           rumba-2da15caf24a8b2da70d755e065a5dc3d770c6454.zip
prototypes: Add parallel installation for IRATI
This adds parallel installation for IRATI. It also adds support for multithreaded/multiprocess logging. Furthermore, prototype-agnostic utilities for multiprocessing have been added, and caching of clients has been re-enabled for the SSH connections.
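The prototype-agnostic helper this patch introduces, rumba.multiprocessing_utils.call_in_parallel(names, args, executors), runs one executor per argument set concurrently; its use shows up in qemu.py below. As a rough, self-contained illustration of that calling convention (a sketch only: the thread-backed pool and the error handling are assumptions, not the actual rumba implementation):

import multiprocessing.dummy as multiprocessing  # thread-backed, same API as multiprocessing


def call_in_parallel_sketch(names, args, executors):
    """Hypothetical stand-in for rumba.multiprocessing_utils.call_in_parallel.

    Runs executors[i](args[i]) concurrently, one worker per entry; names
    are only used to label failures. The real helper may differ.
    """
    assert len(names) == len(args) == len(executors)
    pool = multiprocessing.Pool(processes=max(1, len(names)))
    async_results = [pool.apply_async(ex, (arg,)) for ex, arg in zip(executors, args)]
    pool.close()
    failures = []
    for name, res in zip(names, async_results):
        try:
            res.get()
        except Exception as e:  # collect per-worker failures instead of aborting early
            failures.append('%s: %s' % (name, e))
    pool.join()
    if failures:
        raise Exception('Parallel execution failed: %s' % '; '.join(failures))

In the swap_in code below, names are the script indices, args the per-shim command lists (after may_sudo has prefixed them), and executors a list of identical functions that run each command with subprocess.check_call.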
Diffstat (limited to 'rumba/testbeds')
-rw-r--r--  rumba/testbeds/jfed.py      2
-rw-r--r--  rumba/testbeds/qemu.py    373
2 files changed, 230 insertions, 145 deletions
diff --git a/rumba/testbeds/jfed.py b/rumba/testbeds/jfed.py
index da93110..d6eb458 100644
--- a/rumba/testbeds/jfed.py
+++ b/rumba/testbeds/jfed.py
@@ -200,6 +200,8 @@ class Testbed(mod.Testbed):
"-P", self.password])
def swap_in(self, experiment):
+ for node in experiment.nodes:
+ node.ssh_config.set_http_proxy(self.http_proxy)
self.create_rspec(experiment)
auth_name_r = self.auth_name.replace(".", "-")
diff --git a/rumba/testbeds/qemu.py b/rumba/testbeds/qemu.py
index e4a35ab..75a564c 100644
--- a/rumba/testbeds/qemu.py
+++ b/rumba/testbeds/qemu.py
@@ -22,21 +22,28 @@
# License along with this library; if not, write to the Free Software
# Foundation, Inc., http://www.fsf.org/about/contact/.
#
-
-import multiprocessing
-import time
-import subprocess
+import multiprocessing as old_m
+import multiprocessing.dummy as multiprocessing
import os
+import subprocess
import sys
+import time
+
+import rumba.model as mod
+import rumba.log as log
+import rumba.ssh_support as ssh_support
+import rumba.multiprocessing_utils as m_processing
+from rumba.multiprocessing_utils import ProcessContextManager
if sys.version_info[0] >= 3:
from urllib.request import urlretrieve
else:
from urllib import urlretrieve
-import rumba.model as mod
-import rumba.log as log
-import rumba.ssh_support as ssh_support
+if sys.version_info[0] >= 3:
+ import contextlib
+else:
+ import contextlib2 as contextlib
logger = log.get_logger(__name__)
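Note the aliasing in the import hunk above: multiprocessing.dummy exposes the same API as multiprocessing but is backed by threads, so the existing Process/Queue/Manager call sites keep working unchanged, while the real multiprocessing module (imported as old_m) is used below only for cpu_count(). A minimal, self-contained illustration of that equivalence, independent of rumba:

import multiprocessing as old_m
import multiprocessing.dummy as multiprocessing  # same API, thread-backed


def _worker(q):
    q.put('done')


if __name__ == '__main__':
    q = multiprocessing.Queue()                              # a plain thread-safe queue
    p = multiprocessing.Process(target=_worker, args=(q,))   # actually a thread
    p.start()
    p.join()
    print(q.get())            # -> 'done'
    print(old_m.cpu_count())  # the real CPU count is still available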
@@ -52,12 +59,13 @@ class Testbed(mod.Testbed):
self.boot_processes = []
self.bzimage_path = bzimage_path
self.initramfs_path = initramfs_path
+ self.multiproc_manager = None
# Prepend sudo to all commands if the user is not 'root'
def may_sudo(self, cmds):
if os.geteuid() != 0:
- for i in range(len(cmds)):
- cmds[i] = "sudo %s" % cmds[i]
+ for i, cmd in enumerate(cmds):
+ cmds[i] = "sudo %s" % cmd
@staticmethod
def _run_command_chain(commands, results_queue,
@@ -77,9 +85,11 @@ class Testbed(mod.Testbed):
:return: None
"""
errors = 0
+ # results_queue.cancel_join_thread()
+ # error_queue.cancel_join_thread()
for command in commands:
- if not error_queue.empty() and not ignore_errors:
- break
+ # if not error_queue.empty() and not ignore_errors:
+ # break
logger.debug('executing >> %s', command)
try:
subprocess.check_call(command.split())
@@ -88,12 +98,12 @@ class Testbed(mod.Testbed):
errors += 1
if not ignore_errors:
break
- except KeyboardInterrupt as e:
+ except KeyboardInterrupt:
error_queue.put('Interrupted')
if errors == 0:
results_queue.put("Command chain ran correctly")
else:
- results_queue.put("Command chain ran with %d errors" % errors)
+ results_queue.put("Command chain ran with %d error(s)" % errors)
def recover_if_names(self, experiment):
for node in experiment.nodes:
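The _run_command_chain worker above reports through two queues using fixed strings: "Command chain ran correctly" on success and "Command chain ran with %d error(s)" otherwise, with the details pushed onto error_queue. The collectors later in this patch consume those messages with a one-second timeout and a shrinking budget that doubles as a deadlock detector. A condensed, hypothetical sketch of that consumer side (the function name and the per-process budget are illustrative, not the patch's exact code):

try:
    import queue            # Python 3
except ImportError:
    import Queue as queue   # Python 2


def drain_results(results_queue, error_queue, total_processes, budget_per_process=4):
    # Hypothetical collector for _run_command_chain result messages.
    max_waiting_time = budget_per_process * total_processes
    over_processes = 0
    while over_processes < total_processes and max_waiting_time > 0:
        try:
            result = results_queue.get(timeout=1)
        except queue.Empty:   # nothing arrived within the timeout
            max_waiting_time -= 1
            continue
        if result == "Command chain ran correctly":
            over_processes += 1
        elif result.startswith('Command chain ran with '):
            raise Exception('Failure: %s' % error_queue.get())
        else:
            raise Exception('Unexpected result: %s' % result)
    if max_waiting_time == 0:
        raise Exception('Deadlock while waiting for command chains')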
@@ -161,10 +171,8 @@ class Testbed(mod.Testbed):
logger.info('Setting up interfaces.')
- # Building bridges and taps
- shim_processes = []
- r_queue = multiprocessing.Queue()
- e_queue = multiprocessing.Queue()
+ # Building bridges and taps initialization scripts
+ br_tab_scripts = []
for shim in experiment.dif_ordering:
if not isinstance(shim, mod.ShimEthDIF):
# Nothing to do here
@@ -201,6 +209,7 @@ class Testbed(mod.Testbed):
% {'tap': tap_id, 'speed': speed}
).split('\n')
+ # While we're at it, build vm ports table and ipcp table
vm['ports'].append({'tap_id': tap_id,
'shim': shim,
'port_id': port_id})
@@ -216,63 +225,101 @@ class Testbed(mod.Testbed):
ipcp.ifname = tap_id
# TODO deal with Ip address (shim UDP DIF).
- # Avoid stacking processes if one failed before.
- if not e_queue.empty():
- break
- # Launch commands asynchronously
- self.may_sudo(command_list)
- process = multiprocessing.Process(target=self._run_command_chain,
- args=(command_list,
- r_queue,
- e_queue))
- shim_processes.append(process)
- process.start()
-
- # Wait for all processes to be over.
- total_processes = len(shim_processes)
- max_waiting_time = 4 * total_processes
- over_processes = 0
-
- while over_processes < total_processes and max_waiting_time > 0:
- # Check for deadlock
-
- # Check for errors
- if not e_queue.empty():
- error_str = str(e_queue.get())
- logger.error('Testbed instantiation failed: %s', error_str)
-
- # Wait for the running processes to quit before swapping out
- for process in shim_processes:
- process.join()
-
- raise Exception('Failure: %s' % error_str)
- try:
- # Check for results
- result = r_queue.get(timeout=1)
- if result == "Command chain ran correctly":
- over_processes += 1
- logger.debug('%s/%s processes completed',
- over_processes, total_processes)
- except:
- max_waiting_time -= 1
-
- if max_waiting_time == 0:
- logger.error("Swap in is in deadlock, aborting.")
- for process in shim_processes:
- process.terminate()
- time.sleep(0.1)
- process.join()
- raise Exception('Swap in is in deadlock')
-
- for process in shim_processes:
- process.join()
+ br_tab_scripts.append(command_list)
+ ##
+ # End of shim/node parsing block
+ ##
+ #
+
+ def executor(list_of_commands):
+ for cmd in list_of_commands:
+ logger.debug('executing >> %s', cmd)
+ subprocess.check_call(cmd.split())
+
+ names = []
+ args = []
+ executors = []
+ for i, script in enumerate(br_tab_scripts):
+ names.append(i)
+ self.may_sudo(script)
+ args.append(script)
+ executors.append(executor)
+
+ m_processing.call_in_parallel(names, args, executors)
+
+ # self.multiproc_manager = multiprocessing.Manager()
+ #
+ # r_queue = self.multiproc_manager.Queue()
+ # e_queue = self.multiproc_manager.Queue()
+ #
+ # # This is a multi-context manager. It will close all processes
+ # # as soon as the interpreter exits the 'with' block
+ # with contextlib.ExitStack() as stack:
+ # total_processes = 0
+ # for script in br_tab_scripts:
+ # # Avoid stacking processes if one failed before.
+ # if not e_queue.empty():
+ # # No need to do anything, we will abort after the loop
+ # break
+ # # Launch commands asynchronously
+ # self.may_sudo(script)
+ # stack.enter_context(ProcessContextManager(
+ # target=self._run_command_chain,
+ # args=(script, r_queue, e_queue)
+ # ))
+ # total_processes += 1
+ #
+ # # Wait for all processes to be over.
+ # max_waiting_time = 8 * total_processes
+ # over_processes = 0
+ #
+ # # Max waiting is for deadlocks
+ # while over_processes < total_processes and max_waiting_time > 0:
+ #
+ # result = None
+ # try:
+ # # Check the results
+ # result = r_queue.get(timeout=1)
+ # except: # An error here can only be a timeout from the queue.get()
+ # max_waiting_time -= 1
+ #
+ # if result is not None:
+ # if result == "Command chain ran correctly":
+ # over_processes += 1
+ # logger.debug('%s/%s processes completed',
+ # over_processes, total_processes)
+ # elif result.startswith('Command chain ran with '):
+ # error_str = str(e_queue.get())
+ # logger.error('Testbed instantiation failed: %s', error_str)
+ # raise Exception('Failure: %s' % error_str)
+ # # ^ This will exit the 'with' block,
+ # # closing all running processes.
+ # else:
+ # logger.error('Unexpected result: %s.', result)
+ # raise Exception('Unexpected result: %s' % result)
+ # # ^ This will exit the 'with' block,
+ # # closing all running processes.
+ #
+ # if max_waiting_time == 0:
+ # logger.error("Swap in is in deadlock, aborting.")
+ # raise Exception('Swap in is in deadlock')
+ # # ^ This will exit the 'with' block,
+ # # closing all running processes.
+ #
+ # del r_queue
+ # del e_queue
+ # self.multiproc_manager = None
+ #
+ # ##
+ # # End of the 'with' block
+ # ##
logger.info('Interfaces setup complete. '
'Building VMs (this might take a while).')
# Building vms
- boot_batch_size = max(1, multiprocessing.cpu_count() // 2)
+ boot_batch_size = max(1, old_m.cpu_count() // 2)
booting_budget = boot_batch_size
boot_backoff = 12 # in seconds
base_port = 2222
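The commented-out alternative in the hunk above stacks one ProcessContextManager per command chain inside contextlib.ExitStack (contextlib2 on Python 2), so that leaving the with block, whether normally or through an exception, stops every worker that is still running. A minimal sketch of such a context manager, assuming it simply starts the process on entry and terminates and joins it on exit; the actual rumba.multiprocessing_utils implementation may differ:

import multiprocessing.dummy as multiprocessing


class ProcessContextManagerSketch(object):
    # Hypothetical stand-in for rumba.multiprocessing_utils.ProcessContextManager.

    def __init__(self, target, args=(), kwargs=None):
        self.process = multiprocessing.Process(
            target=target, args=args, kwargs=kwargs or {})

    def __enter__(self):
        self.process.start()
        return self.process

    def __exit__(self, exc_type, exc_val, exc_tb):
        if self.process.is_alive():
            # Real multiprocessing workers can be terminated; dummy (thread)
            # workers cannot, so fall back to a plain join in that case.
            terminate = getattr(self.process, 'terminate', None)
            if terminate is not None:
                terminate()
        self.process.join()
        return False  # never swallow exceptions raised inside the 'with' block

Registering each worker with stack.enter_context(...) is what lets the swap_out tear-down below rely on the with block to clean up every process, whether the loop completes or bails out early.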
@@ -361,14 +408,16 @@ class Testbed(mod.Testbed):
if booting_budget < boot_backoff:
tsleep = boot_backoff * (boot_batch_size - booting_budget) / \
boot_batch_size
- logger.info('Sleeping %s secs '
+ logger.debug('Sleeping %s secs '
'waiting for the last VMs to boot',
tsleep)
time.sleep(tsleep)
# TODO: to be removed, we should loop in the ssh part
- logger.info('Sleeping 5 seconds, just to be on the safe side')
- time.sleep(5)
+ # logger.info('Sleeping 5 seconds, just to be on the safe side')
+ # time.sleep(5)
+
+ logger.info('All VMs are running. Moving on...')
self.recover_if_names(experiment)
@@ -389,86 +438,120 @@ class Testbed(mod.Testbed):
process.wait()
logger.info('Destroying interfaces.')
- port_processes = []
- error_queue = multiprocessing.Queue()
- results_queue = multiprocessing.Queue()
- for vm_name, vm in self.vms.items():
- for port in vm['ports']:
- tap = port['tap_id']
- shim = port['shim']
+ self.multiproc_manager = multiprocessing.Manager()
+ results_queue = self.multiproc_manager.Queue()
+ error_queue = self.multiproc_manager.Queue()
+
+ # See previous ExitStack usage for explanation
+ with contextlib.ExitStack() as stack:
+ total_processes = 0
+ for vm_name, vm in self.vms.items():
+ for port in vm['ports']:
+ tap = port['tap_id']
+ shim = port['shim']
+
+ commands = []
+ commands += ('brctl delif %(br)s %(tap)s\n'
+ 'ip link set dev %(tap)s down\n'
+ 'ip tuntap del mode tap name %(tap)s'
+ % {'tap': tap, 'br': shim.name}
+ ).split('\n')
+ self.may_sudo(commands)
+ stack.enter_context(ProcessContextManager(
+ target=self._run_command_chain,
+ args=(commands, results_queue, error_queue),
+ kwargs={'ignore_errors': True}
+ ))
+ total_processes += 1
+
+ max_waiting_time = 4 * total_processes
+ over_processes = 0
+
+ while max_waiting_time > 0 and over_processes < total_processes:
+
+ result = None
+ try:
+ # Check for results
+ result = results_queue.get(timeout=1)
+ except:
+ max_waiting_time -= 1
+
+ if result == "Command chain ran correctly":
+ over_processes += 1
+ elif result.startswith('Command chain ran with '):
+ error_str = str(error_queue.get())
+ logger.warning('Testbed tear-down failed: %s', error_str)
+ over_processes += 1
+ else:
+ logger.warning('Testbed tear-down failed, '
+ 'Unexpected result %s.', result)
+ over_processes += 1
+
+ logger.debug('%s/%s tear-down port '
+ 'processes completed',
+ over_processes, total_processes)
+ ##
+ # End of the port 'with' block
+ ##
+
+ del error_queue
+ del results_queue
+ self.multiproc_manager = None
+
+ logger.info('Port tear-down complete.')
+
+ self.multiproc_manager = multiprocessing.Manager()
+ error_queue_2 = self.multiproc_manager.Queue()
+ results_queue_2 = self.multiproc_manager.Queue()
+
+ # See previous ExitStack usage for explanation
+ with contextlib.ExitStack() as stack:
+ total_processes = 0
+ for shim in self.shims:
commands = []
- commands += ('brctl delif %(br)s %(tap)s\n'
- 'ip link set dev %(tap)s down\n'
- 'ip tuntap del mode tap name %(tap)s'
- % {'tap': tap, 'br': shim.name}
+ commands += ('ip link set dev %(br)s down\n'
+ 'brctl delbr %(br)s'
+ % {'br': shim.name}
).split('\n')
self.may_sudo(commands)
- process = multiprocessing.Process(
+ stack.enter_context(ProcessContextManager(
target=self._run_command_chain,
- args=(commands, results_queue, error_queue),
- kwargs={'ignore_errors': True})
- port_processes.append(process)
- process.start()
-
- total_processes = len(port_processes)
- max_waiting_time = 2 * total_processes
- over_processes = 0
-
- while max_waiting_time > 0 and over_processes < total_processes:
- # Check for errors
- if not error_queue.empty():
- logger.warning('Failure while shutting down: %s',
- str(error_queue.get()))
- over_processes += 1
- try:
- # Check for results
- result = results_queue.get(timeout=1)
+ args=(commands,
+ results_queue_2,
+ error_queue_2),
+ kwargs={'ignore_errors': True}
+ ))
+ total_processes += 1
+
+ max_waiting_time = 4 * total_processes
+ over_processes = 0
+
+ while max_waiting_time > 0 and over_processes < total_processes:
+
+ result = None
+ try:
+ # Check for results
+ result = results_queue_2.get(timeout=1)
+ except:
+ max_waiting_time -= 1
+
if result == "Command chain ran correctly":
over_processes += 1
- logger.debug('%s/%s tear-down port '
- 'processes completed',
- over_processes, total_processes)
- except:
- max_waiting_time -= 1
-
- error_queue = multiprocessing.Queue()
- results_queue = multiprocessing.Queue()
- shim_processes = []
-
- for shim in self.shims:
- commands = []
- commands += ('ip link set dev %(br)s down\n'
- 'brctl delbr %(br)s'
- % {'br': shim.name}
- ).split('\n')
- self.may_sudo(commands)
- process = multiprocessing.Process(target=self._run_command_chain,
- args=(commands,
- results_queue,
- error_queue),
- kwargs={'ignore_errors': True})
- shim_processes.append(process)
- process.start()
-
- total_processes = len(shim_processes)
- max_waiting_time = 2 * total_processes
- over_processes = 0
-
- while max_waiting_time > 0 and over_processes < total_processes:
- # Check for errors
- if not error_queue.empty():
- logger.warning('Failure while shutting down: %s'
- % str(error_queue.get()))
- over_processes += 1
- try:
- # Check for results
- result = results_queue.get(timeout=1)
- if result == "Command chain ran correctly":
+ elif result.startswith('Command chain ran with '):
+ error_str = str(error_queue_2.get())
+ logger.warning('Testbed tear-down failed: %s', error_str)
+ over_processes += 1
+ else:
+ logger.warning('Testbed tear-down failed, '
+ 'Unexpected result %s.', result)
over_processes += 1
- logger.debug('%s/%s tear-down shim '
- 'processes completed'
- % (over_processes, total_processes))
- except:
- max_waiting_time -= 1
+
+ logger.debug('%s/%s tear-down bridge '
+ 'processes completed',
+ over_processes, total_processes)
+ ##
+ # End of the bridge 'with' block
+ ##
+
logger.info('Experiment has been swapped out.')