From d39179eed89bcca5ed011df9a571dfebff109b56 Mon Sep 17 00:00:00 2001 From: Marco Capitani Date: Mon, 4 Jun 2018 17:35:08 +0200 Subject: storyboard: add async command execution The standard command execution function blocks until the command is complete. Even if the command is run in background on the node, this implies that the storyboard has to wait for the SSH handshake to complete and the responses to come back. With the new method, the storyboard will go on with the execution without waiting (callbacks can be registered for reacting). --- rumba/storyboard.py | 137 ++++++++++++++++++++++++++++++++++++++++------------ 1 file changed, 106 insertions(+), 31 deletions(-) diff --git a/rumba/storyboard.py b/rumba/storyboard.py index c652073..56be7e0 100644 --- a/rumba/storyboard.py +++ b/rumba/storyboard.py @@ -31,6 +31,8 @@ # import functools import math +from multiprocessing import cpu_count +import multiprocessing.dummy as multiprocessing import os import random import time @@ -61,6 +63,33 @@ except ImportError: # PROBLEM! These logs will almost never be printed... # But we might not care +POOL = multiprocessing.Pool(cpu_count() * 6) + + +def _execute_command(node, + cmd, + callback=None, + e_callback=None, + as_root=False): + if e_callback is None: + def e_callback(e): + logger.warning('Could not execute command "%s" on node "%s". ' + 'Error: %s.', + cmd, + node.name, + e) + if callback is None: + def callback(x): + pass + + POOL.apply_async( + node.execute_command, + args=(cmd,), + kwds={'as_root': as_root}, + callback=callback, + error_callback=e_callback + ) + class _SBEntity(object): @@ -194,26 +223,42 @@ class ClientProcess(_SBEntity): self.start_time = None self.running = False self.node = node - self.pid = None + self._pid = multiprocessing.Value('i', -1) self.shutdown = shutdown self.as_root = as_root - def run(self): - """Starts this process""" + @property + def pid(self): + if self._pid.value == -1: + raise ValueError("Process %s is not running." + % (self.id,)) + else: + return self._pid.value + + @pid.setter + def pid(self, value): + self._pid.value = value + + def _make_run_cmd(self): if self.node is None: raise Exception('No node specified for client %s' % (self.ap_id,)) self.start_time = time.time() + startup = self.startup.replace("", str(self.duration)) + return "./startup.sh %s %s" % ( + self.id, + startup, + ) + + def run(self): + """Starts this process""" + logger.debug( 'Starting client app %s on node %s with duration %s.', self.ap_id, self.node.name, self.duration ) - startup = self.startup.replace("", str(self.duration)) - cmd = "./startup.sh %s %s" % ( - self.id, - startup, - ) + cmd = self._make_run_cmd() self.running = True try: @@ -224,6 +269,31 @@ class ClientProcess(_SBEntity): logger.debug('Client app %s on node %s got pid %s.', self.ap_id, self.node.name, self.pid) + def run_async(self): + """Starts this process asynchronously""" + + def callback(pid): + self.pid = pid + logger.debug('Client app %s on node %s got pid %s.', + self.ap_id, self.node.name, self.pid) + + def e_callback(e): + logger.warning('Could not start client %s on node %s. ' + 'Error: %s.', + self.ap_id, + self.node.name, + e) + + logger.debug( + 'Starting client app %s on node %s with duration %s (async).', + self.ap_id, self.node.name, self.duration + ) + + cmd = self._make_run_cmd() + + self.running = True + _execute_command(self.node, cmd, callback, e_callback, self.as_root) + def stop(self): """Stops this process""" if self.shutdown != "": @@ -231,17 +301,15 @@ class ClientProcess(_SBEntity): 'Killing client %s on node %s.', self.ap_id, self.node.name ) - try: - kill_cmd = self.shutdown.replace('', str(self.pid)) - self.node.execute_command(kill_cmd) - except ssh_support.SSHException: - logger.warning('Could not kill client %s on node %s.', - self.ap_id, self.node.name) - else: - logger.debug( - 'Client %s on node %s has terminated.', - self.ap_id, self.node.name - ) + + def callback(): + logger.debug( + 'Client %s on node %s has terminated.', + self.ap_id, self.node.name + ) + + kill_cmd = self.shutdown.replace('', str(self.pid)) + _execute_command(self.node, kill_cmd, callback, as_root=self.as_root) class Server(_SBEntity): @@ -413,16 +481,16 @@ class Server(_SBEntity): run_cmd = self.ap + ( (" " + self.options) if self.options is not None else "" ) - cmd_2 = "./startup.sh %s %s" % ('server_' + self.id, run_cmd) + cmd = "./startup.sh %s %s" % ('server_' + self.id, run_cmd) logger.debug( 'Starting server %s on node %s.', self.id, node.name ) - try: - self.pids[node] = node.execute_command(cmd_2, as_root=self.as_root) - except ssh_support.SSHException: - logger.warning('Could not start server %s on node %s.', - self.id, node.name) + + def callback(pid): + self.pids[node] = pid + + _execute_command(node, cmd, callback, as_root=self.as_root) def stop(self): """Stops this server""" @@ -432,7 +500,7 @@ class Server(_SBEntity): self.id, node.name ) try: - node.execute_command("kill %s" % pid) + _execute_command(node, "kill %s" % (pid,), as_root=self.as_root) except ssh_support.SSHException: logger.warning('Could not kill server %s on node %s.', self.id, node.name) @@ -449,7 +517,6 @@ class StoryBoard(_SBEntity): """Class representing the storyboard of an experiment""" SCRIPT_RESOLUTION = 0.1 - EXECUTION_RESOLUTION = 2.5 # in seconds (may be a float) def get_e_id(self): return 'storyboard' @@ -728,7 +795,8 @@ class StoryBoard(_SBEntity): duration, node=None, proc_id=None, - callback=None): + callback=None, + async=True): """ Runs the specified client app with the specified parameters. @@ -746,6 +814,9 @@ class StoryBoard(_SBEntity): :param callback: callable or list thereof to be run after client termination :type callback: `callable` or `list` of `callable` + :param async: if true, the SSH communication will be dealt + with asynchronously + :type async: `bool` """ if isinstance(client, str): client = self.client_apps[client] @@ -762,7 +833,10 @@ class StoryBoard(_SBEntity): callback = [callback] process = client.process(duration, node, proc_id) self.process_dict[process.id] = process - process.run() + if async: + process.run_async() + else: + process.run() action = functools.partial(self.kill_process, process.id) term_ev = Event(action, ev_time=(self.cur_time + duration)) self.add_event(term_ev) @@ -910,7 +984,8 @@ class StoryBoard(_SBEntity): r'> /tmp/${logname}.rumba.log 2>&1 & echo "$!"' logger.debug("Writing utility startup script on nodes.") for node in self.node_map.values(): - node.execute_command( + _execute_command( + node, "echo '%s' > startup.sh && chmod a+x startup.sh" % (script,) ) try: @@ -1192,7 +1267,7 @@ class StoryBoard(_SBEntity): str(uuid.uuid4())[0:4] + ".pcap" ) - tcpd_client = Client(ap="tcpdump", options="-i %s -w %s" \ + tcpd_client = Client(ap="tcpdump", options="-i %s -w %s" % (ipcp.ifname, pcap_file))\ .process(end-start, node, 'tcpdump_proc') -- cgit v1.2.3