diff options
-rw-r--r-- | rumba/storyboard.py | 137 |
1 files changed, 106 insertions, 31 deletions
diff --git a/rumba/storyboard.py b/rumba/storyboard.py index c652073..56be7e0 100644 --- a/rumba/storyboard.py +++ b/rumba/storyboard.py @@ -31,6 +31,8 @@ # import functools import math +from multiprocessing import cpu_count +import multiprocessing.dummy as multiprocessing import os import random import time @@ -61,6 +63,33 @@ except ImportError: # PROBLEM! These logs will almost never be printed... # But we might not care +POOL = multiprocessing.Pool(cpu_count() * 6) + + +def _execute_command(node, + cmd, + callback=None, + e_callback=None, + as_root=False): + if e_callback is None: + def e_callback(e): + logger.warning('Could not execute command "%s" on node "%s". ' + 'Error: %s.', + cmd, + node.name, + e) + if callback is None: + def callback(x): + pass + + POOL.apply_async( + node.execute_command, + args=(cmd,), + kwds={'as_root': as_root}, + callback=callback, + error_callback=e_callback + ) + class _SBEntity(object): @@ -194,26 +223,42 @@ class ClientProcess(_SBEntity): self.start_time = None self.running = False self.node = node - self.pid = None + self._pid = multiprocessing.Value('i', -1) self.shutdown = shutdown self.as_root = as_root - def run(self): - """Starts this process""" + @property + def pid(self): + if self._pid.value == -1: + raise ValueError("Process %s is not running." + % (self.id,)) + else: + return self._pid.value + + @pid.setter + def pid(self, value): + self._pid.value = value + + def _make_run_cmd(self): if self.node is None: raise Exception('No node specified for client %s' % (self.ap_id,)) self.start_time = time.time() + startup = self.startup.replace("<duration>", str(self.duration)) + return "./startup.sh %s %s" % ( + self.id, + startup, + ) + + def run(self): + """Starts this process""" + logger.debug( 'Starting client app %s on node %s with duration %s.', self.ap_id, self.node.name, self.duration ) - startup = self.startup.replace("<duration>", str(self.duration)) - cmd = "./startup.sh %s %s" % ( - self.id, - startup, - ) + cmd = self._make_run_cmd() self.running = True try: @@ -224,6 +269,31 @@ class ClientProcess(_SBEntity): logger.debug('Client app %s on node %s got pid %s.', self.ap_id, self.node.name, self.pid) + def run_async(self): + """Starts this process asynchronously""" + + def callback(pid): + self.pid = pid + logger.debug('Client app %s on node %s got pid %s.', + self.ap_id, self.node.name, self.pid) + + def e_callback(e): + logger.warning('Could not start client %s on node %s. ' + 'Error: %s.', + self.ap_id, + self.node.name, + e) + + logger.debug( + 'Starting client app %s on node %s with duration %s (async).', + self.ap_id, self.node.name, self.duration + ) + + cmd = self._make_run_cmd() + + self.running = True + _execute_command(self.node, cmd, callback, e_callback, self.as_root) + def stop(self): """Stops this process""" if self.shutdown != "": @@ -231,17 +301,15 @@ class ClientProcess(_SBEntity): 'Killing client %s on node %s.', self.ap_id, self.node.name ) - try: - kill_cmd = self.shutdown.replace('<pid>', str(self.pid)) - self.node.execute_command(kill_cmd) - except ssh_support.SSHException: - logger.warning('Could not kill client %s on node %s.', - self.ap_id, self.node.name) - else: - logger.debug( - 'Client %s on node %s has terminated.', - self.ap_id, self.node.name - ) + + def callback(): + logger.debug( + 'Client %s on node %s has terminated.', + self.ap_id, self.node.name + ) + + kill_cmd = self.shutdown.replace('<pid>', str(self.pid)) + _execute_command(self.node, kill_cmd, callback, as_root=self.as_root) class Server(_SBEntity): @@ -413,16 +481,16 @@ class Server(_SBEntity): run_cmd = self.ap + ( (" " + self.options) if self.options is not None else "" ) - cmd_2 = "./startup.sh %s %s" % ('server_' + self.id, run_cmd) + cmd = "./startup.sh %s %s" % ('server_' + self.id, run_cmd) logger.debug( 'Starting server %s on node %s.', self.id, node.name ) - try: - self.pids[node] = node.execute_command(cmd_2, as_root=self.as_root) - except ssh_support.SSHException: - logger.warning('Could not start server %s on node %s.', - self.id, node.name) + + def callback(pid): + self.pids[node] = pid + + _execute_command(node, cmd, callback, as_root=self.as_root) def stop(self): """Stops this server""" @@ -432,7 +500,7 @@ class Server(_SBEntity): self.id, node.name ) try: - node.execute_command("kill %s" % pid) + _execute_command(node, "kill %s" % (pid,), as_root=self.as_root) except ssh_support.SSHException: logger.warning('Could not kill server %s on node %s.', self.id, node.name) @@ -449,7 +517,6 @@ class StoryBoard(_SBEntity): """Class representing the storyboard of an experiment""" SCRIPT_RESOLUTION = 0.1 - EXECUTION_RESOLUTION = 2.5 # in seconds (may be a float) def get_e_id(self): return 'storyboard' @@ -728,7 +795,8 @@ class StoryBoard(_SBEntity): duration, node=None, proc_id=None, - callback=None): + callback=None, + async=True): """ Runs the specified client app with the specified parameters. @@ -746,6 +814,9 @@ class StoryBoard(_SBEntity): :param callback: callable or list thereof to be run after client termination :type callback: `callable` or `list` of `callable` + :param async: if true, the SSH communication will be dealt + with asynchronously + :type async: `bool` """ if isinstance(client, str): client = self.client_apps[client] @@ -762,7 +833,10 @@ class StoryBoard(_SBEntity): callback = [callback] process = client.process(duration, node, proc_id) self.process_dict[process.id] = process - process.run() + if async: + process.run_async() + else: + process.run() action = functools.partial(self.kill_process, process.id) term_ev = Event(action, ev_time=(self.cur_time + duration)) self.add_event(term_ev) @@ -910,7 +984,8 @@ class StoryBoard(_SBEntity): r'> /tmp/${logname}.rumba.log 2>&1 & echo "$!"' logger.debug("Writing utility startup script on nodes.") for node in self.node_map.values(): - node.execute_command( + _execute_command( + node, "echo '%s' > startup.sh && chmod a+x startup.sh" % (script,) ) try: @@ -1192,7 +1267,7 @@ class StoryBoard(_SBEntity): str(uuid.uuid4())[0:4] + ".pcap" ) - tcpd_client = Client(ap="tcpdump", options="-i %s -w %s" \ + tcpd_client = Client(ap="tcpdump", options="-i %s -w %s" % (ipcp.ifname, pcap_file))\ .process(end-start, node, 'tcpdump_proc') |