aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--rumba/model.py60
1 files changed, 48 insertions, 12 deletions
diff --git a/rumba/model.py b/rumba/model.py
index 7ec4fd9..4e2591a 100644
--- a/rumba/model.py
+++ b/rumba/model.py
@@ -897,7 +897,7 @@ class ClientProcess(Client):
logger.warn('Could not kill client %s on node %s.',
self.ap, self.node.name)
- def check(self):
+ def kill_check(self):
"""Check if the process should keep running, stop it if not,
and return true if and only if it is still running."""
now = time.time()
@@ -1018,6 +1018,7 @@ class StoryBoard:
self.client_nodes = [c for c in experiment.nodes if c.client]
self.active_clients = []
self.start_time = None
+ self.commands_list = {}
def add_server(self, server):
self.servers.append(server)
@@ -1025,7 +1026,49 @@ class StoryBoard:
def del_server(self, server):
self.servers.remove(server)
+ def run_command(self, t, node, command):
+ """
+ Schedule the given command to be run at t seconds from the start.
+ The commands are run in no particular order, so take care
+
+ :param t: (float) seconds to wait before running the command
+ :param node: (Node) the node on which the command should be run
+ :param command: (str or list[str]) the command(s) to be run,
+ """
+ if isinstance(command, str):
+ self.commands_list.setdefault(t, []).append((node, command))
+ else: # Hope it's an Iterable[str]. Otherwise, errors will happen.
+ for cmd in command:
+ self.commands_list.setdefault(t, []).append((node, cmd))
+
+ def periodic_check(self):
+ # Spawn new clients
+ for server in self.servers:
+ clients = server.get_new_clients(self.DEFAULT_INTERVAL)
+ for new_client in clients:
+ client_node = random.choice(self.client_nodes)
+ new_client.run(client_node)
+ self.active_clients.append(new_client)
+ surviving = []
+
+ # Kill expired clients
+ for x in self.active_clients:
+ if x.kill_check(): #
+ surviving.append(x)
+ self.active_clients = surviving
+
+ # Do run_command instructions
+ unexpired_commands = {}
+ for t in self.commands_list:
+ if time.time() - self.start_time > t:
+ for node, command in self.commands_list[t]:
+ node.execute_command(command)
+ else:
+ unexpired_commands[t] = self.commands_list[t]
+ self.commands_list = unexpired_commands
+
def start(self):
+ logger.info('Starting storyboard execution')
self.start_time = time.time()
script = r'nohup "$@" > /dev/null & echo "$!"'
for node in self.client_nodes:
@@ -1037,18 +1080,11 @@ class StoryBoard:
for server in self.servers:
server.run()
while time.time() - self.start_time < self.duration:
- for server in self.servers:
- clients = server.get_new_clients(self.DEFAULT_INTERVAL)
- for new_client in clients:
- client_node = random.choice(self.client_nodes)
- new_client.run(client_node)
- self.active_clients.append(new_client)
- surviving = []
- for x in self.active_clients:
- if x.check():
- surviving.append(x)
- self.active_clients = surviving
+ self.periodic_check()
time.sleep(self.DEFAULT_INTERVAL)
+ self.periodic_check() # Do things that were scheduled
+ # in the last "INTERVAL" seconds
+ # of the StoryBoard
finally:
for client in self.active_clients:
client.stop()