diff options
-rw-r--r-- | rumba/model.py | 60 |
1 files changed, 48 insertions, 12 deletions
diff --git a/rumba/model.py b/rumba/model.py index 7ec4fd9..4e2591a 100644 --- a/rumba/model.py +++ b/rumba/model.py @@ -897,7 +897,7 @@ class ClientProcess(Client): logger.warn('Could not kill client %s on node %s.', self.ap, self.node.name) - def check(self): + def kill_check(self): """Check if the process should keep running, stop it if not, and return true if and only if it is still running.""" now = time.time() @@ -1018,6 +1018,7 @@ class StoryBoard: self.client_nodes = [c for c in experiment.nodes if c.client] self.active_clients = [] self.start_time = None + self.commands_list = {} def add_server(self, server): self.servers.append(server) @@ -1025,7 +1026,49 @@ class StoryBoard: def del_server(self, server): self.servers.remove(server) + def run_command(self, t, node, command): + """ + Schedule the given command to be run at t seconds from the start. + The commands are run in no particular order, so take care + + :param t: (float) seconds to wait before running the command + :param node: (Node) the node on which the command should be run + :param command: (str or list[str]) the command(s) to be run, + """ + if isinstance(command, str): + self.commands_list.setdefault(t, []).append((node, command)) + else: # Hope it's an Iterable[str]. Otherwise, errors will happen. + for cmd in command: + self.commands_list.setdefault(t, []).append((node, cmd)) + + def periodic_check(self): + # Spawn new clients + for server in self.servers: + clients = server.get_new_clients(self.DEFAULT_INTERVAL) + for new_client in clients: + client_node = random.choice(self.client_nodes) + new_client.run(client_node) + self.active_clients.append(new_client) + surviving = [] + + # Kill expired clients + for x in self.active_clients: + if x.kill_check(): # + surviving.append(x) + self.active_clients = surviving + + # Do run_command instructions + unexpired_commands = {} + for t in self.commands_list: + if time.time() - self.start_time > t: + for node, command in self.commands_list[t]: + node.execute_command(command) + else: + unexpired_commands[t] = self.commands_list[t] + self.commands_list = unexpired_commands + def start(self): + logger.info('Starting storyboard execution') self.start_time = time.time() script = r'nohup "$@" > /dev/null & echo "$!"' for node in self.client_nodes: @@ -1037,18 +1080,11 @@ class StoryBoard: for server in self.servers: server.run() while time.time() - self.start_time < self.duration: - for server in self.servers: - clients = server.get_new_clients(self.DEFAULT_INTERVAL) - for new_client in clients: - client_node = random.choice(self.client_nodes) - new_client.run(client_node) - self.active_clients.append(new_client) - surviving = [] - for x in self.active_clients: - if x.check(): - surviving.append(x) - self.active_clients = surviving + self.periodic_check() time.sleep(self.DEFAULT_INTERVAL) + self.periodic_check() # Do things that were scheduled + # in the last "INTERVAL" seconds + # of the StoryBoard finally: for client in self.active_clients: client.stop() |