aboutsummaryrefslogtreecommitdiff
path: root/rumba/testbeds
diff options
context:
space:
mode:
authorMarco Capitani <m.capitani@nextworks.it>2017-11-16 14:31:58 +0100
committerMarco Capitani <m.capitani@nextworks.it>2017-11-16 14:57:33 +0100
commitc884a5b8c1e2a2f4d610cae7b9aa547b95424210 (patch)
tree33f06897a76950822d46d32d09ed08a1d87a410c /rumba/testbeds
parent986676ade9ffe4738734566c50eeed4b0ce7dd5f (diff)
downloadrumba-c884a5b8c1e2a2f4d610cae7b9aa547b95424210.tar.gz
rumba-c884a5b8c1e2a2f4d610cae7b9aa547b95424210.zip
testbed-qemu: refactor swapout to use call_in_parallel
minor: also cleaned up some commented old code
Diffstat (limited to 'rumba/testbeds')
-rw-r--r--rumba/testbeds/qemu.py233
1 files changed, 49 insertions, 184 deletions
diff --git a/rumba/testbeds/qemu.py b/rumba/testbeds/qemu.py
index 530b4ac..a74fae0 100644
--- a/rumba/testbeds/qemu.py
+++ b/rumba/testbeds/qemu.py
@@ -24,27 +24,22 @@
# Foundation, Inc., http://www.fsf.org/about/contact/.
#
import multiprocessing as old_m
-import multiprocessing.dummy as multiprocessing
import os
import subprocess
import sys
import time
+from subprocess import CalledProcessError
import rumba.model as mod
import rumba.log as log
import rumba.ssh_support as ssh_support
import rumba.multiprocess as m_processing
-from rumba.multiprocess import ProcessContextManager
if sys.version_info[0] >= 3:
from urllib.request import urlretrieve
else:
from urllib import urlretrieve
-if sys.version_info[0] >= 3:
- import contextlib
-else:
- import contextlib2 as contextlib
logger = log.get_logger(__name__)
@@ -248,73 +243,6 @@ class Testbed(mod.Testbed):
m_processing.call_in_parallel(names, args, executors)
- # self.multiproc_manager = multiprocessing.Manager()
- #
- # r_queue = self.multiproc_manager.Queue()
- # e_queue = self.multiproc_manager.Queue()
- #
- # # This is a multi-context manager. It will close all processes
- # # as soon as the interpreter exits the 'with' block
- # with contextlib.ExitStack() as stack:
- # total_processes = 0
- # for script in br_tab_scripts:
- # # Avoid stacking processes if one failed before.
- # if not e_queue.empty():
- # # No need to do anything, we will abort after the loop
- # break
- # # Launch commands asynchronously
- # self.may_sudo(script)
- # stack.enter_context(ProcessContextManager(
- # target=self._run_command_chain,
- # args=(script, r_queue, e_queue)
- # ))
- # total_processes += 1
- #
- # # Wait for all processes to be over.
- # max_waiting_time = 8 * total_processes
- # over_processes = 0
- #
- # # Max waiting is for deadlocks
- # while over_processes < total_processes and max_waiting_time > 0:
- #
- # result = None
- # try:
- # # Check the results
- # result = r_queue.get(timeout=1)
- # except: # An error here can only be a timeout from the queue.get()
- # max_waiting_time -= 1
- #
- # if result is not None:
- # if result == "Command chain ran correctly":
- # over_processes += 1
- # logger.debug('%s/%s processes completed',
- # over_processes, total_processes)
- # elif result.startswith('Command chain ran with '):
- # error_str = str(e_queue.get())
- # logger.error('Testbed instantiation failed: %s', error_str)
- # raise Exception('Failure: %s' % error_str)
- # # ^ This will exit the 'with' block,
- # # closing all running processes.
- # else:
- # logger.error('Unexpected result: %s.', result)
- # raise Exception('Unexpected result: %s' % result)
- # # ^ This will exit the 'with' block,
- # # closing all running processes.
- #
- # if max_waiting_time == 0:
- # logger.error("Swap in is in deadlock, aborting.")
- # raise Exception('Swap in is in deadlock')
- # # ^ This will exit the 'with' block,
- # # closing all running processes.
- #
- # del r_queue
- # del e_queue
- # self.multiproc_manager = None
- #
- # ##
- # # End of the 'with' block
- # ##
-
logger.info('Interfaces setup complete. '
'Building VMs (this might take a while).')
@@ -406,18 +334,14 @@ class Testbed(mod.Testbed):
vmid += 1
# Wait for the last batch of VMs to start
- if booting_budget < boot_backoff:
+ if booting_budget < boot_batch_size:
tsleep = boot_backoff * (boot_batch_size - booting_budget) / \
boot_batch_size
logger.debug('Sleeping %s secs '
- 'waiting for the last VMs to boot',
- tsleep)
+ 'waiting for the last VMs to boot',
+ tsleep)
time.sleep(tsleep)
- # TODO: to be removed, we should loop in the ssh part
- # logger.info('Sleeping 5 seconds, just to be on the safe side')
- # time.sleep(5)
-
logger.info('All VMs are running. Moving on...')
self.recover_if_names(experiment)
@@ -440,119 +364,60 @@ class Testbed(mod.Testbed):
logger.info('Destroying interfaces.')
- self.multiproc_manager = multiprocessing.Manager()
- results_queue = self.multiproc_manager.Queue()
- error_queue = self.multiproc_manager.Queue()
-
- # See previous ExitStack usage for explanation
- with contextlib.ExitStack() as stack:
- total_processes = 0
- for vm_name, vm in self.vms.items():
- for port in vm['ports']:
- tap = port['tap_id']
- shim = port['shim']
-
- commands = []
- commands += ('brctl delif %(br)s %(tap)s\n'
- 'ip link set dev %(tap)s down\n'
- 'ip tuntap del mode tap name %(tap)s'
- % {'tap': tap, 'br': shim.name}
- ).split('\n')
- self.may_sudo(commands)
- stack.enter_context(ProcessContextManager(
- target=self._run_command_chain,
- args=(commands, results_queue, error_queue),
- kwargs={'ignore_errors': True}
- ))
- total_processes += 1
-
- max_waiting_time = 4 * total_processes
- over_processes = 0
-
- while max_waiting_time > 0 and over_processes < total_processes:
+ names = []
+ args = []
+ executors = []
- result = None
+ def executor(list_of_commands):
+ for cmd in list_of_commands:
+ logger.debug('executing >> %s', cmd)
try:
- # Check for results
- result = results_queue.get(timeout=1)
- except:
- max_waiting_time -= 1
-
- if result == "Command chain ran correctly":
- over_processes += 1
- elif result.startswith('Command chain ran with '):
- error_str = str(error_queue.get())
- logger.warning('Testbed tear-down failed: %s', error_str)
- over_processes += 1
- else:
- logger.warning('Testbed tear-down failed, '
- 'Unexpected result %s.', result)
- over_processes += 1
-
- logger.debug('%s/%s tear-down port '
- 'processes completed',
- over_processes, total_processes)
- ##
- # End of the port 'with' block
- ##
-
- del error_queue
- del results_queue
- self.multiproc_manager = None
-
- logger.info('Port tear-down complete.')
+ subprocess.check_call(cmd.split())
+ except CalledProcessError as e:
+ logger.warning('Error during cleanup: %s', str(e))
- self.multiproc_manager = multiprocessing.Manager()
- error_queue_2 = self.multiproc_manager.Queue()
- results_queue_2 = self.multiproc_manager.Queue()
+ index = 0
+ for vm_name, vm in self.vms.items():
+ for port in vm['ports']:
+ tap = port['tap_id']
+ shim = port['shim']
- # See previous ExitStack usage for explanation
- with contextlib.ExitStack() as stack:
- total_processes = 0
- for shim in self.shims:
commands = []
- commands += ('ip link set dev %(br)s down\n'
- 'brctl delbr %(br)s'
- % {'br': shim.name}
+ commands += ('brctl delif %(br)s %(tap)s\n'
+ 'ip link set dev %(tap)s down\n'
+ 'ip tuntap del mode tap name %(tap)s'
+ % {'tap': tap, 'br': shim.name}
).split('\n')
self.may_sudo(commands)
- stack.enter_context(ProcessContextManager(
- target=self._run_command_chain,
- args=(commands,
- results_queue_2,
- error_queue_2),
- kwargs={'ignore_errors': True}
- ))
- total_processes += 1
- max_waiting_time = 4 * total_processes
- over_processes = 0
+ names.append(index)
+ index += 1
+ args.append(commands)
+ executors.append(executor)
- while max_waiting_time > 0 and over_processes < total_processes:
+ m_processing.call_in_parallel(names, args, executors)
- result = None
- try:
- # Check for results
- result = results_queue_2.get(timeout=1)
- except:
- max_waiting_time -= 1
-
- if result == "Command chain ran correctly":
- over_processes += 1
- elif result.startswith('Command chain ran with '):
- error_str = str(error_queue_2.get())
- logger.warning('Testbed tear-down failed: %s', error_str)
- over_processes += 1
- else:
- logger.warning('Testbed tear-down failed, '
- 'Unexpected result %s.', result)
- over_processes += 1
-
- logger.debug('%s/%s tear-down bridge '
- 'processes completed',
- over_processes, total_processes)
- ##
- # End of the bridge 'with' block
- ##
+ logger.info('Port tear-down complete. Destroying bridges.')
+
+ names = []
+ args = []
+ executors = []
+
+ index = 0
+
+ for shim in self.shims:
+ commands = []
+ commands += ('ip link set dev %(br)s down\n'
+ 'brctl delbr %(br)s'
+ % {'br': shim.name}
+ ).split('\n')
+ self.may_sudo(commands)
+
+ names.append(index)
+ index += 1
+ args.append(commands)
+ executors.append(executor)
+
+ m_processing.call_in_parallel(names, args, executors)
logger.info('Experiment has been swapped out.')