aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--rumba/storyboard.py137
1 files changed, 106 insertions, 31 deletions
diff --git a/rumba/storyboard.py b/rumba/storyboard.py
index c652073..56be7e0 100644
--- a/rumba/storyboard.py
+++ b/rumba/storyboard.py
@@ -31,6 +31,8 @@
#
import functools
import math
+from multiprocessing import cpu_count
+import multiprocessing.dummy as multiprocessing
import os
import random
import time
@@ -61,6 +63,33 @@ except ImportError:
# PROBLEM! These logs will almost never be printed...
# But we might not care
+POOL = multiprocessing.Pool(cpu_count() * 6)
+
+
+def _execute_command(node,
+ cmd,
+ callback=None,
+ e_callback=None,
+ as_root=False):
+ if e_callback is None:
+ def e_callback(e):
+ logger.warning('Could not execute command "%s" on node "%s". '
+ 'Error: %s.',
+ cmd,
+ node.name,
+ e)
+ if callback is None:
+ def callback(x):
+ pass
+
+ POOL.apply_async(
+ node.execute_command,
+ args=(cmd,),
+ kwds={'as_root': as_root},
+ callback=callback,
+ error_callback=e_callback
+ )
+
class _SBEntity(object):
@@ -194,26 +223,42 @@ class ClientProcess(_SBEntity):
self.start_time = None
self.running = False
self.node = node
- self.pid = None
+ self._pid = multiprocessing.Value('i', -1)
self.shutdown = shutdown
self.as_root = as_root
- def run(self):
- """Starts this process"""
+ @property
+ def pid(self):
+ if self._pid.value == -1:
+ raise ValueError("Process %s is not running."
+ % (self.id,))
+ else:
+ return self._pid.value
+
+ @pid.setter
+ def pid(self, value):
+ self._pid.value = value
+
+ def _make_run_cmd(self):
if self.node is None:
raise Exception('No node specified for client %s' % (self.ap_id,))
self.start_time = time.time()
+ startup = self.startup.replace("<duration>", str(self.duration))
+ return "./startup.sh %s %s" % (
+ self.id,
+ startup,
+ )
+
+ def run(self):
+ """Starts this process"""
+
logger.debug(
'Starting client app %s on node %s with duration %s.',
self.ap_id, self.node.name, self.duration
)
- startup = self.startup.replace("<duration>", str(self.duration))
- cmd = "./startup.sh %s %s" % (
- self.id,
- startup,
- )
+ cmd = self._make_run_cmd()
self.running = True
try:
@@ -224,6 +269,31 @@ class ClientProcess(_SBEntity):
logger.debug('Client app %s on node %s got pid %s.',
self.ap_id, self.node.name, self.pid)
+ def run_async(self):
+ """Starts this process asynchronously"""
+
+ def callback(pid):
+ self.pid = pid
+ logger.debug('Client app %s on node %s got pid %s.',
+ self.ap_id, self.node.name, self.pid)
+
+ def e_callback(e):
+ logger.warning('Could not start client %s on node %s. '
+ 'Error: %s.',
+ self.ap_id,
+ self.node.name,
+ e)
+
+ logger.debug(
+ 'Starting client app %s on node %s with duration %s (async).',
+ self.ap_id, self.node.name, self.duration
+ )
+
+ cmd = self._make_run_cmd()
+
+ self.running = True
+ _execute_command(self.node, cmd, callback, e_callback, self.as_root)
+
def stop(self):
"""Stops this process"""
if self.shutdown != "":
@@ -231,17 +301,15 @@ class ClientProcess(_SBEntity):
'Killing client %s on node %s.',
self.ap_id, self.node.name
)
- try:
- kill_cmd = self.shutdown.replace('<pid>', str(self.pid))
- self.node.execute_command(kill_cmd)
- except ssh_support.SSHException:
- logger.warning('Could not kill client %s on node %s.',
- self.ap_id, self.node.name)
- else:
- logger.debug(
- 'Client %s on node %s has terminated.',
- self.ap_id, self.node.name
- )
+
+ def callback():
+ logger.debug(
+ 'Client %s on node %s has terminated.',
+ self.ap_id, self.node.name
+ )
+
+ kill_cmd = self.shutdown.replace('<pid>', str(self.pid))
+ _execute_command(self.node, kill_cmd, callback, as_root=self.as_root)
class Server(_SBEntity):
@@ -413,16 +481,16 @@ class Server(_SBEntity):
run_cmd = self.ap + (
(" " + self.options) if self.options is not None else ""
)
- cmd_2 = "./startup.sh %s %s" % ('server_' + self.id, run_cmd)
+ cmd = "./startup.sh %s %s" % ('server_' + self.id, run_cmd)
logger.debug(
'Starting server %s on node %s.',
self.id, node.name
)
- try:
- self.pids[node] = node.execute_command(cmd_2, as_root=self.as_root)
- except ssh_support.SSHException:
- logger.warning('Could not start server %s on node %s.',
- self.id, node.name)
+
+ def callback(pid):
+ self.pids[node] = pid
+
+ _execute_command(node, cmd, callback, as_root=self.as_root)
def stop(self):
"""Stops this server"""
@@ -432,7 +500,7 @@ class Server(_SBEntity):
self.id, node.name
)
try:
- node.execute_command("kill %s" % pid)
+ _execute_command(node, "kill %s" % (pid,), as_root=self.as_root)
except ssh_support.SSHException:
logger.warning('Could not kill server %s on node %s.',
self.id, node.name)
@@ -449,7 +517,6 @@ class StoryBoard(_SBEntity):
"""Class representing the storyboard of an experiment"""
SCRIPT_RESOLUTION = 0.1
- EXECUTION_RESOLUTION = 2.5 # in seconds (may be a float)
def get_e_id(self):
return 'storyboard'
@@ -728,7 +795,8 @@ class StoryBoard(_SBEntity):
duration,
node=None,
proc_id=None,
- callback=None):
+ callback=None,
+ async=True):
"""
Runs the specified client app with the specified parameters.
@@ -746,6 +814,9 @@ class StoryBoard(_SBEntity):
:param callback: callable or list thereof to be run
after client termination
:type callback: `callable` or `list` of `callable`
+ :param async: if true, the SSH communication will be dealt
+ with asynchronously
+ :type async: `bool`
"""
if isinstance(client, str):
client = self.client_apps[client]
@@ -762,7 +833,10 @@ class StoryBoard(_SBEntity):
callback = [callback]
process = client.process(duration, node, proc_id)
self.process_dict[process.id] = process
- process.run()
+ if async:
+ process.run_async()
+ else:
+ process.run()
action = functools.partial(self.kill_process, process.id)
term_ev = Event(action, ev_time=(self.cur_time + duration))
self.add_event(term_ev)
@@ -910,7 +984,8 @@ class StoryBoard(_SBEntity):
r'> /tmp/${logname}.rumba.log 2>&1 & echo "$!"'
logger.debug("Writing utility startup script on nodes.")
for node in self.node_map.values():
- node.execute_command(
+ _execute_command(
+ node,
"echo '%s' > startup.sh && chmod a+x startup.sh" % (script,)
)
try:
@@ -1192,7 +1267,7 @@ class StoryBoard(_SBEntity):
str(uuid.uuid4())[0:4] + ".pcap"
)
- tcpd_client = Client(ap="tcpdump", options="-i %s -w %s" \
+ tcpd_client = Client(ap="tcpdump", options="-i %s -w %s"
% (ipcp.ifname, pcap_file))\
.process(end-start, node, 'tcpdump_proc')