diff options
| author | Marco Capitani <m.capitani@nextworks.it> | 2018-01-08 17:46:51 +0100 | 
|---|---|---|
| committer | Marco Capitani <m.capitani@nextworks.it> | 2018-01-08 17:46:51 +0100 | 
| commit | bc28ae091621eee4aeee5452bff5ac09dc2cc2f4 (patch) | |
| tree | d90a38319a9beea94c7617548071c19f996a0962 | |
| parent | d6159432e8fc333a2466b8836ba34db55ed3bb82 (diff) | |
| download | rumba-bc28ae091621eee4aeee5452bff5ac09dc2cc2f4.tar.gz rumba-bc28ae091621eee4aeee5452bff5ac09dc2cc2f4.zip  | |
storyboard: recover run_command method
| -rw-r--r-- | rumba/storyboard.py | 97 | 
1 files changed, 55 insertions, 42 deletions
diff --git a/rumba/storyboard.py b/rumba/storyboard.py index b6e07d6..200b302 100644 --- a/rumba/storyboard.py +++ b/rumba/storyboard.py @@ -274,7 +274,7 @@ class StoryBoard:      DEFAULT_INTERVAL = 2.5  # in seconds (may be a float) -    def __init__(self, duration, experiment=None, servers=None): +    def __init__(self, experiment, duration, servers=None):          self.experiment = experiment          self.duration = duration          self.servers = list() @@ -282,8 +282,10 @@ class StoryBoard:              servers = list()          for s in servers:              self._validate_and_add_server(s) +        self.client_nodes = [c for c in experiment.nodes if c.client]          self.active_clients = []          self.start_time = None +        self.commands_list = {}      def _validate_and_add_server(self, s):          if self.experiment is None: @@ -318,54 +320,65 @@ class StoryBoard:      def del_server(self, server):          self.servers.remove(server) +    def run_command(self, t, node, command): +        """ +        Schedule the given command to be run at t seconds from the start. +        The commands are run in no particular order, so take care + +        :param t: (float) seconds to wait before running the command +        :param node: (Node) the node on which the command should be run +        :param command: (str or list[str]) the command(s) to be run, +        """ +        if isinstance(command, str): +            self.commands_list.setdefault(t, []).append((node, command)) +        else:  # Hope it's an Iterable[str]. Otherwise, errors will happen. +            for cmd in command: +                self.commands_list.setdefault(t, []).append((node, cmd)) + +    def periodic_check(self): +        # Spawn new clients +        for server in self.servers: +            clients = server.get_new_clients(self.DEFAULT_INTERVAL) +            for new_client in clients: +                client_node = random.choice(self.client_nodes) +                new_client.run(client_node) +                self.active_clients.append(new_client) +        surviving = [] + +        # Kill expired clients +        for x in self.active_clients: +            if x.kill_check():  # +                surviving.append(x) +        self.active_clients = surviving + +        # Do run_command instructions +        unexpired_commands = {} +        for t in self.commands_list: +            if time.time() - self.start_time > t: +                for node, command in self.commands_list[t]: +                    node.execute_command(command) +            else: +                unexpired_commands[t] = self.commands_list[t] +        self.commands_list = unexpired_commands +      def start(self): +        logger.info('Starting storyboard execution')          self.start_time = time.time() -        script = r'logname="$1"; shift; nohup "${@}" ' \ -                 r'> /tmp/${logname}.rumba.log 2>&1  & echo "$!"' -        logger.debug("Writing utility startup script on client nodes.") -        for server in self.servers: -            for client in server.clients: -                for node in client.nodes: -                    node.execute_command( -                        "echo '%s' > startup.sh && chmod a+x startup.sh" -                        % (script,) -                    ) +        script = r'nohup "$@" > /dev/null & echo "$!"' +        for node in self.client_nodes: +            logger.debug("Writing utility startup script on client nodes.") +            node.execute_command( +                "echo '%s' > startup.sh && chmod a+x startup.sh" % (script,) +            )          try:              for server in self.servers:                  server.run()              while time.time() - self.start_time < self.duration: -                for server in self.servers: -                    clients = server.get_new_clients(self.DEFAULT_INTERVAL) -                    for new_client in clients:  # type: ClientProcess -                        new_client.duration = min( -                            new_client.duration, -                            self.duration - (time.time() - self.start_time) -                        ) -                        # Make sure the duration of the client does not -                        # go beyond the storyboard lifetime -                        if new_client.duration < server.min_duration: -                            continue -                            # Do not start clients that would not run for -                            # at least the minimum duration -                            # (due to sb constraints) -                        new_client.run() -                        self.active_clients.append(new_client) -                surviving = [] -                for x in self.active_clients: -                    if x.check(): -                        surviving.append(x) -                self.active_clients = surviving +                self.periodic_check()                  time.sleep(self.DEFAULT_INTERVAL) -            time.sleep(5) -            # Do a check that is supposed to find all remaining clients -            # as expired -            surviving = [] -            for x in self.active_clients: -                if x.check(): -                    surviving.append(x) -            self.active_clients = surviving -            if surviving:  # implied: is not empty -                logger.warning('Some clients could not be killed gracefully.') +            self.periodic_check()  # Do things that were scheduled +                                   # in the last "INTERVAL" seconds +                                   # of the StoryBoard          finally:  # Kill everything. No more mercy.              for client in self.active_clients:                  client.stop()  | 
