aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--rumba/storyboard.py97
1 files changed, 55 insertions, 42 deletions
diff --git a/rumba/storyboard.py b/rumba/storyboard.py
index b6e07d6..200b302 100644
--- a/rumba/storyboard.py
+++ b/rumba/storyboard.py
@@ -274,7 +274,7 @@ class StoryBoard:
DEFAULT_INTERVAL = 2.5 # in seconds (may be a float)
- def __init__(self, duration, experiment=None, servers=None):
+ def __init__(self, experiment, duration, servers=None):
self.experiment = experiment
self.duration = duration
self.servers = list()
@@ -282,8 +282,10 @@ class StoryBoard:
servers = list()
for s in servers:
self._validate_and_add_server(s)
+ self.client_nodes = [c for c in experiment.nodes if c.client]
self.active_clients = []
self.start_time = None
+ self.commands_list = {}
def _validate_and_add_server(self, s):
if self.experiment is None:
@@ -318,54 +320,65 @@ class StoryBoard:
def del_server(self, server):
self.servers.remove(server)
+ def run_command(self, t, node, command):
+ """
+ Schedule the given command to be run at t seconds from the start.
+ The commands are run in no particular order, so take care
+
+ :param t: (float) seconds to wait before running the command
+ :param node: (Node) the node on which the command should be run
+ :param command: (str or list[str]) the command(s) to be run,
+ """
+ if isinstance(command, str):
+ self.commands_list.setdefault(t, []).append((node, command))
+ else: # Hope it's an Iterable[str]. Otherwise, errors will happen.
+ for cmd in command:
+ self.commands_list.setdefault(t, []).append((node, cmd))
+
+ def periodic_check(self):
+ # Spawn new clients
+ for server in self.servers:
+ clients = server.get_new_clients(self.DEFAULT_INTERVAL)
+ for new_client in clients:
+ client_node = random.choice(self.client_nodes)
+ new_client.run(client_node)
+ self.active_clients.append(new_client)
+ surviving = []
+
+ # Kill expired clients
+ for x in self.active_clients:
+ if x.kill_check(): #
+ surviving.append(x)
+ self.active_clients = surviving
+
+ # Do run_command instructions
+ unexpired_commands = {}
+ for t in self.commands_list:
+ if time.time() - self.start_time > t:
+ for node, command in self.commands_list[t]:
+ node.execute_command(command)
+ else:
+ unexpired_commands[t] = self.commands_list[t]
+ self.commands_list = unexpired_commands
+
def start(self):
+ logger.info('Starting storyboard execution')
self.start_time = time.time()
- script = r'logname="$1"; shift; nohup "${@}" ' \
- r'> /tmp/${logname}.rumba.log 2>&1 & echo "$!"'
- logger.debug("Writing utility startup script on client nodes.")
- for server in self.servers:
- for client in server.clients:
- for node in client.nodes:
- node.execute_command(
- "echo '%s' > startup.sh && chmod a+x startup.sh"
- % (script,)
- )
+ script = r'nohup "$@" > /dev/null & echo "$!"'
+ for node in self.client_nodes:
+ logger.debug("Writing utility startup script on client nodes.")
+ node.execute_command(
+ "echo '%s' > startup.sh && chmod a+x startup.sh" % (script,)
+ )
try:
for server in self.servers:
server.run()
while time.time() - self.start_time < self.duration:
- for server in self.servers:
- clients = server.get_new_clients(self.DEFAULT_INTERVAL)
- for new_client in clients: # type: ClientProcess
- new_client.duration = min(
- new_client.duration,
- self.duration - (time.time() - self.start_time)
- )
- # Make sure the duration of the client does not
- # go beyond the storyboard lifetime
- if new_client.duration < server.min_duration:
- continue
- # Do not start clients that would not run for
- # at least the minimum duration
- # (due to sb constraints)
- new_client.run()
- self.active_clients.append(new_client)
- surviving = []
- for x in self.active_clients:
- if x.check():
- surviving.append(x)
- self.active_clients = surviving
+ self.periodic_check()
time.sleep(self.DEFAULT_INTERVAL)
- time.sleep(5)
- # Do a check that is supposed to find all remaining clients
- # as expired
- surviving = []
- for x in self.active_clients:
- if x.check():
- surviving.append(x)
- self.active_clients = surviving
- if surviving: # implied: is not empty
- logger.warning('Some clients could not be killed gracefully.')
+ self.periodic_check() # Do things that were scheduled
+ # in the last "INTERVAL" seconds
+ # of the StoryBoard
finally: # Kill everything. No more mercy.
for client in self.active_clients:
client.stop()