#
# A library to manage ARCFIRE experiments
#
#    Copyright (C) 2017 Nextworks S.r.l.
#    Copyright (C) 2017 imec
#
#    Sander Vrijders
#    Dimitri Staessens
#    Vincenzo Maffione
#    Marco Capitani
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., http://www.fsf.org/about/contact/.
#

import os
import random
import time

import rumba.model as model
import rumba.ssh_support as ssh_support
import rumba.log as log

logger = log.get_logger(__name__)

try:
    from numpy.random import poisson
    from numpy.random import exponential
    logger.debug("Using numpy for faster and better random variables.")
except ImportError:
    from rumba.recpoisson import poisson

    def exponential(mean_duration):
        """Draw one sample from an exponential law with the given mean."""
        return random.expovariate(1.0 / mean_duration)

    logger.debug("Falling back to simple implementations.")

# PROBLEM! These logs will almost never be printed...
# But we might not care

# Last id handed out by get_id(); -1 means none has been issued yet.
current_id = -1


def get_id():
    """Return the next client-process id (0, 1, 2, ...)."""
    global current_id
    current_id += 1
    return current_id


class Client(object):
    """
    Base class for client apps.

    :param ap: Application Process binary
    :param nodes: a single Node, a list of Nodes, or None
    :param options: Options to pass to the binary (or None)
    :param shutdown: command used to stop a client process; every
                     occurrence of ``<pid>`` is replaced with the value
                     stored as the process pid. An empty string means
                     that no shutdown command is issued.
    """

    def __init__(self, ap, nodes=None, options=None, shutdown="kill <pid>"):
        self.ap = ap
        self.startup = (ap + ((" " + options) if options is not None else ""))
        if isinstance(nodes, model.Node):
            nodes = [nodes]
        elif nodes is None:
            nodes = []
        self.nodes = nodes
        self.shutdown = shutdown

    def add_node(self, node):
        """Register an additional node on which this client may run."""
        if not isinstance(node, model.Node):
            raise Exception("A Node is required.")
        self.nodes.append(node)

    def process(self, duration):
        """
        Build a ClientProcess for this client on a randomly chosen node.

        If no node is registered the process is created with node None;
        ClientProcess.run() raises in that case.
        """
        node = random.choice(self.nodes) if len(self.nodes) > 0 else None
        return ClientProcess(
            get_id(),
            self.ap,
            self.startup,
            duration,
            node,
            self.shutdown
        )


class ClientProcess(object):
    """
    Base class for client processes.

    :param client_id: identifier appended to the ap name when starting
    :param ap: Application Process binary
    :param startup: full start command; ``<duration>`` is replaced with
                    the process duration when the process is run
    :param duration: The time (in seconds) this process should run
    :param node: the node on which the process is started
    :param shutdown: stop command (``<pid>`` is substituted), or "" if
                     no command has to be issued

    ``start_time`` is the time at which the process was started (None
    until run() is called).
    """

    def __init__(self, client_id, ap, startup, duration,
                 node, shutdown=""):
        self.id = client_id
        self.ap = ap
        self.startup = startup
        self.duration = duration
        self.start_time = None
        self.running = False
        self.node = node
        self.pid = None
        self.shutdown = shutdown

    def run(self):
        """
        Start the client through ./startup.sh on its node.

        :raises Exception: if the process has no node.
        """
        if self.node is None:
            raise Exception('No node specified for client %s' % (self.ap,))
        self.start_time = time.time()

        logger.debug(
            'Starting client app %s on node %s with duration %s.',
            self.ap, self.node.name, self.duration
        )

        # str.replace with an empty pattern would splice the duration
        # between every character, so an explicit placeholder is used.
        start_cmd = "./startup.sh %s_%s %s" % (
            self.ap,
            self.id,
            self.startup.replace("<duration>", str(self.duration)),
        )
        self.running = True
        try:
            # The startup script echoes "$!", so the command output is
            # kept as the pid of the launched process.
            self.pid = self.node.execute_command(start_cmd)
        except ssh_support.SSHException:
            logger.warning('Could not start client %s on node %s.',
                           self.ap, self.node.name)
        logger.debug('Client app %s on node %s got pid %s.',
                     self.ap, self.node.name, self.pid)

    def stop(self):
        """Run the shutdown command, if any; SSH failures are logged."""
        if self.shutdown != "":
            logger.debug(
                'Killing client %s on node %s.',
                self.ap, self.node.name
            )
            try:
                kill_cmd = self.shutdown.replace('<pid>', str(self.pid))
                self.node.execute_command(kill_cmd)
            except ssh_support.SSHException:
                logger.warning('Could not kill client %s on node %s.',
                               self.ap, self.node.name)
        else:
            logger.debug(
                'Client %s on node %s has terminated.',
                self.ap, self.node.name
            )

    def kill_check(self):
        """Check if the process should keep running, stop it if not,
        and return true if and only if it is still running."""
        now = time.time()
        if not self.running:
            return False
        if now - self.start_time >= self.duration:
            self.stop()
            self.running = False
            return False
        return True


class Server(object):
    """
    Base class for server programs.

    :param ap: Application Process binary
    :param arrival_rate: Average requests/s to be received by this server
    :param mean_duration: Average duration of a client connection
                          (in seconds)
    :param options: Options to pass to the binary
    :param max_clients: Maximum number of clients to serve
    :param clients: Client binaries that will use this server
    :param nodes: Specific nodes to start this server on
    :param min_duration: lower bound (in seconds) added to every sampled
                         client duration
    """

    def __init__(self, ap, arrival_rate, mean_duration,
                 options=None, max_clients=float('inf'),
                 clients=None, nodes=None, min_duration=2):
        self.ap = ap
        self.options = options if options is not None else ""
        self.max_clients = max_clients
        if clients is None:
            clients = list()
        self.clients = clients
        if nodes is None:
            nodes = []
        self.nodes = nodes
        self.arrival_rate = arrival_rate  # mean requests/s
        # Mean of the exponential part of a client duration (seconds);
        # min_duration is added on top of every sample, and the mean is
        # never allowed to drop below 0.1.
        self.actual_parameter = max(mean_duration - min_duration, 0.1)
        self.pids = {}  # node -> output of the start command (the pid)
        self.min_duration = min_duration

    def add_client(self, client):
        """Register a client app that uses this server."""
        self.clients.append(client)

    def del_client(self, client):
        """Unregister a client app."""
        self.clients.remove(client)

    def add_node(self, node):
        """Register a node on which this server has to be started."""
        self.nodes.append(node)

    def del_node(self, node):
        """Unregister a node."""
        self.nodes.remove(node)

    def get_new_clients(self, interval):
        """
        Returns a list of clients of size appropriate to the server's rate.

        The list's size should be a sample from Poisson(arrival_rate) over
        interval seconds.
        Hence, the average size should be interval * arrival_rate.
        The size is capped at max_clients.
        """
        number = poisson(self.arrival_rate * interval)
        number = int(min(number, self.max_clients))
        return [self.make_client_process() for _ in range(number)]

    def make_client_process(self):
        """
        Returns a client of this server.

        The duration is min_duration plus an exponential sample, kept
        to two decimal digits.

        :raises Exception: if the server has no registered client.
        """
        if len(self.clients) == 0:
            raise Exception("Server %s has empty client list." % (self,))
        duration = exponential(self.actual_parameter) + self.min_duration
        return random.choice(self.clients).process(
            duration=float("%.2f" % (duration,))
        )

    def run(self):
        """
        Start the server on each of its nodes.

        A small startup.sh launcher is written on the node and then used
        to start the binary in background; its output is stored in
        self.pids. SSH failures are logged and the node is skipped.
        """
        # NOTE(review): StoryBoard.start() writes a different startup.sh
        # (taking a log name as first argument) on client nodes; on a
        # node acting as both server and client this file overwrites
        # it -- confirm whether such topologies must be supported.
        for node in self.nodes:
            logfile = "/tmp/%s_server.log" % self.ap
            script = r'nohup "$@" > %s 2>&1 & echo "$!"' % (logfile,)
            run_cmd = self.ap + (
                (" " + self.options) if self.options is not None else ""
            )
            cmd_1 = "echo '%s' > startup.sh && chmod a+x startup.sh" \
                    % (script,)
            cmd_2 = "./startup.sh %s" % (run_cmd,)
            logger.debug(
                'Starting server %s on node %s with logfile %s.',
                self.ap, node.name, logfile
            )
            try:
                node.execute_command(cmd_1)
                self.pids[node] = (node.execute_command(cmd_2))
            except ssh_support.SSHException:
                logger.warning('Could not start server %s on node %s.',
                               self.ap, node.name)

    def stop(self):
        """Kill every server instance recorded in self.pids."""
        for node, pid in self.pids.items():
            logger.debug(
                'Killing server %s on node %s.',
                self.ap, node.name
            )
            try:
                node.execute_command("kill %s" % pid)
            except ssh_support.SSHException:
                logger.warning('Could not kill server %s on node %s.',
                               self.ap, node.name)


class StoryBoard(object):
    """
    Base class for ARCFIRE storyboards.

    :param duration: Duration of the whole storyboard
    :param experiment: Experiment to use as input
    :param servers: App servers available in the network.
                    Each element is either a Server or a
                    (Server, Node) tuple.
    """

    DEFAULT_INTERVAL = 2.5  # in seconds (may be a float)

    def __init__(self, duration, experiment=None, servers=None):
        self.experiment = experiment
        self.duration = duration
        self.servers = list()
        if servers is None:
            servers = list()
        for s in servers:
            if isinstance(s, tuple):
                # (Server, Node) pair, as allowed by the documented type
                self._validate_and_add_server(*s)
            else:
                self._validate_and_add_server(s)
        self.client_nodes = set()
        self.server_nodes = set()
        self.active_clients = []
        self.start_time = None
        self.commands_list = {}  # t -> list of (node, command)

    def _build_nodes_lists(self):
        """Populates server_nodes and client_nodes lists"""
        for server in self.servers:
            for node in server.nodes:
                self.server_nodes.add(node)
            for client in server.clients:
                for node in client.nodes:
                    self.client_nodes.add(node)

    def _validate_and_add_server(self, s, n=None):
        """
        Check that the nodes of server s (plus n, if given) belong to
        the experiment, then register n on s and s on the storyboard.

        Validation happens before any state is changed, so a failure
        leaves both the server and the storyboard untouched.

        :raises ValueError: if a node is not part of the experiment, or
                            nodes must be checked but no experiment is
                            set.
        """
        if len(s.clients) == 0:
            logger.warning("'%s' server has no registered clients.", s.ap)

        to_check = list(s.nodes)
        if n is not None:
            to_check.append(n)
        elif len(s.nodes) == 0:
            logger.warning("'%s' server has no registered nodes.", s.ap)

        if to_check and self.experiment is None:
            raise ValueError("Cannot add a server before "
                             "setting the experiment.")
        for node in to_check:
            if node not in self.experiment.nodes:
                raise ValueError('Cannot run server on node %s, '
                                 'not in experiment.' % (node.name,))

        if n is not None:
            s.add_node(n)
        self.servers.append(s)

    def set_experiment(self, experiment):
        """Set the experiment this storyboard runs on."""
        if not isinstance(experiment, model.Experiment):
            raise TypeError('Experiment instance required.')
        self.experiment = experiment

    def add_server(self, server):
        """Register a server node to the sb."""
        if self.experiment is None:
            raise ValueError("Cannot add a server before "
                             "setting the experiment.")
        if not isinstance(server, Server):
            raise TypeError('Argument must be of type Server')
        self._validate_and_add_server(server)

    def add_server_on_node(self, server, node):
        """
        Utility method to simultaneously add a server to a sb
        and a node to the server.
        """
        if self.experiment is None:
            raise ValueError("Cannot add a server before "
                             "setting the experiment.")
        if not isinstance(server, Server):
            raise TypeError('First argument must be of type Server')
        if not isinstance(node, model.Node):
            raise TypeError('Second argument must be of type Node')
        self._validate_and_add_server(server, node)

    def del_server(self, server):
        """Remove a server from the storyboard."""
        self.servers.remove(server)

    def run_command(self, t, node, command):
        """
        Schedule the given command to be run at t seconds from the start.

        Commands are executed by the periodic check, hence not earlier
        than t but possibly up to one check interval later. Do not rely
        on the relative order of commands sharing the same check.

        :param t: (float) seconds to wait before running the command
        :param node: (Node) the node on which the command should be run
        :param command: (str or list[str]) the command(s) to be run,
        """
        if self.experiment is None:
            raise ValueError("Cannot schedule a command before "
                             "setting the experiment.")
        if node not in self.experiment.nodes:
            raise ValueError('Cannot run command on node %s, '
                             'not in experiment.' % (node.name,))
        if isinstance(command, str):
            self.commands_list.setdefault(t, []).append((node, command))
        else:
            # Hope it's an Iterable[str]. Otherwise, errors will happen.
            for cmd in command:
                self.commands_list.setdefault(t, []).append((node, cmd))

    def periodic_check(self):
        """
        Perform one storyboard step: spawn new clients, stop the expired
        ones and run the scheduled commands that are due.
        """
        # Spawn new clients
        for server in self.servers:
            for new_client in server.get_new_clients(self.DEFAULT_INTERVAL):
                # Make sure the duration of the client does not
                # go beyond the storyboard lifetime
                remaining = self.duration - (time.time() - self.start_time)
                new_client.duration = min(new_client.duration, remaining)
                if new_client.duration < server.min_duration:
                    # Do not start clients that would not run for
                    # at least the minimum duration
                    # (due to sb constraints)
                    continue
                new_client.run()
                self.active_clients.append(new_client)

        # Kill expired clients; kill_check() stops the expired ones and
        # tells whether a client is still running.
        self.active_clients = [client for client in self.active_clients
                               if client.kill_check()]

        # Do run_command instructions, earliest deadline first
        unexpired_commands = {}
        for t in sorted(self.commands_list):
            if (time.time() - self.start_time) > t:
                for node, command in self.commands_list[t]:
                    node.execute_command(command)
            else:
                unexpired_commands[t] = self.commands_list[t]
        self.commands_list = unexpired_commands

    def start(self):
        """
        Run the storyboard for self.duration seconds.

        Servers are started first, then periodic_check() is called every
        DEFAULT_INTERVAL seconds. When the storyboard ends (or fails),
        every active client and every server is stopped.

        :raises ValueError: if no experiment has been set.
        """
        if self.experiment is None:
            raise ValueError("Cannot start the storyboard before "
                             "setting the experiment.")
        logger.info('Starting storyboard execution')
        self._build_nodes_lists()
        logger.debug('Server nodes are: %s.',
                     [x.name for x in self.server_nodes])
        logger.debug('Client nodes are: %s.',
                     [x.name for x in self.client_nodes])
        logger.debug('Command list is: %s.',
                     {x: [(y.name, z) for y, z in t]
                      for (x, t) in self.commands_list.items()})
        self.start_time = time.time()
        # Launcher: first argument is the log name, the rest is the
        # command to run in background; the pid is echoed back.
        script = r'logname="$1"; shift; nohup "${@}" ' \
                 r'> /tmp/${logname}.rumba.log 2>&1  & echo "$!"'
        logger.debug("Writing utility startup script on client nodes.")
        for node in self.client_nodes:
            node.execute_command(
                "echo '%s' > startup.sh && chmod a+x startup.sh" % (script,)
            )
        try:
            for server in self.servers:
                server.run()
            while time.time() - self.start_time < self.duration:
                self.periodic_check()
                time.sleep(self.DEFAULT_INTERVAL)
            # Do things that were scheduled
            # in the last "INTERVAL" seconds
            # of the StoryBoard
            self.periodic_check()
        finally:  # Kill everything. No more mercy.
            for client in self.active_clients:
                client.stop()
            for server in self.servers:
                server.stop()

    @staticmethod
    def _fetch_node_logs(node, list_cmd, local_dir):
        """Fetch into local_dir the files listed on node by list_cmd."""
        listing = node.execute_command(list_cmd)
        logs_list = [x for x in listing.split('\n') if x != '']
        logger.debug('Log list is:\n%s', logs_list)
        node.fetch_files(logs_list, local_dir)

    def fetch_logs(self, local_dir=None):
        """
        Copy server and client logs from the nodes into local_dir.

        :param local_dir: destination directory; defaults to the log
                          directory of the experiment.
        :raises Exception: if the destination is not a directory.
        """
        if local_dir is None:
            local_dir = self.experiment.log_dir
        if not os.path.isdir(local_dir):
            raise Exception('Destination "%s" is not a directory. '
                            'Cannot fetch logs.'
                            % local_dir)
        for node in self.server_nodes:
            self._fetch_node_logs(node, 'ls /tmp/*_server.log', local_dir)
        for node in self.client_nodes:
            self._fetch_node_logs(node,
                                  'ls /tmp/*.rumba.log '
                                  '|| echo ""',
                                  local_dir)