# # A library to manage ARCFIRE experiments # # Copyright (C) 2017 Nextworks S.r.l. # Copyright (C) 2017 imec # # Sander Vrijders # Dimitri Staessens # Vincenzo Maffione # Marco Capitani # # This library is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public # License as published by the Free Software Foundation; either # version 2.1 of the License, or (at your option) any later version. # # This library is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU # Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public # License along with this library; if not, write to the Free Software # Foundation, Inc., http://www.fsf.org/about/contact/. # # Base class for client apps # # @ap: Application Process binary # @options: Options to pass to the binary # import functools import math import os import random import time import rumba.model as model import rumba.ssh_support as ssh_support import rumba.log as log try: from io import StringIO except ImportError: import StringIO # Python 2 here logger = log.get_logger(__name__) try: from numpy.random import poisson from numpy.random import exponential logger.debug("Using numpy for faster and better random variables.") except ImportError: from rumba.recpoisson import poisson def exponential(mean_duration): return random.expovariate(1.0 / mean_duration) logger.debug("Falling back to simple implementations.") # PROBLEM! These logs will almost never be printed... # But we might not care class SBEntity(object): def __init__(self, e_id): self.id = e_id def get_e_id(self): return type(self).__name__ + '.' + self.id class Client(SBEntity): current_id = -1 @classmethod def get_id(cls): cls.current_id += 1 return cls.current_id def __init__(self, ap, nodes=None, options=None, shutdown="kill ", c_id=None): self.ap = ap e_id = c_id if c_id is not None else self.ap super(Client, self).__init__(e_id) self.startup = (ap + ((" " + options) if options is not None else "")) if isinstance(nodes, model.Node): nodes = [nodes] elif nodes is None: nodes = [] self.nodes = nodes self.shutdown = shutdown def add_node(self, node): if not isinstance(node, model.Node): raise Exception("A Node is required.") self.nodes.append(node) def process(self, duration, node=None, proc_id=None): if proc_id is None: proc_id = "%s_%s" % (self.ap, self.get_id()) if node is None: if len(self.nodes) == 0: raise ValueError('No nodes for client %s' % (self.id,)) node = random.choice(self.nodes) return ClientProcess( proc_id, self.ap, self.startup, duration, node, self.shutdown ) # Base class for client processes # # @ap: Application Process binary # @duration: The time (in seconds) this process should run # @start_time: The time at which this process is started. # @options: Options to pass to the binary # class ClientProcess(SBEntity): def __init__(self, proc_id, ap, startup, duration, node, shutdown="kill "): super(ClientProcess, self).__init__(proc_id) self.ap = ap self.startup = startup self.duration = duration if duration is not None else -1 self.start_time = None self.running = False self.node = node self.pid = None self.shutdown = shutdown def run(self): if self.node is None: raise Exception('No node specified for client %s' % (self.ap,)) self.start_time = time.time() logger.debug( 'Starting client app %s on node %s with duration %s.', self.ap, self.node.name, self.duration ) start_cmd = "./startup.sh %s %s" % ( self.id, self.startup.replace("", str(self.duration)), ) self.running = True try: self.pid = self.node.execute_command(start_cmd) except ssh_support.SSHException: logger.warning('Could not start client %s on node %s.', self.ap, self.node.name) logger.debug('Client app %s on node %s got pid %s.', self.ap, self.node.name, self.pid) def stop(self): if self.shutdown != "": logger.debug( 'Killing client %s on node %s.', self.ap, self.node.name ) try: kill_cmd = self.shutdown.replace('', str(self.pid)) self.node.execute_command(kill_cmd) except ssh_support.SSHException: logger.warn('Could not kill client %s on node %s.', self.ap, self.node.name) else: logger.debug( 'Client %s on node %s has terminated.', self.ap, self.node.name ) def check_and_kill(self): """Check if the process should keep running, stop it if not, and return true if and only if it is still running.""" now = time.time() if not self.running: return False if now - self.start_time >= self.duration: self.stop() self.running = False return False return True # Base class for server programs # # @ap: Application Process binary # @arrival_rate: Average requests/s to be received by this server # @mean_duration: Average duration of a client connection (in seconds) # @options: Options to pass to the binary # @max_clients: Maximum number of clients to serve # @clients: Client binaries that will use this server # @nodes: Specific nodes to start this server on # class Server(SBEntity): def __init__(self, ap, arrival_rate, mean_duration, options=None, max_clients=float('inf'), clients=None, nodes=None, min_duration=2, s_id=None): self.ap = ap e_id = s_id if s_id is not None else self.ap super(Server, self).__init__(e_id) self.options = options if options is not None else "" self.max_clients = max_clients if clients is None: clients = list() self.clients = clients if nodes is None: nodes = [] self.nodes = nodes self.arrival_rate = arrival_rate # mean requests/s self.actual_parameter = max(mean_duration - min_duration, 0.1) # in seconds self.pids = {} self.min_duration = min_duration def add_client(self, client): self.clients.append(client) def del_client(self, client): self.clients.remove(client) def add_node(self, node): self.nodes.append(node) def del_node(self, node): self.nodes.remove(node) def get_new_clients(self, interval): """ Returns a list of clients of size appropriate to the server's rate. The list's size should be a sample from Poisson(arrival_rate) over interval seconds. Hence, the average size should be interval * arrival_rate. """ number = poisson(self.arrival_rate * interval) number = int(min(number, self.max_clients)) return [self._make_process_arguments() for _ in range(number)] def get_duration(self): return exponential(self.actual_parameter) + self.min_duration def _make_process_arguments(self, duration=None, node=None, proc_id=None, client=None): if len(self.clients) == 0: raise Exception("Server %s has empty client list." % (self,)) if duration is None: duration = self.get_duration() if client is None: client = random.choice(self.clients) if node is None: node = random.choice(client.nodes) if proc_id is None: proc_id = "%s_%s" % (client.ap, client.get_id()) return duration, node, proc_id, client def make_client_process(self, duration=None, node=None, proc_id=None, client=None): """Returns a client of this server""" (d, n, p, c) = self._make_process_arguments(duration, node, proc_id, client) return c.process( duration=float("%.2f" % (d,)), node=n, proc_id=p ) def run(self): for node in self.nodes: logfile = "/tmp/%s_server.log" % self.ap script = r'nohup "$@" > %s 2>&1 & echo "$!"' % (logfile,) run_cmd = self.ap + ( (" " + self.options) if self.options is not None else "" ) cmd_1 = "echo '%s' > startup.sh && chmod a+x startup.sh" \ % (script,) cmd_2 = "./startup.sh %s" % (run_cmd,) logger.debug( 'Starting server %s on node %s with logfile %s.', self.id, node.name, logfile ) try: node.execute_command(cmd_1) self.pids[node] = (node.execute_command(cmd_2)) except ssh_support.SSHException: logger.warn('Could not start server %s on node %s.', self.id, node.name) def stop(self): for node, pid in self.pids.items(): logger.debug( 'Killing server %s on node %s.', self.id, node.name ) try: node.execute_command("kill %s" % pid) except ssh_support.SSHException: logger.warn('Could not kill server %s on node %s.', self.id, node.name) # Base class for ARCFIRE storyboards # # @experiment: Experiment to use as input # @duration: Duration of the whole storyboard # @servers: App servers available in the network. # Type == Server or Type == List[Tuple[Server, Node]] # class StoryBoard(SBEntity): SCRIPT_RESOLUTION = 0.1 EXECUTION_RESOLUTION = 2.5 # in seconds (may be a float) def get_e_id(self): return 'storyboard' def __init__(self, duration, experiment=None, servers=None, script=None): self.id = 'storyboard' self.experiment = experiment self.duration = duration self.cur_time = 0.0 self.server_apps = {} self.client_apps = {} if servers is None: servers = list() for s in servers: self._validate_and_add_server(s) self.client_nodes = set() self.server_nodes = set() self.process_dict = {} self.active_clients = self.process_dict.values() self.start_time = None self.commands_list = {} self.script = script self.node_map = {} self._build_nodes_lists() def _build_nodes_lists(self): """Populates server_nodes and client_nodes lists""" for server in self.server_apps.values(): for node in server.nodes: self.server_nodes.add(node) for client in server.clients: for node in client.nodes: self.client_nodes.add(node) if self.experiment is not None: for node in self.experiment.nodes: self.node_map[node.name] = node def _validate_and_add_server(self, s, n=None): if len(s.clients) == 0: logger.warning("'%s' server has no registered clients.", s.ap) else: for client in s.clients: self.client_apps[client.id] = client if n is not None: s.add_node(n) else: if len(s.nodes) == 0: logger.warning("'%s' server has no registered nodes.", s.ap) for node in s.nodes: if node not in self.experiment.nodes: raise ValueError('Cannot run server on node %s, ' 'not in experiment.' % (node.name,)) self.server_apps[s.id] = s self._build_nodes_lists() def set_experiment(self, experiment): if not isinstance(experiment, model.Experiment): raise TypeError('Experiment instance required.') self.experiment = experiment self._build_nodes_lists() def add_server(self, server): """Register a server node to the sb.""" if self.experiment is None: raise ValueError("Cannot add a server before " "setting the experiment.") if not isinstance(server, Server): raise TypeError('Argument must be of type Server') self._validate_and_add_server(server) def add_server_on_node(self, server, node): """ Utility method to simultaneously add a server to a sb and a node to the server. """ if self.experiment is None: raise ValueError("Cannot add a server before " "setting the experiment.") if not isinstance(server, Server): raise TypeError('First argument must be of type Server') if not isinstance(node, model.Node): raise TypeError('Second argument must be of type Node') self._validate_and_add_server(server, node) def del_server(self, server): del self.server_apps[server.id] self._build_nodes_lists() def schedule_command(self, t, node, command): """ Schedules the given command to be run at t seconds from the start. The commands are run in no particular order, so take care @param t: (float) seconds to wait before running the command @param node: (Node or str) the node on which the command should be run @param command: (str or list[str]) the command(s) to be run, """ if self.experiment is None: raise ValueError("An experiment is needed to schedule commands.") if self.script is None: self.script = Script(self) if isinstance(node, str): node = self.node_map[node] if node not in self.experiment.nodes: raise ValueError('Cannot run command on node %s, ' 'not in experiment.' % (node.name,)) if isinstance(command, str): command = [command] action = functools.partial(self.run_command, node, command) self.script.add_event(Event(action, ev_time=t)) def run_command(self, node, command): """ Runs a command (or several) on a given node, immediately. @param node: (Node or str) the node on which the command should be run @param command: (str or list[str]) the command(s) to be run """ if self.experiment is None: raise ValueError("Experiment needed to run commands.") if isinstance(node, str): node = self.node_map[node] if node not in self.experiment.nodes: raise ValueError('Cannot run command on node %s, ' 'not in experiment.' % (node.name,)) if isinstance(command, str): command = [command] node.execute_commands(command) def run_client_of(self, server, duration=None, node=None, proc_id=None): """ Runs a random client of the specified server with the specified parameters. Except for the server, if a parameter is not specified, it will be randomly generated according to the server parameters (mean duration, client apps and their nodes) @param server: the server of which one client should be run @param duration: the duration of the client process @param node: (Node or str) the node on which the client should be run @param proc_id: the entity ID to use for the process """ if isinstance(server, str): server = self.server_apps[server] if duration is None: duration = server.get_duration() client = random.choice(server.clients) self.run_client(client, duration, node, proc_id) def run_client(self, client, duration, node=None, proc_id=None): """ Runs the specified client app with the specified parameters. If the node parameter is not given, it will be chosen at random among the client default nodes. @param client: the client which should be run @param duration: the duration of the client process @param node: (Node or str) the node on which the client should be run @param proc_id: the entity ID to use for the process """ if isinstance(client, str): client = self.client_apps[client] if node is None: if len(client.nodes) == 0: raise ValueError('No nodes registered for client %s', client.id) node = random.choice(client.nodes) elif isinstance(node, str): node = self.node_map[node] process = client.process(duration, node, proc_id) self.process_dict[process.id] = process process.run() action = functools.partial(self.kill_process, process.id) self.script.add_event(Event(action, ev_time=(self.cur_time + duration))) def start_client_of(self, server, duration=None, node=None, proc_id=None): """ Starts a random client of the specified server with the specified parameters. Except for the server and the duration, if a parameter is not specified, it will be randomly generated according to the server parameters (client apps and their nodes). If the client app must be shutdown manually or the duration parameter is None, the client process will _not_ be stopped automatically, and will continue running unless otherwise killed. @param server: the server of which one client should be run @param duration: the duration of the client process @param node: the node on which the client should be run @param proc_id: the entity ID to use for the process """ if isinstance(server, str): server = self.server_apps[server] client = random.choice(server.clients) self.start_client(client, duration, node, proc_id) def start_client(self, client, duration=None, node=None, proc_id=None): """ Starts the specified client app with the specified parameters. If the node parameter is not given, it will be chosen at random among the client default nodes. If the app must be shutdown manually or the duration parameter is None, it will _not_ be stopped automatically, and will continue running unless otherwise killed. @param client: the client which should be run @param duration: the duration of the client process @param node: the node on which the client should be run @param proc_id: the entity ID to use for the process """ if isinstance(client, str): client = self.client_apps[client] if node is None: node = random.choice(client.nodes) process = client.process(duration, node, proc_id) self.process_dict[process.id] = process process.run() def kill_process(self, proc_id): process = self.process_dict.get(proc_id, None) if process is None: raise ValueError('No process named %s' % (proc_id,)) process.stop() del self.process_dict[proc_id] def periodic_check(self, t): self.script.check_for_ready_ev(t) self.script.run_ready() def parse_script(self, buffer): self.script = Script(self) self.script.parse(buffer) def parse_script_file(self, filename): self.script = Script(self) self.script.parse_file(filename) def generate_script(self): if self.experiment is None: raise ValueError('Cannot generate script without an experiment') self.script = Script(self) t = self.SCRIPT_RESOLUTION marker = 5 last_marker = 0 while t < self.duration: if int(t) >= (last_marker+1)*marker: last_marker += 1 logger.debug('Passed the %s seconds mark', last_marker*marker) for server in self.server_apps.values(): c_l = server.get_new_clients(self.SCRIPT_RESOLUTION) for d, n, p, c in c_l: if d > self.duration - t: # would outlast the experiment continue start = self._make_process_events( t, d, n, p, c ) self.script.add_event(start) t += self.SCRIPT_RESOLUTION def _make_process_events(self, t, d, n, p, c): start_action = functools.partial( self.run_client, c, d, n, p ) start_event = Event(start_action, ev_time=t) return start_event def start(self): if self.experiment is None: raise ValueError("Cannot run sb with no experiment.") if self.script is None: self.generate_script() logger.info('Starting storyboard execution') self._build_nodes_lists() logger.debug('Server nodes are: %s.', [x.name for x in self.server_nodes]) logger.debug('Client nodes are: %s.', [x.name for x in self.client_nodes]) logger.debug('Command list is: %s.', {x: [(y.name, z) for y, z in t] for (x, t) in self.commands_list.items()}) self.start_time = time.time() script = r'logname="$1"; shift; nohup "${@}" ' \ r'> /tmp/${logname}.rumba.log 2>&1 & echo "$!"' logger.debug("Writing utility startup script on client nodes.") for node in self.client_nodes: node.execute_command( "echo '%s' > startup.sh && chmod a+x startup.sh" % (script,) ) try: for server in self.server_apps.values(): server.run() res = self.SCRIPT_RESOLUTION # for brevity while self.cur_time < self.duration: self.periodic_check(self.cur_time) next_breakpoint = math.ceil(self.cur_time / res) * res delta = next_breakpoint - self.cur_time if delta > 0: # just in case time.sleep(delta) self.cur_time = float(time.time() - self.start_time) self.periodic_check(self.cur_time) # Do things that were scheduled # in the last seconds # of the StoryBoard finally: # Kill everything. No more mercy. for client in self.active_clients: client.stop() for server in self.server_apps.values(): server.stop() def fetch_logs(self, local_dir=None): if local_dir is None: local_dir = self.experiment.log_dir if not os.path.isdir(local_dir): raise Exception('Destination "%s" is not a directory. ' 'Cannot fetch logs.' % local_dir) for node in self.server_nodes: dst_dir = os.path.join(local_dir, node.name) if not os.path.isdir(dst_dir): os.mkdir(dst_dir) logs_list = node.execute_command('ls /tmp/*_server.log') logs_list = [x for x in logs_list.split('\n') if x != ''] logger.debug('Log list is:\n%s', logs_list) node.fetch_files(logs_list, dst_dir) for node in self.client_nodes: dst_dir = os.path.join(local_dir, node.name) if not os.path.isdir(dst_dir): os.mkdir(dst_dir) logs_list = node.execute_command('ls /tmp/*.rumba.log ' '|| echo ""') logs_list = [x for x in logs_list.split('\n') if x != ''] logger.debug('Log list is:\n%s', logs_list) node.fetch_files(logs_list, dst_dir) def set_link_state(self, t, dif, state): if self.experiment is None: raise ValueError("An experiment is needed to schedule commands.") if not isinstance(dif, model.ShimEthDIF): raise Exception("Not a Shim Ethernet DIF.") if self.script is None: self.script = Script(self) for ipcp in dif.ipcps: action = functools.partial(ipcp.node.set_link_state, ipcp, state) self.script.add_event(Event(action, ev_time=t)) def set_link_up(self, t, dif): self.set_link_state(t, dif, 'up') def set_link_down(self, t, dif): self.set_link_state(t, dif, 'down') def set_node_state(self, t, node, state): if self.experiment is None: raise ValueError("An experiment is needed to schedule commands.") if self.script is None: self.script = Script(self) for ipcp in node.ipcps: if not isinstance(ipcp, model.ShimEthIPCP): continue action = functools.partial(ipcp.node.set_link_state, ipcp, state) self.script.add_event(Event(action, ev_time=t)) def set_node_up(self, t, node): self.set_node_state(t, node, 'up') def set_node_down(self, t, node): self.set_node_state(t, node, 'down') class Event(object): cur_id = -1 @classmethod def generate_id(cls): cls.cur_id += 1 return "__event_%s" % (cls.cur_id,) def __init__(self, action, ev_id=None, ev_time=None, trigger=None): """ @param ev_id: (str) id of the event @param action: (any SBEntity method) action to undertake when event is activated @param ev_time: (float) seconds to wait before running the event @param trigger: (Event) Event which must complete before this runs """ if ev_time is None and trigger is None: raise ValueError('No condition specified for event %s.' % ev_id) self.id = ev_id if ev_id is not None else self.generate_id() self.action = action self.time = ev_time self._trigger = trigger self.trigger = trigger.id if trigger is not None else None self.exception = None self.done = False self._repr = "%(prefix)s &%(label)s | %(entity)s %(method)s" % { 'prefix': self._prefix_repr(), 'label': self.id, 'entity': '$' + self.action.func.__self__.get_e_id(), 'method': self._action_repr() } def _action_repr(self): if isinstance(self.action, functools.partial): name = self.action.func.__name__ args = ' '.join(self._action_arg_repr(x) for x in self.action.args) else: name = self.action.__name__ args = '' return ' '.join((name, args)) @staticmethod def _action_arg_repr(arg): if hasattr(arg, 'get_e_id'): return '$' + arg.get_e_id() elif isinstance(arg, str): return "'" + arg + "'" elif isinstance(arg, float): return "%.2f" % (arg,) else: return str(arg) # Assuming int def _prefix_repr(self): conditions = [] if self.time is not None: conditions.append("%.2f" % (self.time,)) if self.trigger is not None: conditions.append(self.trigger) return ", ".join(conditions) @property def failed(self): return self.exception is not None def pre_exec(self): # hook to be overridden pass def post_exec(self): # hook to be overridden pass def _start(self): self.pre_exec() if self.done: raise ValueError('Event %s has already ran' % self.id) def run(self): """ Run this event's action """ self._start() try: self.action() except Exception as e: self.exception = e self._done() def check(self, cur_time): """ Check if this event can be run. @param cur_time: (float) current time @return: True if the preconditions are satisfied, False otherwise. """ return \ (self.time is None or cur_time > self.time) \ and (self._trigger is None or self._trigger.done) def _done(self): self.done = True if self.exception is not None: logger.warning('Event %s failed. %s: %s.', self.id, type(self.exception).__name__, str(self.exception)) self.post_exec() def __str__(self): return self.id def __repr__(self): return self._repr class Script(object): def __init__(self, storyboard): # Brute force 'dump all in memory' approach to avoid # iterating through the events a lot of times # at runtime self.events_by_id = {} self._waiting_events = {} self._events_ready = [] self._nodes = {} self._servers = {} self._clients = {} self._testbed = None self._experiment = None self._storyboard = None self._entities = {} self._parse_entities(storyboard) def _parse_entities(self, storyboard): self._storyboard = storyboard self._experiment = self._storyboard.experiment self._nodes = self._storyboard.node_map self._testbed = self._experiment.testbed self._servers = self._storyboard.server_apps self._clients = self._storyboard.client_apps self._processes = {} self._entities = { 'sb': self._storyboard, 'storyboard': self._storyboard, 'testbed': self._testbed, 'experiment': self._experiment, 'Node': self._nodes, 'Server': self._servers, 'Client': self._clients, 'ClientProcess': self._processes } def add_process(self, process): self._processes[process.id] = process def add_event(self, event): """ Add an event to this script @param event: (Event or str) the event to add """ if isinstance(event, str): self._parse_line(event) return ev_id = event.id self.events_by_id[ev_id] = event self._waiting_events[ev_id] = event def del_event(self, event): """ Remove an event from this script @param event: (Event or str) the event (or id thereof) to remove """ if isinstance(event, Event): event = event.id del self.events_by_id[event] del self._events_ready[event] def check_for_ready_ev(self, cur_time): """ Check which events are ready to run, and mark them @param (float) current time """ new_wait = {} for i, e in self._waiting_events.items(): if e.check(cur_time): self._events_ready.append(e) else: new_wait[i] = e self._waiting_events = new_wait def run_ready(self): """ Run events marked as ready by a previous inspection (see @check_for_ready_ev) """ while len(self._events_ready): event = self._events_ready.pop() logger.debug("Running event %s.", event.id) event.run() def _nested_iterator(self, d): for t in ((x, self.events_by_id[x]) for e_lst in d.values() for x in e_lst): yield t def _resolve_entity(self, e_id): e_key = e_id.split('.') result = self._entities while len(e_key) != 0: key = e_key.pop(0) result = result.get(key) if result is None: raise ValueError('Invalid entity key %s at %s' % (e_id, key)) return result def _parse_action(self, entity, action_l): method_n = action_l[0] method = getattr(entity, method_n, None) if method is None: raise ValueError('No method called "%s" for entity %s.' % (method_n, entity.get_e_id())) args_l = action_l[1:] args = self._parse_action_arguments(args_l) return functools.partial(method, *args) # TODO maybe some argument checking? def _parse_action_arguments(self, args_l): args = [] quotes_part = None while len(args_l) > 0: arg_s = args_l.pop(0) if quotes_part is not None: if arg_s.endswith("'"): arg_s = arg_s[:-1] # Strip final ' quotes_part += ' ' + arg_s args.append(quotes_part) quotes_part = None elif arg_s.startswith("'"): arg_s = arg_s[1:] # Strip initial ' if arg_s.endswith("'"): arg_s = arg_s[:-1] # Strip final ' args.append(arg_s) else: quotes_part = arg_s else: if arg_s.startswith('$'): args.append(self._resolve_entity(arg_s[1:])) else: try: args.append(float(arg_s)) except ValueError: raise ValueError('Syntax error: %s is not a float. ' 'If it is supposed to be a string, ' 'enclose it in single quotes.' % (arg_s,)) return args def _parse_prefix(self, prefix): prefix = prefix.strip().split('&') if len(prefix) > 2: raise ValueError('Syntax error: multiple "&" in prefix') conditions = prefix[0].strip().split(',') if len(prefix) == 2: label = prefix[1].strip() else: label = None if len(conditions) == 0: raise ValueError("Syntax error: expected at least one condition") elif len(conditions) > 2: raise ValueError("Syntax error: expected at most two condition") t, trigger = self._parse_conditions(*[x.strip() for x in conditions]) return label, t, trigger def _parse_suffix(self, suffix): action_l = suffix.strip().split(' ') if action_l[0].startswith('$'): entity_id = action_l[0][1:] action_l = action_l[1:] if len(action_l) == 0: raise ValueError('Syntax error: missing action.') else: entity_id = 'sb' entity = self._resolve_entity(entity_id) action = self._parse_action(entity, action_l) return action def _parse_line(self, line): parts = line.split('|') if len(parts) != 2: raise ValueError("Syntax error: expected exactly one '|'") label, t, trigger = self._parse_prefix(parts[0]) action = self._parse_suffix(parts[1]) event = Event(action, ev_id=label, ev_time=t, trigger=trigger) self.add_event(event) def parse(self, str_iterable): for index, line in enumerate(str_iterable): if line.startswith('#'): continue if line.strip('\n').strip(' ') == '': continue try: self._parse_line(line) except ValueError as e: raise ValueError(str(e) + ' -> @ line %s' % (index,)) def parse_file(self, filename): with open(filename, 'r') as f: self.parse(f) def parse_string(self, s): buffer = StringIO(s) self.parse(buffer) def write(self, buffer): ev_list = list(self.events_by_id.values()) ev_list.sort(key=lambda x: x.time if x.time is not None else float('+inf')) for event in ev_list: buffer.write(repr(event) + '\n') def write_to_file(self, filename): with open(filename, 'w') as f: self.write(f) def write_string(self): s = StringIO() self.write(s) return s def _parse_conditions(self, *conditions): """ Parses condition strings and returns the conditions @param conditions: list of strings @return: (Tuple[float, event]) -> (time, trigger) """ t, trigger = None, None for cond in conditions: if t is None: try: temp = float(cond) if temp > 24 * 3600: # seconds in a day raise ValueError t = temp continue except ValueError: pass if trigger is None: try: trigger = self.events_by_id[cond] continue except KeyError: pass raise ValueError('Syntax error: cannot parse condition {}. ' 'Either the condition is malformed, or there are ' 'multiple triggers of the same type.') return t, trigger