aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMarco Capitani <m.capitani@nextworks.it>2018-06-04 17:35:08 +0200
committerMarco Capitani <m.capitani@nextworks.it>2018-06-04 17:40:36 +0200
commitd39179eed89bcca5ed011df9a571dfebff109b56 (patch)
tree4feb2c87d54907cb9a4e5bc75eaa19ee13e608e5
parent7636de071e69948da2b6bbbc3dc6055d3cac04cc (diff)
downloadrumba-d39179eed89bcca5ed011df9a571dfebff109b56.tar.gz
rumba-d39179eed89bcca5ed011df9a571dfebff109b56.zip
storyboard: add async command execution
The standard command execution function blocks until the command is complete. Even if the command is run in background on the node, this implies that the storyboard has to wait for the SSH handshake to complete and the responses to come back. With the new method, the storyboard will go on with the execution without waiting (callbacks can be registered for reacting).
-rw-r--r--rumba/storyboard.py137
1 files changed, 106 insertions, 31 deletions
diff --git a/rumba/storyboard.py b/rumba/storyboard.py
index c652073..56be7e0 100644
--- a/rumba/storyboard.py
+++ b/rumba/storyboard.py
@@ -31,6 +31,8 @@
#
import functools
import math
+from multiprocessing import cpu_count
+import multiprocessing.dummy as multiprocessing
import os
import random
import time
@@ -61,6 +63,33 @@ except ImportError:
# PROBLEM! These logs will almost never be printed...
# But we might not care
+POOL = multiprocessing.Pool(cpu_count() * 6)
+
+
+def _execute_command(node,
+ cmd,
+ callback=None,
+ e_callback=None,
+ as_root=False):
+ if e_callback is None:
+ def e_callback(e):
+ logger.warning('Could not execute command "%s" on node "%s". '
+ 'Error: %s.',
+ cmd,
+ node.name,
+ e)
+ if callback is None:
+ def callback(x):
+ pass
+
+ POOL.apply_async(
+ node.execute_command,
+ args=(cmd,),
+ kwds={'as_root': as_root},
+ callback=callback,
+ error_callback=e_callback
+ )
+
class _SBEntity(object):
@@ -194,26 +223,42 @@ class ClientProcess(_SBEntity):
self.start_time = None
self.running = False
self.node = node
- self.pid = None
+ self._pid = multiprocessing.Value('i', -1)
self.shutdown = shutdown
self.as_root = as_root
- def run(self):
- """Starts this process"""
+ @property
+ def pid(self):
+ if self._pid.value == -1:
+ raise ValueError("Process %s is not running."
+ % (self.id,))
+ else:
+ return self._pid.value
+
+ @pid.setter
+ def pid(self, value):
+ self._pid.value = value
+
+ def _make_run_cmd(self):
if self.node is None:
raise Exception('No node specified for client %s' % (self.ap_id,))
self.start_time = time.time()
+ startup = self.startup.replace("<duration>", str(self.duration))
+ return "./startup.sh %s %s" % (
+ self.id,
+ startup,
+ )
+
+ def run(self):
+ """Starts this process"""
+
logger.debug(
'Starting client app %s on node %s with duration %s.',
self.ap_id, self.node.name, self.duration
)
- startup = self.startup.replace("<duration>", str(self.duration))
- cmd = "./startup.sh %s %s" % (
- self.id,
- startup,
- )
+ cmd = self._make_run_cmd()
self.running = True
try:
@@ -224,6 +269,31 @@ class ClientProcess(_SBEntity):
logger.debug('Client app %s on node %s got pid %s.',
self.ap_id, self.node.name, self.pid)
+ def run_async(self):
+ """Starts this process asynchronously"""
+
+ def callback(pid):
+ self.pid = pid
+ logger.debug('Client app %s on node %s got pid %s.',
+ self.ap_id, self.node.name, self.pid)
+
+ def e_callback(e):
+ logger.warning('Could not start client %s on node %s. '
+ 'Error: %s.',
+ self.ap_id,
+ self.node.name,
+ e)
+
+ logger.debug(
+ 'Starting client app %s on node %s with duration %s (async).',
+ self.ap_id, self.node.name, self.duration
+ )
+
+ cmd = self._make_run_cmd()
+
+ self.running = True
+ _execute_command(self.node, cmd, callback, e_callback, self.as_root)
+
def stop(self):
"""Stops this process"""
if self.shutdown != "":
@@ -231,17 +301,15 @@ class ClientProcess(_SBEntity):
'Killing client %s on node %s.',
self.ap_id, self.node.name
)
- try:
- kill_cmd = self.shutdown.replace('<pid>', str(self.pid))
- self.node.execute_command(kill_cmd)
- except ssh_support.SSHException:
- logger.warning('Could not kill client %s on node %s.',
- self.ap_id, self.node.name)
- else:
- logger.debug(
- 'Client %s on node %s has terminated.',
- self.ap_id, self.node.name
- )
+
+ def callback():
+ logger.debug(
+ 'Client %s on node %s has terminated.',
+ self.ap_id, self.node.name
+ )
+
+ kill_cmd = self.shutdown.replace('<pid>', str(self.pid))
+ _execute_command(self.node, kill_cmd, callback, as_root=self.as_root)
class Server(_SBEntity):
@@ -413,16 +481,16 @@ class Server(_SBEntity):
run_cmd = self.ap + (
(" " + self.options) if self.options is not None else ""
)
- cmd_2 = "./startup.sh %s %s" % ('server_' + self.id, run_cmd)
+ cmd = "./startup.sh %s %s" % ('server_' + self.id, run_cmd)
logger.debug(
'Starting server %s on node %s.',
self.id, node.name
)
- try:
- self.pids[node] = node.execute_command(cmd_2, as_root=self.as_root)
- except ssh_support.SSHException:
- logger.warning('Could not start server %s on node %s.',
- self.id, node.name)
+
+ def callback(pid):
+ self.pids[node] = pid
+
+ _execute_command(node, cmd, callback, as_root=self.as_root)
def stop(self):
"""Stops this server"""
@@ -432,7 +500,7 @@ class Server(_SBEntity):
self.id, node.name
)
try:
- node.execute_command("kill %s" % pid)
+ _execute_command(node, "kill %s" % (pid,), as_root=self.as_root)
except ssh_support.SSHException:
logger.warning('Could not kill server %s on node %s.',
self.id, node.name)
@@ -449,7 +517,6 @@ class StoryBoard(_SBEntity):
"""Class representing the storyboard of an experiment"""
SCRIPT_RESOLUTION = 0.1
- EXECUTION_RESOLUTION = 2.5 # in seconds (may be a float)
def get_e_id(self):
return 'storyboard'
@@ -728,7 +795,8 @@ class StoryBoard(_SBEntity):
duration,
node=None,
proc_id=None,
- callback=None):
+ callback=None,
+ async=True):
"""
Runs the specified client app with the specified parameters.
@@ -746,6 +814,9 @@ class StoryBoard(_SBEntity):
:param callback: callable or list thereof to be run
after client termination
:type callback: `callable` or `list` of `callable`
+ :param async: if true, the SSH communication will be dealt
+ with asynchronously
+ :type async: `bool`
"""
if isinstance(client, str):
client = self.client_apps[client]
@@ -762,7 +833,10 @@ class StoryBoard(_SBEntity):
callback = [callback]
process = client.process(duration, node, proc_id)
self.process_dict[process.id] = process
- process.run()
+ if async:
+ process.run_async()
+ else:
+ process.run()
action = functools.partial(self.kill_process, process.id)
term_ev = Event(action, ev_time=(self.cur_time + duration))
self.add_event(term_ev)
@@ -910,7 +984,8 @@ class StoryBoard(_SBEntity):
r'> /tmp/${logname}.rumba.log 2>&1 & echo "$!"'
logger.debug("Writing utility startup script on nodes.")
for node in self.node_map.values():
- node.execute_command(
+ _execute_command(
+ node,
"echo '%s' > startup.sh && chmod a+x startup.sh" % (script,)
)
try:
@@ -1192,7 +1267,7 @@ class StoryBoard(_SBEntity):
str(uuid.uuid4())[0:4] + ".pcap"
)
- tcpd_client = Client(ap="tcpdump", options="-i %s -w %s" \
+ tcpd_client = Client(ap="tcpdump", options="-i %s -w %s"
% (ipcp.ifname, pcap_file))\
.process(end-start, node, 'tcpdump_proc')