From 658a41da85783d0ea06b91e5c72f755144b2e449 Mon Sep 17 00:00:00 2001 From: Marco Capitani Date: Wed, 10 Jan 2018 12:41:25 +0100 Subject: storyboard: re-add run_command method Also: logging fix in ssh_support, Enabling changes: storyboard: refactor 'start' method, it was getting big some more minor storyboard refactoring for decoupling update examples to new syntax --- examples/converged-operator-network.py | 2 +- examples/example.py | 2 +- examples/two-layers.py | 2 +- rumba/log.py | 15 +++++ rumba/ssh_support.py | 4 +- rumba/storyboard.py | 120 ++++++++++++++++++++++----------- rumba/utils.py | 29 ++++---- 7 files changed, 118 insertions(+), 56 deletions(-) diff --git a/examples/converged-operator-network.py b/examples/converged-operator-network.py index 57609fc..d0da907 100755 --- a/examples/converged-operator-network.py +++ b/examples/converged-operator-network.py @@ -198,5 +198,5 @@ with ExperimentManager(exp): c1 = Client("rinaperf", options ="-i 10000 -s 1000 -c 0 -d overlay") s1 = Server("rinaperf", arrival_rate=2, mean_duration=5, options = "-l -d overlay", nodes = [c1n1], clients = [c1]) - sb = StoryBoard(exp, duration=3600, servers = [s1]) + sb = StoryBoard(experiment=exp, duration=3600, servers = [s1]) sb.start() diff --git a/examples/example.py b/examples/example.py index a26ab3b..eee966b 100755 --- a/examples/example.py +++ b/examples/example.py @@ -66,7 +66,7 @@ server = Server("rinaperf", options="-l", arrival_rate=0.5, # (This can be done anytime before storyboard.start()) storyboard.set_experiment(exp) -storyboard.add_server((server, a)) +storyboard.add_server_on_node(server, a) client1.add_node(b) client2.add_node(b) diff --git a/examples/two-layers.py b/examples/two-layers.py index d2e4007..c375088 100755 --- a/examples/two-layers.py +++ b/examples/two-layers.py @@ -58,7 +58,7 @@ print(exp) with ExperimentManager(exp): exp.swap_in() exp.bootstrap_prototype() - sb = StoryBoard(exp, duration=15, servers=[]) + sb = StoryBoard(experiment=exp, duration=15, servers=[]) sb.run_command(7.5, a, 'echo "7.5 secs in. We are at $(hostname)"') sb.run_command(12, b, 'echo "12 secs in. We are at $(hostname)"') sb.start() diff --git a/rumba/log.py b/rumba/log.py index eb6be03..893f5f7 100644 --- a/rumba/log.py +++ b/rumba/log.py @@ -42,6 +42,8 @@ loggers_set = set() mq = multiprocessing.Queue() +logging_listener = None + try: from logging.handlers import QueueHandler @@ -201,6 +203,8 @@ def setup(): formatter = RumbaFormatter() handler.setFormatter(formatter) listener = QueueListener(mq, handler) + global logging_listener + logging_listener = listener listener.start() @@ -271,3 +275,14 @@ def reset_logging_level(): logger.setLevel(logging.NOTSET) set_logging_level(logging.INFO) set_logging_level(logging.ERROR, '') + + +def flush_and_kill_logging(): + """ + Flushes all queued log messages and stops the logging facility. + Since the logging is done by a daemon thread, log entries might be lost + if execution is interrupted abruptly. Call this method to make sure + this does not happen. + """ + global logging_listener + logging_listener.stop() diff --git a/rumba/ssh_support.py b/rumba/ssh_support.py index 895a4e1..1ffa655 100644 --- a/rumba/ssh_support.py +++ b/rumba/ssh_support.py @@ -329,11 +329,11 @@ def copy_files_from_testbed(testbed, ssh_config, paths, destination): for path in paths: file_name = os.path.basename(path) dest_file = destination + file_name - logger.debug("Copying %s to %s@%s:%s path %s" % ( - path, + logger.debug("Copying %s@%s:%s path %s to %s" % ( testbed.username, ssh_config.hostname, ssh_config.port, + path, dest_file)) sftp_client.get(path, dest_file) diff --git a/rumba/storyboard.py b/rumba/storyboard.py index 200b302..bbb1946 100644 --- a/rumba/storyboard.py +++ b/rumba/storyboard.py @@ -99,7 +99,7 @@ class Client(object): # class ClientProcess(object): def __init__(self, client_id, ap, startup, duration, - node=None, shutdown=""): + node, shutdown=""): self.id = client_id self.ap = ap self.startup = startup @@ -110,9 +110,7 @@ class ClientProcess(object): self.pid = None self.shutdown = shutdown - def run(self, node=None): - if node is not None: - self.node = node + def run(self): if self.node is None: raise Exception('No node specified for client %s' % (self.ap,)) self.start_time = time.time() @@ -154,7 +152,7 @@ class ClientProcess(object): self.ap, self.node.name ) - def check(self): + def kill_check(self): """Check if the process should keep running, stop it if not, and return true if and only if it is still running.""" now = time.time() @@ -274,7 +272,7 @@ class StoryBoard: DEFAULT_INTERVAL = 2.5 # in seconds (may be a float) - def __init__(self, experiment, duration, servers=None): + def __init__(self, duration, experiment=None, servers=None): self.experiment = experiment self.duration = duration self.servers = list() @@ -282,29 +280,32 @@ class StoryBoard: servers = list() for s in servers: self._validate_and_add_server(s) - self.client_nodes = [c for c in experiment.nodes if c.client] + self.client_nodes = set() + self.server_nodes = set() self.active_clients = [] self.start_time = None self.commands_list = {} - def _validate_and_add_server(self, s): - if self.experiment is None: - raise ValueError("Cannot add a server before " - "setting the experiment.") - if hasattr(s, '__len__') and len(s) == 2: - server, node = s - if not isinstance(server, Server) \ - or not isinstance(node, model.Node): - raise TypeError('First element must be of "Server" type, ' - 'second must be of "Node" type.') - server.add_node(node) - self.servers.append(server) - elif type(s) == Server: + def _build_nodes_lists(self): + """Populates server_nodes and client_nodes lists""" + for server in self.servers: + for node in server.nodes: + self.server_nodes.add(node) + for client in server.clients: + for node in client.nodes: + self.client_nodes.add(node) + + def _validate_and_add_server(self, s, n=None): + if len(s.clients) == 0: + logger.warning("'%s' server has no registered clients.", s.ap) + if n is not None: + s.add_node(n) self.servers.append(s) else: - raise TypeError('Input servers should be either an object of ' - '"Server" type or a Server-Node couple.') - for node in self.servers[-1].nodes: + if len(s.nodes) == 0: + logger.warning("'%s' server has no registered nodes.", s.ap) + self.servers.append(s) + for node in s.nodes: if node not in self.experiment.nodes: raise ValueError('Cannot run server on node %s, ' 'not in experiment.' % (node.name,)) @@ -315,8 +316,28 @@ class StoryBoard: self.experiment = experiment def add_server(self, server): + """Register a server node to the sb.""" + if self.experiment is None: + raise ValueError("Cannot add a server before " + "setting the experiment.") + if not isinstance(server, Server): + raise TypeError('Argument must be of type Server') self._validate_and_add_server(server) + def add_server_on_node(self, server, node): + """ + Utility method to simultaneously add a server to a sb + and a node to the server. + """ + if self.experiment is None: + raise ValueError("Cannot add a server before " + "setting the experiment.") + if not isinstance(server, Server): + raise TypeError('First argument must be of type Server') + if not isinstance(node, model.Node): + raise TypeError('Second argument must be of type Server') + self._validate_and_add_server(server, node) + def del_server(self, server): self.servers.remove(server) @@ -329,6 +350,12 @@ class StoryBoard: :param node: (Node) the node on which the command should be run :param command: (str or list[str]) the command(s) to be run, """ + if self.experiment is None: + raise ValueError("Cannot add a server before " + "setting the experiment.") + if node not in self.experiment.nodes: + raise ValueError('Cannot run command on node %s, ' + 'not in experiment.' % (node.name,)) if isinstance(command, str): self.commands_list.setdefault(t, []).append((node, command)) else: # Hope it's an Iterable[str]. Otherwise, errors will happen. @@ -340,12 +367,23 @@ class StoryBoard: for server in self.servers: clients = server.get_new_clients(self.DEFAULT_INTERVAL) for new_client in clients: - client_node = random.choice(self.client_nodes) - new_client.run(client_node) + new_client.duration = min( + new_client.duration, + self.duration - (time.time() - self.start_time) + ) + # Make sure the duration of the client does not + # go beyond the storyboard lifetime + if new_client.duration < server.min_duration: + continue + # Do not start clients that would not run for + # at least the minimum duration + # (due to sb constraints) + new_client.run() self.active_clients.append(new_client) - surviving = [] # Kill expired clients + surviving = [] + for x in self.active_clients: if x.kill_check(): # surviving.append(x) @@ -354,7 +392,7 @@ class StoryBoard: # Do run_command instructions unexpired_commands = {} for t in self.commands_list: - if time.time() - self.start_time > t: + if (time.time() - self.start_time) > t: for node, command in self.commands_list[t]: node.execute_command(command) else: @@ -362,7 +400,17 @@ class StoryBoard: self.commands_list = unexpired_commands def start(self): + if self.experiment is None: + raise ValueError("Cannot add a server before " + "setting the experiment.") logger.info('Starting storyboard execution') + self._build_nodes_lists() + logger.debug('Server nodes are: %s.', [x.name for x in self.server_nodes]) + logger.debug('Client nodes are: %s.', [x.name for x in self.client_nodes]) + logger.debug('Command list is: %s.', {x: [(y.name, z) + for y, z in t] + for (x, t) + in self.commands_list.items()}) self.start_time = time.time() script = r'nohup "$@" > /dev/null & echo "$!"' for node in self.client_nodes: @@ -389,20 +437,14 @@ class StoryBoard: if not os.path.isdir(local_dir): raise Exception('"%s" is not a directory. Cannot fetch logs.' % local_dir) - server_nodes = set() - client_nodes = set() - for server in self.servers: - for node in server.nodes: - server_nodes.add(node) - for client in server.clients: - for node in client.nodes: - client_nodes.add(node) - for node in server_nodes: + for node in self.server_nodes: logs_list = node.execute_command('ls *_server.log') + logs_list = [x for x in logs_list.split('\n') if x != ''] logger.info('Log list is:\n%s', logs_list) - node.fetch_files(logs_list.split('\n'), local_dir) - for node in client_nodes: + node.fetch_files(logs_list, local_dir) + for node in self.client_nodes: logs_list = node.execute_command('ls /tmp/*.rumba.log ' '|| echo ""') + logs_list = [x for x in logs_list.split('\n') if x != ''] logger.info('Log list is:\n%s', logs_list) - node.fetch_files(logs_list.split('\n'), local_dir) + node.fetch_files(logs_list, local_dir) diff --git a/rumba/utils.py b/rumba/utils.py index 7417074..2a8c6b7 100644 --- a/rumba/utils.py +++ b/rumba/utils.py @@ -54,15 +54,20 @@ class ExperimentManager(object): pass def __exit__(self, exc_type, exc_val, exc_tb): - if self.do_swap_out == self.PROMPT: - logger.info('Press ENTER to start swap out.') - input('') - if self.do_swap_out == self.PROMPT \ - or self.do_swap_out == self.AUTO: - self.experiment.swap_out() - if exc_val is not None: - logger.error('Something went wrong. Got %s: %s', - type(exc_val).__name__, str(exc_val)) - logger.debug('Exception details:', exc_info=exc_val) - time.sleep(0.1) # Give the queue logger enough time to flush. - return True # Suppress the exception we logged: no traceback. + try: + if self.do_swap_out == self.PROMPT: + logger.info('Press ENTER to start swap out.') + input('') + if self.do_swap_out == self.PROMPT \ + or self.do_swap_out == self.AUTO: + self.experiment.swap_out() + if exc_val is not None: + logger.error('Something went wrong. Got %s: %s', + type(exc_val).__name__, str(exc_val)) + logger.debug('Exception details:', exc_info=exc_val) + finally: + log.flush_and_kill_logging() + # Make sure to print all logs before execution terminates, + # Specifically the last two error logs above. + return True + # Suppress the exception we logged: no traceback, unless requested. -- cgit v1.2.3