diff options
author | Marco Capitani <m.capitani@nextworks.it> | 2018-01-08 17:46:51 +0100 |
---|---|---|
committer | Marco Capitani <m.capitani@nextworks.it> | 2018-01-08 17:46:51 +0100 |
commit | bc28ae091621eee4aeee5452bff5ac09dc2cc2f4 (patch) | |
tree | d90a38319a9beea94c7617548071c19f996a0962 | |
parent | d6159432e8fc333a2466b8836ba34db55ed3bb82 (diff) | |
download | rumba-bc28ae091621eee4aeee5452bff5ac09dc2cc2f4.tar.gz rumba-bc28ae091621eee4aeee5452bff5ac09dc2cc2f4.zip |
storyboard: recover run_command method
-rw-r--r-- | rumba/storyboard.py | 97 |
1 files changed, 55 insertions, 42 deletions
diff --git a/rumba/storyboard.py b/rumba/storyboard.py index b6e07d6..200b302 100644 --- a/rumba/storyboard.py +++ b/rumba/storyboard.py @@ -274,7 +274,7 @@ class StoryBoard: DEFAULT_INTERVAL = 2.5 # in seconds (may be a float) - def __init__(self, duration, experiment=None, servers=None): + def __init__(self, experiment, duration, servers=None): self.experiment = experiment self.duration = duration self.servers = list() @@ -282,8 +282,10 @@ class StoryBoard: servers = list() for s in servers: self._validate_and_add_server(s) + self.client_nodes = [c for c in experiment.nodes if c.client] self.active_clients = [] self.start_time = None + self.commands_list = {} def _validate_and_add_server(self, s): if self.experiment is None: @@ -318,54 +320,65 @@ class StoryBoard: def del_server(self, server): self.servers.remove(server) + def run_command(self, t, node, command): + """ + Schedule the given command to be run at t seconds from the start. + The commands are run in no particular order, so take care + + :param t: (float) seconds to wait before running the command + :param node: (Node) the node on which the command should be run + :param command: (str or list[str]) the command(s) to be run, + """ + if isinstance(command, str): + self.commands_list.setdefault(t, []).append((node, command)) + else: # Hope it's an Iterable[str]. Otherwise, errors will happen. + for cmd in command: + self.commands_list.setdefault(t, []).append((node, cmd)) + + def periodic_check(self): + # Spawn new clients + for server in self.servers: + clients = server.get_new_clients(self.DEFAULT_INTERVAL) + for new_client in clients: + client_node = random.choice(self.client_nodes) + new_client.run(client_node) + self.active_clients.append(new_client) + surviving = [] + + # Kill expired clients + for x in self.active_clients: + if x.kill_check(): # + surviving.append(x) + self.active_clients = surviving + + # Do run_command instructions + unexpired_commands = {} + for t in self.commands_list: + if time.time() - self.start_time > t: + for node, command in self.commands_list[t]: + node.execute_command(command) + else: + unexpired_commands[t] = self.commands_list[t] + self.commands_list = unexpired_commands + def start(self): + logger.info('Starting storyboard execution') self.start_time = time.time() - script = r'logname="$1"; shift; nohup "${@}" ' \ - r'> /tmp/${logname}.rumba.log 2>&1 & echo "$!"' - logger.debug("Writing utility startup script on client nodes.") - for server in self.servers: - for client in server.clients: - for node in client.nodes: - node.execute_command( - "echo '%s' > startup.sh && chmod a+x startup.sh" - % (script,) - ) + script = r'nohup "$@" > /dev/null & echo "$!"' + for node in self.client_nodes: + logger.debug("Writing utility startup script on client nodes.") + node.execute_command( + "echo '%s' > startup.sh && chmod a+x startup.sh" % (script,) + ) try: for server in self.servers: server.run() while time.time() - self.start_time < self.duration: - for server in self.servers: - clients = server.get_new_clients(self.DEFAULT_INTERVAL) - for new_client in clients: # type: ClientProcess - new_client.duration = min( - new_client.duration, - self.duration - (time.time() - self.start_time) - ) - # Make sure the duration of the client does not - # go beyond the storyboard lifetime - if new_client.duration < server.min_duration: - continue - # Do not start clients that would not run for - # at least the minimum duration - # (due to sb constraints) - new_client.run() - self.active_clients.append(new_client) - surviving = [] - for x in self.active_clients: - if x.check(): - surviving.append(x) - self.active_clients = surviving + self.periodic_check() time.sleep(self.DEFAULT_INTERVAL) - time.sleep(5) - # Do a check that is supposed to find all remaining clients - # as expired - surviving = [] - for x in self.active_clients: - if x.check(): - surviving.append(x) - self.active_clients = surviving - if surviving: # implied: is not empty - logger.warning('Some clients could not be killed gracefully.') + self.periodic_check() # Do things that were scheduled + # in the last "INTERVAL" seconds + # of the StoryBoard finally: # Kill everything. No more mercy. for client in self.active_clients: client.stop() |