#
# A library to manage ARCFIRE experiments
#
#    Copyright (C) 2017 Nextworks S.r.l.
#    Copyright (C) 2017 imec
#
#    Sander Vrijders
#    Dimitri Staessens
#    Vincenzo Maffione
#    Marco Capitani
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., http://www.fsf.org/about/contact/.
#

# Base class for client apps
#
# @ap: Application Process binary
# @options: Options to pass to the binary
#

import functools
import math
import os
import random
import time
import uuid

import rumba.model as model
import rumba.ssh_support as ssh_support
import rumba.log as log

try:
    from io import StringIO
except ImportError:
    # Python 2 here. Import the class, not the module: the rest of the
    # file calls StringIO(...) directly.
    from StringIO import StringIO

logger = log.get_logger(__name__)

try:
    from numpy.random import poisson as _poisson
    from numpy.random import exponential as _exponential
    logger.debug("Using numpy for faster and better random variables.")
except ImportError:
    from rumba.recpoisson import poisson as _poisson

    def _exponential(mean_duration):
        """Sample an exponential variable with the given mean."""
        return random.expovariate(1.0 / mean_duration)

    logger.debug("Falling back to simple implementations.")
    # PROBLEM! These logs will almost never be printed...
    # But we might not care


class SBEntity(object):
    """Base class of every object a storyboard script can refer to."""

    def __init__(self, e_id):
        """
        @param e_id: the identifier of this entity
        """
        self.id = e_id

    def get_e_id(self):
        """
        Returns the entity ID used in scripts, "<class name>.<id>".

        @return: (str) the qualified entity ID
        """
        # %-formatting (rather than '+') also copes with non-string ids.
        return '%s.%s' % (type(self).__name__, self.id)
class Client(SBEntity):
    """
    A client application that can be launched on one of its nodes.

    Each launch is represented by a ClientProcess created through
    the process method.
    """

    # Class-wide counter used to build unique process IDs.
    current_id = -1

    @classmethod
    def get_id(cls):
        """
        Advances the class-wide counter and returns its new value.

        @return: (int) the next sequence number
        """
        next_id = cls.current_id + 1
        cls.current_id = next_id
        return next_id

    def __init__(self, ap, nodes=None, options=None,
                 shutdown="kill ", c_id=None):
        """
        @param ap: (str) Application Process binary
        @param nodes: (Node or list of Node) nodes the client may run on
        @param options: (str) options to pass to the binary
        @param shutdown: (str) command used to stop a client process
        @param c_id: (str) entity ID; defaults to ap with spaces
                     replaced by underscores
        """
        self.ap = ap
        if c_id is None:
            c_id = self.ap.replace(' ', '_')
        super(Client, self).__init__(c_id)

        command = ap
        if options is not None:
            command = command + " " + options
        self.startup = command

        if nodes is None:
            nodes = []
        elif isinstance(nodes, model.Node):
            nodes = [nodes]
        self.nodes = nodes
        self.shutdown = shutdown

    def add_node(self, node):
        """
        Registers one more node this client may run on.

        @param node: (Node) the node to add
        """
        if isinstance(node, model.Node):
            self.nodes.append(node)
            return
        raise Exception("A Node is required.")

    def process(self, duration, node=None, proc_id=None):
        """
        Creates (without starting it) a process of this client.

        @param duration: the duration of the process, in seconds
        @param node: (Node) where to run; random among self.nodes if None
        @param proc_id: (str) entity ID of the process; generated if None
        @return: (ClientProcess) the new process
        """
        if proc_id is None:
            # The ID is generated first, so the counter advances even
            # when the node selection below fails.
            proc_id = "%s_%s" % (self.id, self.get_id())
        if node is None:
            if not len(self.nodes):
                raise ValueError('No nodes for client %s' % (self.id,))
            node = random.choice(self.nodes)
        return ClientProcess(proc_id, self.ap, self.startup,
                             duration, node, self.shutdown)
# Base class for client processes
#
# @ap: Application Process binary
# @duration: The time (in seconds) this process should run
# @start_time: The time at which this process is started.
# @options: Options to pass to the binary
#
class ClientProcess(SBEntity):
    """
    A single run of a client application on a node.

    The startup and shutdown commands are templates: every occurrence
    of "<duration>" in the startup command is replaced with the process
    duration, and every occurrence of "<pid>" in the shutdown command
    with the PID reported when the process was started.
    """

    # Placeholders substituted into the command templates.
    DURATION_PLACEHOLDER = "<duration>"
    PID_PLACEHOLDER = "<pid>"

    def __init__(self, proc_id, ap, startup, duration,
                 node, shutdown="kill "):
        """
        @param proc_id: (str) entity ID of this process
        @param ap: (str) Application Process binary
        @param startup: (str) command template used to start the process
        @param duration: seconds the process should run; None is
                         stored as -1
        @param node: (Node) the node to run the process on
        @param shutdown: (str) command template used to stop the process;
                         an empty string means no command is sent
        """
        super(ClientProcess, self).__init__(proc_id)
        self.ap = ap
        self.startup = startup
        self.duration = duration if duration is not None else -1
        self.start_time = None
        self.running = False
        self.node = node
        self.pid = None
        self.shutdown = shutdown

    def _start_command(self):
        """Builds the command line that launches the process."""
        # Replacing the empty string (as the code used to do) inserts
        # the duration between every character of the command.
        startup = self.startup.replace(self.DURATION_PLACEHOLDER,
                                       str(self.duration))
        return "./startup.sh %s %s" % (self.id, startup)

    def _stop_command(self):
        """
        Builds the command line that stops the process.

        @return: (str) the command, or None if it needs a PID and
                 none is known
        """
        has_placeholder = self.PID_PLACEHOLDER in self.shutdown
        # A template such as the default "kill " ends in whitespace
        # because the PID is expected to follow it.
        wants_suffix = self.shutdown != self.shutdown.rstrip()
        if not has_placeholder and not wants_suffix:
            return self.shutdown
        if self.pid is None:
            return None
        pid = str(self.pid)
        if has_placeholder:
            return self.shutdown.replace(self.PID_PLACEHOLDER, pid)
        return self.shutdown + pid

    def run(self):
        """
        Starts the process on its node and records the PID returned
        by the startup command.
        """
        if self.node is None:
            raise Exception('No node specified for client %s' % (self.ap,))
        self.start_time = time.time()

        logger.debug(
            'Starting client app %s on node %s with duration %s.',
            self.ap, self.node.name, self.duration
        )

        self.running = True
        try:
            self.pid = self.node.execute_command(self._start_command())
        except ssh_support.SSHException:
            # Nothing was started: there is no PID to report or kill.
            self.running = False
            logger.warning('Could not start client %s on node %s.',
                           self.ap, self.node.name)
            return
        logger.debug('Client app %s on node %s got pid %s.',
                     self.ap, self.node.name, self.pid)

    def stop(self):
        """
        Sends the shutdown command to the node. With an empty shutdown
        command nothing is sent and the process is only logged as
        terminated.
        """
        if self.shutdown == "":
            logger.debug(
                'Client %s on node %s has terminated.',
                self.ap, self.node.name
            )
            return

        kill_cmd = self._stop_command()
        if kill_cmd is None:
            logger.debug(
                'Client %s on node %s has no pid, nothing to kill.',
                self.ap, self.node.name
            )
            return

        logger.debug(
            'Killing client %s on node %s.',
            self.ap, self.node.name
        )
        try:
            self.node.execute_command(kill_cmd)
        except ssh_support.SSHException:
            logger.warning('Could not kill client %s on node %s.',
                           self.ap, self.node.name)

    def check_and_kill(self):
        """Check if the process should keep running, stop it if not,
        and return true if and only if it is still running."""
        now = time.time()
        if not self.running:
            return False
        # NOTE(review): a negative duration (the stand-in for None)
        # makes this stop the process on the first check -- confirm
        # that this is intended.
        if now - self.start_time >= self.duration:
            self.stop()
            self.running = False
            return False
        return True
# Base class for server programs
#
# @ap: Application Process binary
# @arrival_rate: Average requests/s to be received by this server
# @mean_duration: Average duration of a client connection (in seconds)
# @options: Options to pass to the binary
# @max_clients: Maximum number of clients to serve
# @clients: Client binaries that will use this server
# @nodes: Specific nodes to start this server on
#
class Server(SBEntity):
    """
    A server application together with the statistical description
    of the clients that connect to it.
    """

    # Class-wide counter, advanced by get_id.
    current_id = -1

    @classmethod
    def get_id(cls):
        """
        Advances the class-wide counter and returns its new value.

        @return: (int) the next sequence number
        """
        cls.current_id += 1
        return cls.current_id

    def __init__(self, ap, arrival_rate, mean_duration,
                 options=None, max_clients=float('inf'),
                 clients=None, nodes=None, min_duration=2,
                 s_id=None):
        """
        @param ap: (str) Application Process binary
        @param arrival_rate: (float) average requests/s
        @param mean_duration: (float) average client duration (seconds)
        @param options: (str) options to pass to the binary
        @param max_clients: upper bound on the clients generated by a
                            single call to get_new_clients
        @param clients: (list of Client) client apps using this server
        @param nodes: (list of Node) nodes to start this server on
        @param min_duration: (float) minimum client duration (seconds)
        @param s_id: (str) entity ID; defaults to ap with spaces
                     replaced by underscores
        """
        self.ap = ap
        e_id = s_id if s_id is not None else self.ap.replace(' ', '_')
        super(Server, self).__init__(e_id)
        self.options = options if options is not None else ""
        self.max_clients = max_clients
        if clients is None:
            clients = list()
        self.clients = clients
        if nodes is None:
            nodes = []
        self.nodes = nodes
        self.arrival_rate = arrival_rate  # mean requests/s
        # Mean of the exponential part of the duration: get_duration
        # adds min_duration back on top of the sample.
        self.actual_parameter = max(mean_duration - min_duration, 0.1)
        # in seconds
        self.pids = {}
        self.min_duration = min_duration

    def add_client(self, client):
        """Registers a client app for this server."""
        self.clients.append(client)

    def del_client(self, client):
        """Unregisters a client app from this server."""
        self.clients.remove(client)

    def add_node(self, node):
        """Adds a node on which this server must be started."""
        self.nodes.append(node)

    def del_node(self, node):
        """Removes a node from those this server is started on."""
        self.nodes.remove(node)

    def get_new_clients(self, interval):
        """
        Returns a list of clients of size appropriate to the server's rate.

        The list's size should be a sample from Poisson(arrival_rate) over
        interval seconds.
        Hence, the average size should be interval * arrival_rate.

        @param interval: (float) length of the time window, in seconds
        @return: list of (duration, node, proc_id, client) tuples
        """
        number = _poisson(self.arrival_rate * interval)
        number = int(min(number, self.max_clients))
        return [self._make_process_arguments() for _ in range(number)]

    def get_duration(self):
        """
        Samples the duration of a client connection.

        @return: (float) min_duration plus an exponential sample
        """
        return _exponential(self.actual_parameter) + self.min_duration

    def _make_process_arguments(self, duration=None, node=None,
                                proc_id=None, client=None):
        """
        Fills in, at random, whichever process parameter was not given.

        @return: the tuple (duration, node, proc_id, client)
        """
        if len(self.clients) == 0:
            raise Exception("Server %s has empty client list." % (self.id,))
        if duration is None:
            duration = self.get_duration()
        if client is None:
            client = random.choice(self.clients)
        if node is None:
            if len(client.nodes) == 0:
                # random.choice would fail with an opaque IndexError.
                raise ValueError('No nodes for client %s' % (client.id,))
            node = random.choice(client.nodes)
        if proc_id is None:
            proc_id = "%s_%s" % (client.ap, client.get_id())
        return duration, node, proc_id, client

    def make_client_process(self, duration=None, node=None,
                            proc_id=None, client=None):
        """Returns a client of this server"""
        (d, n, p, c) = self._make_process_arguments(duration, node,
                                                    proc_id, client)
        return c.process(
            # Round the duration to 1/100 of a second.
            duration=float("%.2f" % (d,)),
            node=n,
            proc_id=p
        )

    def run(self):
        """Starts the server on each of its nodes, recording the PIDs."""
        # self.options is never None: __init__ stores "" in that case.
        run_cmd = self.ap
        if self.options:
            run_cmd += " " + self.options
        cmd_2 = "./startup.sh %s %s" % ('server_' + self.id, run_cmd)
        for node in self.nodes:
            logger.debug(
                'Starting server %s on node %s.',
                self.id, node.name
            )
            try:
                self.pids[node] = node.execute_command(cmd_2)
            except ssh_support.SSHException:
                logger.warning('Could not start server %s on node %s.',
                               self.id, node.name)

    def stop(self):
        """Kills every server instance for which a PID was recorded."""
        for node, pid in self.pids.items():
            logger.debug(
                'Killing server %s on node %s.',
                self.id, node.name
            )
            try:
                node.execute_command("kill %s" % pid)
            except ssh_support.SSHException:
                logger.warning('Could not kill server %s on node %s.',
                               self.id, node.name)
# Base class for ARCFIRE storyboards
#
# @experiment: Experiment to use as input
# @duration: Duration of the whole storyboard
# @servers: App servers available in the network.
#           Type == Server or Type == List[Tuple[Server, Node]]
#
class StoryBoard(SBEntity):
    """
    Schedules and runs servers, clients and arbitrary actions on the
    nodes of an experiment, following a script of events.
    """

    SCRIPT_RESOLUTION = 0.1
    EXECUTION_RESOLUTION = 2.5  # in seconds (may be a float)

    def get_e_id(self):
        """Returns the fixed entity ID of the storyboard."""
        return 'storyboard'

    def __init__(self, duration, experiment=None, servers=None, script=None):
        """
        @param duration: (float) duration of the whole storyboard
        @param experiment: the experiment to use as input
        @param servers: (list of Server) servers available in the network
        @param script: the script to run; a new empty one if None
        """
        self.id = 'storyboard'
        self.experiment = experiment
        self.duration = duration
        self.cur_time = 0.0
        self.server_apps = {}
        self.client_apps = {}
        # All the containers below must exist before any server is
        # registered, since registering a server rebuilds the node lists.
        self.client_nodes = set()
        self.server_nodes = set()
        self.process_dict = {}
        self.active_clients = self.process_dict.values()
        self.start_time = None
        self.commands_list = {}
        self.node_map = {}
        self.shims = {}
        if servers is None:
            servers = list()
        for s in servers:
            self._validate_and_add_server(s)
        self._build_nodes_lists()
        # The following must be last, because it needs the info from
        # _build_nodes_list
        self._script = script if script is not None else _Script(self)

    def _build_nodes_lists(self):
        """Populates server_nodes and client_nodes lists"""
        # Rebuild from scratch so that deleted servers and replaced
        # experiments leave nothing stale behind. The containers are
        # cleared in place because the script holds references to them.
        self.server_nodes.clear()
        self.client_nodes.clear()
        for server in self.server_apps.values():
            self.server_nodes.update(server.nodes)
            for client in server.clients:
                self.client_nodes.update(client.nodes)
        if self.experiment is not None:
            self.node_map.clear()
            self.shims.clear()
            for node in self.experiment.nodes:
                self.node_map[node.name] = node
            for dif in self.experiment.dif_ordering:
                if isinstance(dif, model.ShimEthDIF):
                    self.shims[dif.name] = dif

    def _validate_and_add_server(self, s, n=None):
        """
        Registers a server (and its clients) with the storyboard.

        @param s: (Server) the server to register
        @param n: (Node) optional node to add to the server first
        """
        if len(s.clients) == 0:
            logger.warning("'%s' server has no registered clients.", s.ap)
        else:
            for client in s.clients:
                self.client_apps[client.id] = client
        if n is not None:
            s.add_node(n)
        elif len(s.nodes) == 0:
            logger.warning("'%s' server has no registered nodes.", s.ap)
        # Without an experiment (servers passed to the constructor)
        # there is nothing to validate the nodes against.
        if self.experiment is not None:
            for node in s.nodes:
                if node not in self.experiment.nodes:
                    raise ValueError('Cannot run server on node %s, '
                                     'not in experiment.' % (node.name,))
        self.server_apps[s.id] = s
        self._build_nodes_lists()

    def set_experiment(self, experiment):
        """
        Set the storyboard's underlying experiment instance

        @param experiment: the experiment instance
        """
        if not isinstance(experiment, model.Experiment):
            raise TypeError('Experiment instance required.')
        self.experiment = experiment
        self._build_nodes_lists()

    def add_server(self, server):
        """Register a server node to the sb."""
        if self.experiment is None:
            raise ValueError("Cannot add a server before "
                             "setting the experiment.")
        if not isinstance(server, Server):
            raise TypeError('Argument must be of type Server')
        self._validate_and_add_server(server)

    def add_server_on_node(self, server, node):
        """
        Utility method to simultaneously add a server to a sb
        and a node to the server.

        @param server: the server to be added to the storyboard
        @param node: the node upon which the server should run
        """
        if self.experiment is None:
            raise ValueError("Cannot add a server before "
                             "setting the experiment.")
        if not isinstance(server, Server):
            raise TypeError('First argument must be of type Server')
        if not isinstance(node, model.Node):
            raise TypeError('Second argument must be of type Node')
        self._validate_and_add_server(server, node)

    def del_server(self, server):
        """
        Unregisters a server from the storyboard.

        @param server: (Server) the server to remove
        """
        del self.server_apps[server.id]
        self._build_nodes_lists()

    def schedule_action(self,
                        call,
                        args=None,
                        kwargs=None,
                        c_time=None,
                        trigger=None,
                        ev_id=None,):
        """
        Calls a function with the specified triggers and arguments.

        :param call: the function to run
        :type call: function (methods included)
        :param c_time: the function will not be called before `c_time`
                       seconds have passed
        :type c_time: :py:class:`float`
        :param trigger: the function must not be called before the event
                        `trigger` has completed
        :type trigger: :py:class:`.Event` or :py:class:`str`
        :param ev_id: the ID to assign to the generated event
        :type ev_id: :py:class:`str`
        :param args: arguments to pass to the function
        :param kwargs: keyword arguments to be passed
        :return: the event representing the calling of the function
        :rtype: :py:class:`.Event`
        """
        if args is None:
            args = []
        if kwargs is None:
            kwargs = {}
        action = functools.partial(call, *args, **kwargs)
        event = Event(action, ev_id=ev_id, ev_time=c_time, trigger=trigger)
        self.add_event(event)
        return event

    def schedule_command(self, t, node, command):
        """
        Schedules the given command to be run at t seconds from the start.
        The commands are run in no particular order, so take care

        @param t: (float) seconds to wait before running the command
        @param node: (Node or str) the node on which the command
                     should be run
        @param command: (str or list[str]) the command(s) to be run,
        """
        if self.experiment is None:
            raise ValueError("An experiment is needed to schedule commands.")
        if self._script is None:
            self._script = _Script(self)
        if isinstance(node, str):
            node = self.node_map[node]
        if node not in self.experiment.nodes:
            raise ValueError('Cannot run command on node %s, '
                             'not in experiment.' % (node.name,))
        if isinstance(command, str):
            command = [command]
        action = functools.partial(self.run_command, node, command)
        self._script.add_event(Event(action, ev_time=t))

    def run_command(self, node, command):
        """
        Runs a command (or several) on a given node, immediately.

        @param node: (Node or str) the node on which the command
                     should be run
        @param command: (str or list[str]) the command(s) to be run
        """
        if self.experiment is None:
            raise ValueError("Experiment needed to run commands.")
        if isinstance(node, str):
            node = self.node_map[node]
        if node not in self.experiment.nodes:
            raise ValueError('Cannot run command on node %s, '
                             'not in experiment.' % (node.name,))
        if isinstance(command, str):
            command = [command]
        node.execute_commands(command)

    def add_event(self, event):
        """
        Add an event to this script, provided either as an Event
        instance or as a string as read from a .rsb script.

        :param event: the event to add
        :type event: (Event or str)
        """
        self._script.add_event(event)

    def del_event(self, event):
        """
        Remove an event from this storyboard

        :param event: the event (or id thereof) to remove
        :type event: (Event or str)
        """
        self._script.del_event(event)

    def run_client_of(self,
                      server,
                      duration=None,
                      node=None,
                      proc_id=None,
                      callback=None):
        """
        Runs a random client of the specified server
        with the specified parameters.

        Except for the server, if a parameter is not specified,
        it will be randomly generated according to the server
        parameters (mean duration, client apps and their nodes)

        @param server: the server of which one client should be run
        @param duration: the duration of the client process
        @param node: (Node or str) the node on which the client should be run
        @param proc_id: the entity ID to use for the process
        @param callback: callable or list thereof to be run
                         after client termination
        """
        if isinstance(server, str):
            server = self.server_apps[server]
        if duration is None:
            duration = server.get_duration()
        client = random.choice(server.clients)
        self.run_client(client, duration, node, proc_id, callback)

    def run_client(self,
                   client,
                   duration,
                   node=None,
                   proc_id=None,
                   callback=None):
        """
        Runs the specified client app with the specified parameters.

        If the node parameter is not given, it will be chosen at random
        among the client default nodes.

        @param client: the client which should be run
        @param duration: the duration of the client process
        @param node: (Node or str) the node on which the client should be run
        @param proc_id: the entity ID to use for the process
        @param callback: callable or list thereof to be run
                         after client termination
        """
        if isinstance(client, str):
            client = self.client_apps[client]
        if node is None:
            if len(client.nodes) == 0:
                raise ValueError('No nodes registered for client %s'
                                 % (client.id,))
            node = random.choice(client.nodes)
        elif isinstance(node, str):
            node = self.node_map[node]
        if callback is None:
            callback = []
        elif not hasattr(callback, '__len__'):
            callback = [callback]
        process = client.process(duration, node, proc_id)
        self.process_dict[process.id] = process
        process.run()
        # Schedule the termination, then chain the callbacks to it.
        action = functools.partial(self.kill_process, process.id)
        term_ev = Event(action, ev_time=(self.cur_time + duration))
        self.add_event(term_ev)
        for cb in callback:
            cb_event = Event(action=cb, trigger=term_ev)
            self.add_event(cb_event)

    def start_client_of(self,
                        server,
                        duration=None,
                        node=None,
                        proc_id=None):
        """
        Starts a random client of the specified server
        with the specified parameters.

        Except for the server and the duration, if a parameter is not
        specified, it will be randomly generated according to the server
        parameters (client apps and their nodes).

        Note that this method, as opposed to
        :meth:`rumba.storyboard.run_client_of`, will not generate an event
        to stop the client after the duration is expired.
        In most cases, :meth:`rumba.storyboard.run_client_of`
        is the way to go.

        @param server: the server of which one client should be run
        @param duration: the duration of the client process
        @param node: the node on which the client should be run
        @param proc_id: the entity ID to use for the process
        """
        if isinstance(server, str):
            server = self.server_apps[server]
        client = random.choice(server.clients)
        self.start_client(client, duration, node, proc_id)

    def start_client(self, client, duration=None, node=None, proc_id=None):
        """
        Starts the specified client app with the specified parameters.

        If the node parameter is not given, it will be chosen at random
        among the client default nodes.

        Note that this method, as opposed to
        :meth:`rumba.storyboard.run_client`, will not generate an event
        to stop the client after the duration is expired.
        In most cases, :meth:`rumba.storyboard.run_client`
        is the way to go.

        @param client: the client which should be run
        @param duration: the duration of the client process
        @param node: (Node or str) the node on which the client should be run
        @param proc_id: the entity ID to use for the process
        """
        if isinstance(client, str):
            client = self.client_apps[client]
        # Same node handling as run_client.
        if node is None:
            if len(client.nodes) == 0:
                raise ValueError('No nodes registered for client %s'
                                 % (client.id,))
            node = random.choice(client.nodes)
        elif isinstance(node, str):
            node = self.node_map[node]
        process = client.process(duration, node, proc_id)
        self.process_dict[process.id] = process
        process.run()

    def kill_process(self, proc_id):
        """
        Stops a running client process and forgets about it.

        @param proc_id: (str) the entity ID of the process
        """
        process = self.process_dict.get(proc_id, None)
        if process is None:
            raise ValueError('No process named %s' % (proc_id,))
        process.stop()
        del self.process_dict[proc_id]

    def periodic_check(self, t):
        """
        Runs every event of the script which is ready at time t.

        @param t: (float) seconds since the start of the storyboard
        """
        self._script.check_for_ready_ev(t)
        self._script.run_ready()

    def generate_script(self, clean=True):
        """
        Randomly generate a script for this experiment based on the
        parameters provided to the instances of servers, nodes and clients.

        @param clean: if True, discard the current script before
                      generating a new one.
        """
        if self.experiment is None:
            raise ValueError('Cannot generate script without an experiment')
        if clean:
            self._script = _Script(self)
        t = self.SCRIPT_RESOLUTION
        marker = 5
        last_marker = 0
        while t < self.duration:
            if int(t) >= (last_marker+1)*marker:
                last_marker += 1
                logger.debug('Passed the %s seconds mark', last_marker*marker)
            for server in self.server_apps.values():
                c_l = server.get_new_clients(self.SCRIPT_RESOLUTION)
                for d, n, p, c in c_l:
                    if d > self.duration - t:  # would outlast the experiment
                        continue
                    start = self._make_process_events(
                        t,
                        d,
                        n,
                        p,
                        c
                    )
                    self._script.add_event(start)
            t += self.SCRIPT_RESOLUTION

    def _make_process_events(self, t, d, n, p, c):
        """
        Builds the event starting client c at time t.

        @param t: (float) start time of the process
        @param d: (float) duration of the process
        @param n: (Node) node to run the process on
        @param p: (str) entity ID of the process
        @param c: (Client) the client app to run
        @return: (Event) the start event
        """
        start_action = functools.partial(
            self.run_client,
            c,
            d,
            n,
            p
        )
        start_event = Event(start_action, ev_time=t)
        return start_event

    def start(self):
        """
        Runs the storyboard: installs the startup helper on the nodes,
        starts the servers and executes the script until the duration
        expires. Clients and servers are stopped in any case at the end.
        """
        if self.experiment is None:
            raise ValueError("Cannot run sb with no experiment.")
        if self._script is None:
            self.generate_script()
        logger.info('Starting storyboard execution')
        self._build_nodes_lists()
        logger.debug('Server nodes are: %s.',
                     [x.name for x in self.server_nodes])
        logger.debug('Command list is: %s.',
                     {x: [(y.name, z) for y, z in t]
                      for (x, t) in self.commands_list.items()})
        self.start_time = time.time()
        script = r'logname="$1"; shift; nohup "${@}" ' \
                 r'> /tmp/${logname}.rumba.log 2>&1 & echo "$!"'
        logger.debug("Writing utility startup script on nodes.")
        for node in self.node_map.values():
            node.execute_command(
                "echo '%s' > startup.sh && chmod a+x startup.sh" % (script,)
            )
        try:
            for server in self.server_apps.values():
                server.run()
            res = self.SCRIPT_RESOLUTION  # for brevity
            while self.cur_time < self.duration:
                self.periodic_check(self.cur_time)
                next_breakpoint = math.ceil(self.cur_time / res) * res
                delta = next_breakpoint - self.cur_time
                if delta > 0:  # just in case
                    time.sleep(delta)
                self.cur_time = float(time.time() - self.start_time)
            self.periodic_check(self.cur_time)
            # Do things that were scheduled
            # in the last seconds
            # of the StoryBoard
        finally:  # Kill everything. No more mercy.
            # Take a fresh snapshot: on Python 2 dict.values() is a
            # copy, so self.active_clients would still be empty.
            for client in list(self.process_dict.values()):
                client.stop()
            for server in self.server_apps.values():
                server.stop()

    def fetch_logs(self, local_dir=None):
        """
        Downloads the /tmp/*.rumba.log files of every node, each into
        a subdirectory named after the node.

        @param local_dir: (str) destination directory; defaults to the
                          experiment's log_dir
        """
        if local_dir is None:
            local_dir = self.experiment.log_dir
        if not os.path.isdir(local_dir):
            raise Exception('Destination "%s" is not a directory. '
                            'Cannot fetch logs.'
                            % local_dir)
        for node in self.node_map.values():
            dst_dir = os.path.join(local_dir, node.name)
            if not os.path.isdir(dst_dir):
                os.mkdir(dst_dir)
            logs_list = node.execute_command('ls /tmp/*.rumba.log '
                                             '|| echo ""')
            logs_list = [x for x in logs_list.split('\n') if x != '']
            logger.debug('Log list is:\n%s', logs_list)
            node.fetch_files(logs_list, dst_dir)

    def set_link_state(self, t, dif, state):
        """
        Schedules a link state change on every member of a shim DIF.

        @param t: (float) time at which the state changes
        @param dif: (ShimEthDIF) the link to act upon
        @param state: (str) the new state, 'up' or 'down'
        """
        if self.experiment is None:
            raise ValueError("An experiment is needed to schedule commands.")
        if not isinstance(dif, model.ShimEthDIF):
            raise Exception("Not a Shim Ethernet DIF.")
        if self._script is None:
            self._script = _Script(self)
        for node in dif.members:
            action = functools.partial(node.set_link_state, dif, state)
            self._script.add_event(Event(action, ev_time=t))

    def set_link_up(self, t, dif):
        """Schedules bringing the link up at time t."""
        self.set_link_state(t, dif, 'up')

    def set_link_down(self, t, dif):
        """Schedules bringing the link down at time t."""
        self.set_link_state(t, dif, 'down')

    def set_node_state(self, t, node, state):
        """
        Schedules a state change of every shim Ethernet link of a node.

        @param t: (float) time at which the state changes
        @param node: (Node) the node to act upon
        @param state: (str) the new state, 'up' or 'down'
        """
        if self.experiment is None:
            raise ValueError("An experiment is needed to schedule commands.")
        if self._script is None:
            self._script = _Script(self)
        for dif in node.difs:
            if not isinstance(dif, model.ShimEthDIF):
                continue
            action = functools.partial(node.set_link_state, dif, state)
            self._script.add_event(Event(action, ev_time=t))

    def set_node_up(self, t, node):
        """Schedules bringing all the links of the node up at time t."""
        self.set_node_state(t, node, 'up')

    def set_node_down(self, t, node):
        """Schedules bringing all the links of the node down at time t."""
        self.set_node_state(t, node, 'down')

    def write_script(self, buffer):
        """
        Writes the script on a string buffer, at the current position

        @param buffer: a string buffer.
        """
        self._script.write(buffer)

    def write_script_to_file(self, filename, clean=True):
        """
        Writes the script to a file.

        @param filename: the name of the destination file
        @param clean: if True, current file contents will be overwritten;
                      otherwise the script is appended to them.
        """
        # 'w+' would truncate the file just like 'w' does.
        mode = 'w' if clean else 'a'
        with open(filename, mode) as f:
            self.write_script(f)

    def write_script_string(self):
        """
        Writes the script into a string and returns it.

        @return: the script as a string.
        """
        s = StringIO()
        self.write_script(s)
        return s.getvalue()

    def parse_script(self, buffer, clean=True):
        """
        Reads a script from a buffer, at the current position.

        @param buffer: the buffer to read from.
        @param clean: if True, discard the current script before reading.
        """
        if clean:
            self._script = _Script(self)
        self._script.parse(buffer)

    def parse_script_file(self, filename, clean=True):
        """
        Reads a script from a file.

        @param filename: the file to read from.
        @param clean: if True, discard the current script before reading.
        """
        with open(filename, 'r') as f:
            self.parse_script(f, clean)

    def parse_script_string(self, string, clean=True):
        """
        Reads a script from a string.

        @param string: the string to read from.
        @param clean: if True, discard the current script before reading.
        """
        buffer = StringIO(string)
        self.parse_script(buffer, clean)

    def capture_traffic(self, start, end, node, dif):
        """
        Captures the traffic of an interface on a certain node.

        :param start: The time to start capturing.
        :param end: The time to stop capturing.
        :param node: The node to capture on.
        :param dif: The Shim Ethernet DIF of the node, Rumba
                    automatically resolves the correct interface.
        """
        for ipcp in dif.ipcps:
            if ipcp.node is not node:
                continue
            # In case tcpdump is not present, this assumes a testbed
            # with Ubuntu/Debian just like the rest of installation
            if not node.has_tcpdump:
                ssh_support.aptitude_install(self.experiment.testbed,
                                             node, ["tcpdump"])
                node.has_tcpdump = True

            # Create random string
            pcap_file = node.name + '_' + dif.name + '_' + \
                str(uuid.uuid4())[0:4] + ".pcap"

            tcpd_client = Client(ap="tcpdump",
                                 options="-i %s -w %s"
                                         % (ipcp.ifname, pcap_file))\
                .process(end-start, node, 'tcpdump_proc')

            self.schedule_action(tcpd_client.run, c_time=start)
            end_event = self.schedule_action(tcpd_client.stop, c_time=end)
            # Fetch the capture only once tcpdump has been stopped.
            self.schedule_action(
                node.fetch_file,
                args=[pcap_file, self.experiment.log_dir],
                kwargs={'sudo': True},
                trigger=end_event
            )
class Event(object):
    """
    An action to be run once its conditions are met: a minimum time,
    the completion of another event (the trigger), or both.
    """

    # Class-wide counter used to generate default event IDs.
    cur_id = -1

    @classmethod
    def generate_id(cls):
        """
        Generates a new default event ID.

        @return: (str) an ID of the form "__event_<n>"
        """
        cls.cur_id += 1
        return "__event_%s" % (cls.cur_id,)

    def __init__(self, action, ev_id=None, ev_time=None, trigger=None):
        """
        @param ev_id: (str) id of the event
        @param action: (any SBEntity method) action to undertake
                       when event is activated
        @param ev_time: (float) seconds to wait before running the event
        @param trigger: (Event) Event which must complete before this runs
        """
        self.id = ev_id if ev_id is not None else self.generate_id()
        if ev_time is None and trigger is None:
            raise ValueError('No condition specified for event %s.'
                             % self.id)
        self.action = action
        self.time = ev_time
        self._trigger = trigger
        self.trigger = trigger.id if trigger is not None else None
        self.exception = None
        self.done = False
        # The script line is "<conditions> &<label> | $<entity> <method>".
        # The entity part is left out when the action is not bound to
        # an entity (e.g. a plain callback).
        entity = self._entity_repr()
        method = self._action_repr()
        self._repr = "%(prefix)s &%(label)s | %(action)s" % {
            'prefix': self._prefix_repr(),
            'label': self.id,
            'action': (entity + ' ' + method) if entity else method
        }

    def _callable(self):
        """Returns the callable wrapped by the action, if it is a partial."""
        if isinstance(self.action, functools.partial):
            return self.action.func
        return self.action

    def _entity_repr(self):
        """
        Returns '$<entity id>' for the entity the action is bound to,
        or an empty string if there is no such entity.
        """
        owner = getattr(self._callable(), '__self__', None)
        if owner is None or isinstance(owner, type):
            return ''
        get_e_id = getattr(owner, 'get_e_id', None)
        if get_e_id is None:
            return ''
        return '$' + get_e_id()

    def _action_repr(self):
        """Returns the method name followed by its positional arguments."""
        func = self._callable()
        name = getattr(func, '__name__', repr(func))
        if isinstance(self.action, functools.partial):
            args = ' '.join(self._action_arg_repr(x)
                            for x in self.action.args)
        else:
            args = ''
        return ' '.join((name, args))

    @staticmethod
    def _action_arg_repr(arg):
        """Returns the script representation of one argument."""
        if hasattr(arg, 'get_e_id'):
            return '$' + arg.get_e_id()
        elif isinstance(arg, str):
            return "'" + arg + "'"
        elif isinstance(arg, float):
            return "%.2f" % (arg,)
        else:
            return str(arg)  # Assuming int

    def _prefix_repr(self):
        """Returns the comma-separated conditions of this event."""
        conditions = []
        if self.time is not None:
            conditions.append("%.2f" % (self.time,))
        if self.trigger is not None:
            conditions.append(self.trigger)
        return ", ".join(conditions)

    @property
    def failed(self):
        """True if the action raised an exception."""
        return self.exception is not None

    def pre_exec(self):
        # hook to be overridden
        pass

    def post_exec(self):
        # hook to be overridden
        pass

    def _start(self):
        """Runs the pre-execution hook and refuses to run twice."""
        self.pre_exec()
        if self.done:
            raise ValueError('Event %s has already ran' % self.id)

    def run(self):
        """
        Run this event's action
        """
        self._start()
        try:
            self.action()
        except Exception as e:
            # Recorded (and logged by _done) rather than propagated.
            self.exception = e
        self._done()

    def check(self, cur_time):
        """
        Check if this event can be run.

        @param cur_time: (float) current time
        @return: True if the preconditions are satisfied, False otherwise.
        """
        return \
            (self.time is None or cur_time > self.time) \
            and (self._trigger is None or self._trigger.done)

    def _done(self):
        """Marks the event as done, logs a failure, runs the post hook."""
        self.done = True
        if self.exception is not None:
            logger.warning('Event %s failed. %s: %s.',
                           self.id,
                           type(self.exception).__name__,
                           str(self.exception))
        self.post_exec()

    def __str__(self):
        return self.id

    def __repr__(self):
        return self._repr
%s: %s.', self.id, type(self.exception).__name__, str(self.exception)) self.post_exec() def __str__(self): return self.id def __repr__(self): return self._repr class _Script(object): def __init__(self, storyboard): if storyboard is None: raise ValueError("storyboard must not be None") self.events_by_id = {} self._waiting_events = {} self._events_ready = [] self._nodes = {} self._servers = {} self._clients = {} self._storyboard = storyboard self._entities = {} self._parse_entities() @property def _experiment(self): return self._storyboard.experiment @property def _testbed(self): exp = self._experiment if exp is None: return None else: return exp.testbed def _parse_entities(self): self._nodes = self._storyboard.node_map self._servers = self._storyboard.server_apps self._clients = self._storyboard.client_apps self._processes = {} self._shims = self._storyboard.shims self._entities = { 'sb': self._storyboard, 'storyboard': self._storyboard, 'testbed': self._testbed, 'experiment': self._experiment, 'Node': self._nodes, 'Server': self._servers, 'Client': self._clients, 'ClientProcess': self._processes, 'ShimEthDIF': self._shims } def add_process(self, process): self._processes[process.id] = process def add_event(self, event): """ Add an event to this script @param event: (Event or str) the event to add """ if isinstance(event, str): self._parse_line(event) return ev_id = event.id self.events_by_id[ev_id] = event self._waiting_events[ev_id] = event def del_event(self, event): """ Remove an event from this script @param event: (Event or str) the event (or id thereof) to remove """ if isinstance(event, Event): event = event.id del self.events_by_id[event] del self._events_ready[event] def check_for_ready_ev(self, cur_time): """ Check which events are ready to run, and mark them @param (float) current time """ new_wait = {} for i, e in self._waiting_events.items(): if e.check(cur_time): self._events_ready.append(e) else: new_wait[i] = e self._waiting_events = new_wait def 
run_ready(self): """ Run events marked as ready by a previous inspection (see @check_for_ready_ev) """ while len(self._events_ready): event = self._events_ready.pop() logger.debug("Running event %s.", event.id) event.run() def _nested_iterator(self, d): for t in ((x, self.events_by_id[x]) for e_lst in d.values() for x in e_lst): yield t def _resolve_entity(self, e_id): e_key = e_id.split('.') result = self._entities while len(e_key) != 0: key = e_key.pop(0) result = result.get(key) if result is None: raise ValueError('Invalid entity key %s at %s' % (e_id, key)) return result def _parse_action(self, entity, action_l): method_n = action_l[0] method = getattr(entity, method_n, None) if method is None: raise ValueError('No method called "%s" for entity %s.' % (method_n, entity.get_e_id())) args_l = action_l[1:] args = self._parse_action_arguments(args_l) return functools.partial(method, *args) # TODO maybe some argument checking? def _parse_action_arguments(self, args_l): args = [] quotes_part = None while len(args_l) > 0: arg_s = args_l.pop(0) if quotes_part is not None: if arg_s.endswith("'"): arg_s = arg_s[:-1] # Strip final ' quotes_part += ' ' + arg_s args.append(quotes_part) quotes_part = None elif arg_s.startswith("'"): arg_s = arg_s[1:] # Strip initial ' if arg_s.endswith("'"): arg_s = arg_s[:-1] # Strip final ' args.append(arg_s) else: quotes_part = arg_s else: if arg_s.startswith('$'): args.append(self._resolve_entity(arg_s[1:])) else: try: args.append(float(arg_s)) except ValueError: raise ValueError('Syntax error: %s is not a float. ' 'If it is supposed to be a string, ' 'enclose it in single quotes.' 
% (arg_s,)) return args def _parse_prefix(self, prefix): prefix = prefix.strip().split('&') if len(prefix) > 2: raise ValueError('Syntax error: multiple "&" in prefix') conditions = prefix[0].strip().split(',') if len(prefix) == 2: label = prefix[1].strip() else: label = None if len(conditions) == 0: raise ValueError("Syntax error: expected at least one condition") elif len(conditions) > 2: raise ValueError("Syntax error: expected at most two condition") t, trigger = self._parse_conditions(*[x.strip() for x in conditions]) return label, t, trigger def _parse_suffix(self, suffix): action_l = suffix.strip().split(' ') if action_l[0].startswith('$'): entity_id = action_l[0][1:] action_l = action_l[1:] if len(action_l) == 0: raise ValueError('Syntax error: missing action.') else: entity_id = 'sb' entity = self._resolve_entity(entity_id) action = self._parse_action(entity, action_l) return action def _parse_line(self, line): parts = line.split('|') if len(parts) != 2: raise ValueError("Syntax error: expected exactly one '|'") label, t, trigger = self._parse_prefix(parts[0]) action = self._parse_suffix(parts[1]) event = Event(action, ev_id=label, ev_time=t, trigger=trigger) self.add_event(event) def parse(self, str_iterable): for index, line in enumerate(str_iterable): if line.startswith('#'): continue if line.strip('\n').strip(' ') == '': continue try: self._parse_line(line) except ValueError as e: raise ValueError(str(e) + ' -> @ line %s' % (index,)) def write(self, buffer): ev_list = list(self.events_by_id.values()) ev_list.sort(key=lambda x: x.time if x.time is not None else float('+inf')) for event in ev_list: buffer.write(repr(event) + '\n') def _parse_conditions(self, *conditions): """ Parses condition strings and returns the conditions @param conditions: list of strings @return: (Tuple[float, event]) -> (time, trigger) """ t, trigger = None, None for cond in conditions: if t is None: try: temp = float(cond) if temp > 24 * 3600: # seconds in a day raise ValueError 
t = temp continue except ValueError: pass if trigger is None: try: trigger = self.events_by_id[cond] continue except KeyError: pass raise ValueError('Syntax error: cannot parse condition {}. ' 'Either the condition is malformed, or there are ' 'multiple triggers of the same type.') return t, trigger