author     Marco Capitani <m.capitani@nextworks.it>   2017-11-16 14:31:58 +0100
committer  Marco Capitani <m.capitani@nextworks.it>   2017-11-16 14:57:33 +0100
commit     c884a5b8c1e2a2f4d610cae7b9aa547b95424210 (patch)
tree       33f06897a76950822d46d32d09ed08a1d87a410c /rumba
parent     986676ade9ffe4738734566c50eeed4b0ce7dd5f (diff)
download   rumba-c884a5b8c1e2a2f4d610cae7b9aa547b95424210.tar.gz
           rumba-c884a5b8c1e2a2f4d610cae7b9aa547b95424210.zip
testbed-qemu: refactor swapout to use call_in_parallel
minor: also removed some commented-out old code
Diffstat (limited to 'rumba')
-rw-r--r--  rumba/prototypes/irati.py |   6
-rw-r--r--  rumba/ssh_support.py      |  74
-rw-r--r--  rumba/testbeds/qemu.py    | 233
3 files changed, 80 insertions, 233 deletions
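
The core of the refactor is rumba.multiprocess.call_in_parallel(names, args, executors), which runs executors[i](args[i]) for each entry concurrently. The following is a minimal sketch of how such a helper could look; the thread-per-entry dispatch and the error collection are assumptions made for illustration, not the actual rumba.multiprocess implementation.

    # Hypothetical sketch of a call_in_parallel helper. It assumes each
    # executor(arg) pair runs on its own worker thread and that failures
    # are collected and reported at the end. The real helper may differ.
    import threading

    def call_in_parallel(names, args, executors):
        """Run executors[i](args[i]) concurrently, one thread per entry."""
        errors = {}

        def worker(name, arg, executor):
            try:
                executor(arg)
            except Exception as e:  # collect failures instead of losing them
                errors[name] = e

        threads = [threading.Thread(target=worker, args=(n, a, f))
                   for n, a, f in zip(names, args, executors)]
        for t in threads:
            t.start()
        for t in threads:
            t.join()
        if errors:
            raise Exception('Parallel calls failed: %s' % errors)

In the swap-out code below, each entry's executor runs the tap or bridge deletion commands for one device, so the whole tear-down proceeds in parallel instead of through hand-rolled process bookkeeping.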
diff --git a/rumba/prototypes/irati.py b/rumba/prototypes/irati.py
index c9faf36..82e56b9 100644
--- a/rumba/prototypes/irati.py
+++ b/rumba/prototypes/irati.py
@@ -194,7 +194,7 @@ class Experiment(mod.Experiment):
def enroll_nodes(self):
"""Runs the enrollments one by one, respecting dependencies"""
- logger.info("Waiting 5 seconds for the ipcm to start.")
+ logger.info("Starting enrollment phase.")
time.sleep(5)
for enrollment_list in self.enrollments:
for e in enrollment_list:
@@ -268,10 +268,6 @@ class Experiment(mod.Experiment):
"difName": "%s" % (apm['dif'])
})
- # TODO ask: I guess this will need to be added,
- # and in that case we should add it to the qemu plugin too...
- # Where should we take it in input?
-
if self.manager:
# Add MAD/Manager configuration
irati_templates.get_ipcmconf_base()["addons"] = {
diff --git a/rumba/ssh_support.py b/rumba/ssh_support.py
index 9990fc9..39dbf5f 100644
--- a/rumba/ssh_support.py
+++ b/rumba/ssh_support.py
@@ -165,50 +165,36 @@ def execute_commands(testbed, ssh_config, commands, time_out=3):
ssh_config.proxy_client = proxy_client
o = ""
- try:
- for command in commands:
- logger.debug("%s@%s:%s >> %s" % (testbed.username,
- ssh_config.hostname,
- ssh_config.port,
- command))
- envars = '. /etc/profile;'
- command = envars + ' ' + command
- chan = ssh_config.client.get_transport().open_session()
- stdout = chan.makefile()
- stderr = chan.makefile_stderr()
- try:
- chan.exec_command(command)
- except paramiko.ssh_exception.SSHException as e:
- raise SSHException('Failed to execute command')
- o = _print_stream(stdout)
- if chan.recv_exit_status() != 0:
- # Get ready for printing stdout and stderr
- if o != "":
- list_print = ['**** STDOUT:']
- list_print += o.split('\\n')
- else:
- list_print = []
- e = _print_stream(stderr)
- if e != "":
- list_print.append('**** STDERR:')
- list_print += e.split('\\n')
- raise SSHException('A remote command returned an error. '
- 'Output:\n\n\t' +
- '\n\t'.join(list_print) + '\n')
- return o
- finally:
- ##
- # The following lines are a fix to make this work under
- # true multiprocessing. Right now we are using
- # dummy multiprocessing (i.e. threading), so not needed.
- # They have been kept here in case we want to get back to it.
- ##
- # ssh_config.client.close()
- # if ssh_config.proxy_client is not None:
- # ssh_config.proxy_client.close()
- # ssh_config.client = None
- # ssh_config.proxy_client = None
- pass
+ for command in commands:
+ logger.debug("%s@%s:%s >> %s" % (testbed.username,
+ ssh_config.hostname,
+ ssh_config.port,
+ command))
+ envars = '. /etc/profile;'
+ command = envars + ' ' + command
+ chan = ssh_config.client.get_transport().open_session()
+ stdout = chan.makefile()
+ stderr = chan.makefile_stderr()
+ try:
+ chan.exec_command(command)
+ except paramiko.ssh_exception.SSHException as e:
+ raise SSHException('Failed to execute command')
+ o = _print_stream(stdout)
+ if chan.recv_exit_status() != 0:
+ # Get ready for printing stdout and stderr
+ if o != "":
+ list_print = ['**** STDOUT:']
+ list_print += o.split('\\n')
+ else:
+ list_print = []
+ e = _print_stream(stderr)
+ if e != "":
+ list_print.append('**** STDERR:')
+ list_print += e.split('\\n')
+ raise SSHException('A remote command returned an error. '
+ 'Output:\n\n\t' +
+ '\n\t'.join(list_print) + '\n')
+ return o
def execute_command(testbed, ssh_config, command, time_out=3):
"""
diff --git a/rumba/testbeds/qemu.py b/rumba/testbeds/qemu.py
index 530b4ac..a74fae0 100644
--- a/rumba/testbeds/qemu.py
+++ b/rumba/testbeds/qemu.py
@@ -24,27 +24,22 @@
# Foundation, Inc., http://www.fsf.org/about/contact/.
#
import multiprocessing as old_m
-import multiprocessing.dummy as multiprocessing
import os
import subprocess
import sys
import time
+from subprocess import CalledProcessError
import rumba.model as mod
import rumba.log as log
import rumba.ssh_support as ssh_support
import rumba.multiprocess as m_processing
-from rumba.multiprocess import ProcessContextManager
if sys.version_info[0] >= 3:
from urllib.request import urlretrieve
else:
from urllib import urlretrieve
-if sys.version_info[0] >= 3:
- import contextlib
-else:
- import contextlib2 as contextlib
logger = log.get_logger(__name__)
@@ -248,73 +243,6 @@ class Testbed(mod.Testbed):
m_processing.call_in_parallel(names, args, executors)
- # self.multiproc_manager = multiprocessing.Manager()
- #
- # r_queue = self.multiproc_manager.Queue()
- # e_queue = self.multiproc_manager.Queue()
- #
- # # This is a multi-context manager. It will close all processes
- # # as soon as the interpreter exits the 'with' block
- # with contextlib.ExitStack() as stack:
- # total_processes = 0
- # for script in br_tab_scripts:
- # # Avoid stacking processes if one failed before.
- # if not e_queue.empty():
- # # No need to do anything, we will abort after the loop
- # break
- # # Launch commands asynchronously
- # self.may_sudo(script)
- # stack.enter_context(ProcessContextManager(
- # target=self._run_command_chain,
- # args=(script, r_queue, e_queue)
- # ))
- # total_processes += 1
- #
- # # Wait for all processes to be over.
- # max_waiting_time = 8 * total_processes
- # over_processes = 0
- #
- # # Max waiting is for deadlocks
- # while over_processes < total_processes and max_waiting_time > 0:
- #
- # result = None
- # try:
- # # Check the results
- # result = r_queue.get(timeout=1)
- # except: # An error here can only be a timeout from the queue.get()
- # max_waiting_time -= 1
- #
- # if result is not None:
- # if result == "Command chain ran correctly":
- # over_processes += 1
- # logger.debug('%s/%s processes completed',
- # over_processes, total_processes)
- # elif result.startswith('Command chain ran with '):
- # error_str = str(e_queue.get())
- # logger.error('Testbed instantiation failed: %s', error_str)
- # raise Exception('Failure: %s' % error_str)
- # # ^ This will exit the 'with' block,
- # # closing all running processes.
- # else:
- # logger.error('Unexpected result: %s.', result)
- # raise Exception('Unexpected result: %s' % result)
- # # ^ This will exit the 'with' block,
- # # closing all running processes.
- #
- # if max_waiting_time == 0:
- # logger.error("Swap in is in deadlock, aborting.")
- # raise Exception('Swap in is in deadlock')
- # # ^ This will exit the 'with' block,
- # # closing all running processes.
- #
- # del r_queue
- # del e_queue
- # self.multiproc_manager = None
- #
- # ##
- # # End of the 'with' block
- # ##
-
logger.info('Interfaces setup complete. '
'Building VMs (this might take a while).')
@@ -406,18 +334,14 @@ class Testbed(mod.Testbed):
vmid += 1
# Wait for the last batch of VMs to start
- if booting_budget < boot_backoff:
+ if booting_budget < boot_batch_size:
tsleep = boot_backoff * (boot_batch_size - booting_budget) / \
boot_batch_size
logger.debug('Sleeping %s secs '
- 'waiting for the last VMs to boot',
- tsleep)
+ 'waiting for the last VMs to boot',
+ tsleep)
time.sleep(tsleep)
- # TODO: to be removed, we should loop in the ssh part
- # logger.info('Sleeping 5 seconds, just to be on the safe side')
- # time.sleep(5)
-
logger.info('All VMs are running. Moving on...')
self.recover_if_names(experiment)
@@ -440,119 +364,60 @@ class Testbed(mod.Testbed):
logger.info('Destroying interfaces.')
- self.multiproc_manager = multiprocessing.Manager()
- results_queue = self.multiproc_manager.Queue()
- error_queue = self.multiproc_manager.Queue()
-
- # See previous ExitStack usage for explanation
- with contextlib.ExitStack() as stack:
- total_processes = 0
- for vm_name, vm in self.vms.items():
- for port in vm['ports']:
- tap = port['tap_id']
- shim = port['shim']
-
- commands = []
- commands += ('brctl delif %(br)s %(tap)s\n'
- 'ip link set dev %(tap)s down\n'
- 'ip tuntap del mode tap name %(tap)s'
- % {'tap': tap, 'br': shim.name}
- ).split('\n')
- self.may_sudo(commands)
- stack.enter_context(ProcessContextManager(
- target=self._run_command_chain,
- args=(commands, results_queue, error_queue),
- kwargs={'ignore_errors': True}
- ))
- total_processes += 1
-
- max_waiting_time = 4 * total_processes
- over_processes = 0
-
- while max_waiting_time > 0 and over_processes < total_processes:
+ names = []
+ args = []
+ executors = []
- result = None
+ def executor(list_of_commands):
+ for cmd in list_of_commands:
+ logger.debug('executing >> %s', cmd)
try:
- # Check for results
- result = results_queue.get(timeout=1)
- except:
- max_waiting_time -= 1
-
- if result == "Command chain ran correctly":
- over_processes += 1
- elif result.startswith('Command chain ran with '):
- error_str = str(error_queue.get())
- logger.warning('Testbed tear-down failed: %s', error_str)
- over_processes += 1
- else:
- logger.warning('Testbed tear-down failed, '
- 'Unexpected result %s.', result)
- over_processes += 1
-
- logger.debug('%s/%s tear-down port '
- 'processes completed',
- over_processes, total_processes)
- ##
- # End of the port 'with' block
- ##
-
- del error_queue
- del results_queue
- self.multiproc_manager = None
-
- logger.info('Port tear-down complete.')
+ subprocess.check_call(cmd.split())
+ except CalledProcessError as e:
+ logger.warning('Error during cleanup: %s', str(e))
- self.multiproc_manager = multiprocessing.Manager()
- error_queue_2 = self.multiproc_manager.Queue()
- results_queue_2 = self.multiproc_manager.Queue()
+ index = 0
+ for vm_name, vm in self.vms.items():
+ for port in vm['ports']:
+ tap = port['tap_id']
+ shim = port['shim']
- # See previous ExitStack usage for explanation
- with contextlib.ExitStack() as stack:
- total_processes = 0
- for shim in self.shims:
commands = []
- commands += ('ip link set dev %(br)s down\n'
- 'brctl delbr %(br)s'
- % {'br': shim.name}
+ commands += ('brctl delif %(br)s %(tap)s\n'
+ 'ip link set dev %(tap)s down\n'
+ 'ip tuntap del mode tap name %(tap)s'
+ % {'tap': tap, 'br': shim.name}
).split('\n')
self.may_sudo(commands)
- stack.enter_context(ProcessContextManager(
- target=self._run_command_chain,
- args=(commands,
- results_queue_2,
- error_queue_2),
- kwargs={'ignore_errors': True}
- ))
- total_processes += 1
- max_waiting_time = 4 * total_processes
- over_processes = 0
+ names.append(index)
+ index += 1
+ args.append(commands)
+ executors.append(executor)
- while max_waiting_time > 0 and over_processes < total_processes:
+ m_processing.call_in_parallel(names, args, executors)
- result = None
- try:
- # Check for results
- result = results_queue_2.get(timeout=1)
- except:
- max_waiting_time -= 1
-
- if result == "Command chain ran correctly":
- over_processes += 1
- elif result.startswith('Command chain ran with '):
- error_str = str(error_queue_2.get())
- logger.warning('Testbed tear-down failed: %s', error_str)
- over_processes += 1
- else:
- logger.warning('Testbed tear-down failed, '
- 'Unexpected result %s.', result)
- over_processes += 1
-
- logger.debug('%s/%s tear-down bridge '
- 'processes completed',
- over_processes, total_processes)
- ##
- # End of the bridge 'with' block
- ##
+ logger.info('Port tear-down complete. Destroying bridges.')
+
+ names = []
+ args = []
+ executors = []
+
+ index = 0
+
+ for shim in self.shims:
+ commands = []
+ commands += ('ip link set dev %(br)s down\n'
+ 'brctl delbr %(br)s'
+ % {'br': shim.name}
+ ).split('\n')
+ self.may_sudo(commands)
+
+ names.append(index)
+ index += 1
+ args.append(commands)
+ executors.append(executor)
+
+ m_processing.call_in_parallel(names, args, executors)
logger.info('Experiment has been swapped out.')
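
After this refactor, swap-out builds one (name, command list, executor) triple per tap port and per bridge and hands them to call_in_parallel; each executor runs its commands locally with subprocess and only logs cleanup failures rather than aborting. A self-contained illustration of that pattern, using made-up device names in place of the testbed's real VMs and shims:

    # Illustration of the swap-out executor pattern with hypothetical
    # bridge/tap names; the real command lists come from the testbed state.
    import logging
    import subprocess
    from subprocess import CalledProcessError

    logger = logging.getLogger(__name__)

    def executor(list_of_commands):
        for cmd in list_of_commands:
            logger.debug('executing >> %s', cmd)
            try:
                subprocess.check_call(cmd.split())
            except CalledProcessError as e:
                logger.warning('Error during cleanup: %s', str(e))

    # One command chain per tap device (placeholder names, needs root).
    names = [0]
    args = [['brctl delif br0 tap0',
             'ip link set dev tap0 down',
             'ip tuntap del mode tap name tap0']]
    executors = [executor]
    # m_processing.call_in_parallel(names, args, executors)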