From e174aaf3650c23331c757921b1af9b152f53c6e5 Mon Sep 17 00:00:00 2001 From: Marco Capitani Date: Thu, 22 Feb 2018 10:18:10 +0100 Subject: storyboard: add replayability implements #27 --- rumba/storyboard.py | 759 +++++++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 661 insertions(+), 98 deletions(-) (limited to 'rumba/storyboard.py') diff --git a/rumba/storyboard.py b/rumba/storyboard.py index ca0cfb5..fa6413d 100644 --- a/rumba/storyboard.py +++ b/rumba/storyboard.py @@ -29,6 +29,8 @@ # @ap: Application Process binary # @options: Options to pass to the binary # +import functools +import math import os import random import time @@ -37,6 +39,11 @@ import rumba.model as model import rumba.ssh_support as ssh_support import rumba.log as log +try: + from io import StringIO +except ImportError: + import StringIO # Python 2 here + logger = log.get_logger(__name__) try: @@ -53,18 +60,29 @@ except ImportError: # PROBLEM! These logs will almost never be printed... # But we might not care -current_id = -1 + +class SBEntity(object): + + def __init__(self, e_id): + self.id = e_id + + def get_e_id(self): + return type(self).__name__ + '.' + self.id -def get_id(): - global current_id - current_id += 1 - return current_id +class Client(SBEntity): + current_id = -1 -class Client(object): - def __init__(self, ap, nodes=None, options=None, shutdown="kill "): + @classmethod + def get_id(cls): + cls.current_id += 1 + return cls.current_id + + def __init__(self, ap, nodes=None, options=None, shutdown="kill ", c_id=None): self.ap = ap + e_id = c_id if c_id is not None else self.ap + super(Client, self).__init__(e_id) self.startup = (ap + ((" " + options) if options is not None else "")) if isinstance(nodes, model.Node): nodes = [nodes] @@ -78,10 +96,16 @@ class Client(object): raise Exception("A Node is required.") self.nodes.append(node) - def process(self, duration): - node = random.choice(self.nodes) if len(self.nodes) > 0 else None + def process(self, duration, node=None, proc_id=None): + if proc_id is None: + proc_id = "%s_%s" % (self.ap, self.get_id()) + if node is None: + if len(self.nodes) == 0: + raise ValueError('No nodes for client %s' + % (self.id,)) + node = random.choice(self.nodes) return ClientProcess( - get_id(), + proc_id, self.ap, self.startup, duration, @@ -97,13 +121,13 @@ class Client(object): # @start_time: The time at which this process is started. # @options: Options to pass to the binary # -class ClientProcess(object): - def __init__(self, client_id, ap, startup, duration, - node, shutdown=""): - self.id = client_id +class ClientProcess(SBEntity): + def __init__(self, proc_id, ap, startup, duration, + node, shutdown="kill "): + super(ClientProcess, self).__init__(proc_id) self.ap = ap self.startup = startup - self.duration = duration + self.duration = duration if duration is not None else -1 self.start_time = None self.running = False self.node = node @@ -120,8 +144,7 @@ class ClientProcess(object): self.ap, self.node.name, self.duration ) - start_cmd = "./startup.sh %s_%s %s" % ( - self.ap, + start_cmd = "./startup.sh %s %s" % ( self.id, self.startup.replace("", str(self.duration)), ) @@ -152,7 +175,7 @@ class ClientProcess(object): self.ap, self.node.name ) - def kill_check(self): + def check_and_kill(self): """Check if the process should keep running, stop it if not, and return true if and only if it is still running.""" now = time.time() @@ -175,11 +198,13 @@ class ClientProcess(object): # @clients: Client binaries that will use this server # @nodes: Specific nodes to start this server on # -class Server: +class Server(SBEntity): def __init__(self, ap, arrival_rate, mean_duration, options=None, max_clients=float('inf'), - clients=None, nodes=None, min_duration=2): + clients=None, nodes=None, min_duration=2, s_id=None): self.ap = ap + e_id = s_id if s_id is not None else self.ap + super(Server, self).__init__(e_id) self.options = options if options is not None else "" self.max_clients = max_clients if clients is None: @@ -216,15 +241,31 @@ class Server: """ number = poisson(self.arrival_rate * interval) number = int(min(number, self.max_clients)) - return [self.make_client_process() for _ in range(number)] + return [self._make_process_arguments() for _ in range(number)] - def make_client_process(self): - """Returns a client of this server""" + def get_duration(self): + return exponential(self.actual_parameter) + self.min_duration + + def _make_process_arguments(self, duration=None, node=None, proc_id=None, client=None): if len(self.clients) == 0: raise Exception("Server %s has empty client list." % (self,)) - duration = exponential(self.actual_parameter) + self.min_duration - return random.choice(self.clients).process( - duration=float("%.2f" % (duration,)) + if duration is None: + duration = self.get_duration() + if client is None: + client = random.choice(self.clients) + if node is None: + node = random.choice(client.nodes) + if proc_id is None: + proc_id = "%s_%s" % (client.ap, client.get_id()) + return duration, node, proc_id, client + + def make_client_process(self, duration=None, node=None, proc_id=None, client=None): + """Returns a client of this server""" + (d, n, p, c) = self._make_process_arguments(duration, node, proc_id, client) + return c.process( + duration=float("%.2f" % (d,)), + node=n, + proc_id=p ) def run(self): @@ -239,26 +280,26 @@ class Server: cmd_2 = "./startup.sh %s" % (run_cmd,) logger.debug( 'Starting server %s on node %s with logfile %s.', - self.ap, node.name, logfile + self.id, node.name, logfile ) try: node.execute_command(cmd_1) self.pids[node] = (node.execute_command(cmd_2)) except ssh_support.SSHException: logger.warn('Could not start server %s on node %s.', - self.ap, node.name) + self.id, node.name) def stop(self): for node, pid in self.pids.items(): logger.debug( 'Killing server %s on node %s.', - self.ap, node.name + self.id, node.name ) try: node.execute_command("kill %s" % pid) except ssh_support.SSHException: logger.warn('Could not kill server %s on node %s.', - self.ap, node.name) + self.id, node.name) # Base class for ARCFIRE storyboards @@ -268,52 +309,70 @@ class Server: # @servers: App servers available in the network. # Type == Server or Type == List[Tuple[Server, Node]] # -class StoryBoard: +class StoryBoard(SBEntity): - DEFAULT_INTERVAL = 2.5 # in seconds (may be a float) + SCRIPT_RESOLUTION = 0.1 + EXECUTION_RESOLUTION = 2.5 # in seconds (may be a float) - def __init__(self, duration, experiment=None, servers=None): + def get_e_id(self): + return 'storyboard' + + def __init__(self, duration, experiment=None, servers=None, script=None): + self.id = 'storyboard' self.experiment = experiment self.duration = duration - self.servers = list() + self.cur_time = 0.0 + self.server_apps = {} + self.client_apps = {} if servers is None: servers = list() for s in servers: self._validate_and_add_server(s) self.client_nodes = set() self.server_nodes = set() - self.active_clients = [] + self.process_dict = {} + self.active_clients = self.process_dict.values() self.start_time = None self.commands_list = {} + self.script = script + self.node_map = {} + self._build_nodes_lists() def _build_nodes_lists(self): """Populates server_nodes and client_nodes lists""" - for server in self.servers: + for server in self.server_apps.values(): for node in server.nodes: self.server_nodes.add(node) for client in server.clients: for node in client.nodes: self.client_nodes.add(node) + if self.experiment is not None: + for node in self.experiment.nodes: + self.node_map[node.name] = node def _validate_and_add_server(self, s, n=None): if len(s.clients) == 0: logger.warning("'%s' server has no registered clients.", s.ap) + else: + for client in s.clients: + self.client_apps[client.id] = client if n is not None: s.add_node(n) - self.servers.append(s) else: if len(s.nodes) == 0: logger.warning("'%s' server has no registered nodes.", s.ap) - self.servers.append(s) for node in s.nodes: if node not in self.experiment.nodes: raise ValueError('Cannot run server on node %s, ' 'not in experiment.' % (node.name,)) + self.server_apps[s.id] = s + self._build_nodes_lists() def set_experiment(self, experiment): if not isinstance(experiment, model.Experiment): raise TypeError('Experiment instance required.') self.experiment = experiment + self._build_nodes_lists() def add_server(self, server): """Register a server node to the sb.""" @@ -335,74 +394,212 @@ class StoryBoard: if not isinstance(server, Server): raise TypeError('First argument must be of type Server') if not isinstance(node, model.Node): - raise TypeError('Second argument must be of type Server') + raise TypeError('Second argument must be of type Node') self._validate_and_add_server(server, node) def del_server(self, server): - self.servers.remove(server) + del self.server_apps[server.id] + self._build_nodes_lists() - def run_command(self, t, node, command): + def schedule_command(self, t, node, command): """ - Schedule the given command to be run at t seconds from the start. + Schedules the given command to be run at t seconds from the start. The commands are run in no particular order, so take care - :param t: (float) seconds to wait before running the command - :param node: (Node) the node on which the command should be run - :param command: (str or list[str]) the command(s) to be run, + @param t: (float) seconds to wait before running the command + @param node: (Node or str) the node on which the command should be run + @param command: (str or list[str]) the command(s) to be run, """ if self.experiment is None: - raise ValueError("Cannot add a server before " - "setting the experiment.") + raise ValueError("An experiment is needed to schedul commands.") + if self.script is None: + self.script = Script(self) + if isinstance(node, str): + node = self.node_map[node] if node not in self.experiment.nodes: raise ValueError('Cannot run command on node %s, ' 'not in experiment.' % (node.name,)) if isinstance(command, str): - self.commands_list.setdefault(t, []).append((node, command)) - else: # Hope it's an Iterable[str]. Otherwise, errors will happen. - for cmd in command: - self.commands_list.setdefault(t, []).append((node, cmd)) - - def periodic_check(self): - # Spawn new clients - for server in self.servers: - clients = server.get_new_clients(self.DEFAULT_INTERVAL) - for new_client in clients: - new_client.duration = min( - new_client.duration, - self.duration - (time.time() - self.start_time) - ) - # Make sure the duration of the client does not - # go beyond the storyboard lifetime - if new_client.duration < server.min_duration: - continue - # Do not start clients that would not run for - # at least the minimum duration - # (due to sb constraints) - new_client.run() - self.active_clients.append(new_client) - - # Kill expired clients - surviving = [] - - for x in self.active_clients: - if x.kill_check(): # - surviving.append(x) - self.active_clients = surviving - - # Do run_command instructions - unexpired_commands = {} - for t in self.commands_list: - if (time.time() - self.start_time) > t: - for node, command in self.commands_list[t]: - node.execute_command(command) - else: - unexpired_commands[t] = self.commands_list[t] - self.commands_list = unexpired_commands + command = [command] + action = functools.partial(self.run_command, node, command) + self.script.add_event(Event(action, ev_time=t)) + + def run_command(self, node, command): + """ + Runs a command (or several) on a given node, immediately. + + @param node: (Node or str) the node on which the command should be run + @param command: (str or list[str]) the command(s) to be run + """ + if self.experiment is None: + raise ValueError("Experiment needed to run commands.") + if isinstance(node, str): + node = self.node_map[node] + if node not in self.experiment.nodes: + raise ValueError('Cannot run command on node %s, ' + 'not in experiment.' % (node.name,)) + if isinstance(command, str): + command = [command] + node.execute_commands(command) + + def run_client_of(self, server, duration=None, node=None, proc_id=None): + """ + Runs a random client of the specified server + with the specified parameters. + + Except for the server, if a parameter is not specified, + it will be randomly generated according to the server + parameters (mean duration, client apps and their nodes) + + @param server: the server of which one client should be run + @param duration: the duration of the client process + @param node: (Node or str) the node on which the client should be run + @param proc_id: the entity ID to use for the process + """ + if isinstance(server, str): + server = self.server_apps[server] + if duration is None: + duration = server.get_duration() + client = random.choice(server.clients) + self.run_client(client, duration, node, proc_id) + + def run_client(self, client, duration, node=None, proc_id=None): + """ + Runs the specified client app with the specified parameters. + + If the node parameter is not given, it will be chosen at random + among the client default nodes. + + @param client: the client which should be run + @param duration: the duration of the client process + @param node: (Node or str) the node on which the client should be run + @param proc_id: the entity ID to use for the process + """ + if isinstance(client, str): + client = self.client_apps[client] + if node is None: + if len(client.nodes) == 0: + raise ValueError('No nodes registered for client %s', + client.id) + node = random.choice(client.nodes) + elif isinstance(node, str): + node = self.node_map[node] + process = client.process(duration, node, proc_id) + self.process_dict[process.id] = process + process.run() + action = functools.partial(self.kill_process, process.id) + self.script.add_event(Event(action, ev_time=(self.cur_time + duration))) + + def start_client_of(self, server, duration=None, node=None, proc_id=None): + """ + Starts a random client of the specified server + with the specified parameters. + + Except for the server and the duration, if a parameter + is not specified, it will be randomly generated according + to the server parameters (client apps and their nodes). + + If the client app must be shutdown manually or the duration + parameter is None, the client process will _not_ be stopped + automatically, and will continue running unless otherwise + killed. + + @param server: the server of which one client should be run + @param duration: the duration of the client process + @param node: the node on which the client should be run + @param proc_id: the entity ID to use for the process + """ + if isinstance(server, str): + server = self.server_apps[server] + client = random.choice(server.clients) + self.start_client(client, duration, node, proc_id) + + def start_client(self, client, duration=None, node=None, proc_id=None): + """ + Starts the specified client app with the specified parameters. + + If the node parameter is not given, it will be chosen at random + among the client default nodes. + + If the app must be shutdown manually or the duration + parameter is None, it will _not_ be stopped + automatically, and will continue running unless otherwise + killed. + + @param client: the client which should be run + @param duration: the duration of the client process + @param node: the node on which the client should be run + @param proc_id: the entity ID to use for the process + """ + if isinstance(client, str): + client = self.client_apps[client] + if node is None: + node = random.choice(client.nodes) + process = client.process(duration, node, proc_id) + self.process_dict[process.id] = process + process.run() + + def kill_process(self, proc_id): + process = self.process_dict.get(proc_id, None) + if process is None: + raise ValueError('No process named %s' % (proc_id,)) + process.stop() + del self.process_dict[proc_id] + + def periodic_check(self, t): + self.script.check_for_ready_ev(t) + self.script.run_ready() + + def parse_script(self, buffer): + self.script = Script(self) + self.script.parse(buffer) + + def parse_script_file(self, filename): + self.script = Script(self) + self.script.parse_file(filename) + + def generate_script(self): + if self.experiment is None: + raise ValueError('Cannot generate script without an experiment') + self.script = Script(self) + t = self.SCRIPT_RESOLUTION + marker = 5 + last_marker = 0 + while t < self.duration: + if int(t) >= (last_marker+1)*marker: + last_marker += 1 + logger.debug('Passed the %s seconds mark', last_marker*marker) + for server in self.server_apps.values(): + c_l = server.get_new_clients(self.SCRIPT_RESOLUTION) + for d, n, p, c in c_l: + if d > self.duration - t: # would outlast the experiment + continue + start = self._make_process_events( + t, + d, + n, + p, + c + ) + self.script.add_event(start) + t += self.SCRIPT_RESOLUTION + + def _make_process_events(self, t, d, n, p, c): + start_action = functools.partial( + self.run_client, + c, + d, + n, + p + ) + start_event = Event(start_action, ev_time=t) + return start_event def start(self): if self.experiment is None: - raise ValueError("Cannot add a server before " - "setting the experiment.") + raise ValueError("Cannot run sb with no experiment.") + if self.script is None: + self.generate_script() logger.info('Starting storyboard execution') self._build_nodes_lists() logger.debug('Server nodes are: %s.', [x.name for x in self.server_nodes]) @@ -420,18 +617,24 @@ class StoryBoard: "echo '%s' > startup.sh && chmod a+x startup.sh" % (script,) ) try: - for server in self.servers: + for server in self.server_apps.values(): server.run() - while time.time() - self.start_time < self.duration: - self.periodic_check() - time.sleep(self.DEFAULT_INTERVAL) - self.periodic_check() # Do things that were scheduled - # in the last "INTERVAL" seconds - # of the StoryBoard + res = self.SCRIPT_RESOLUTION # for brevity + while self.cur_time < self.duration: + self.periodic_check(self.cur_time) + next_breakpoint = math.ceil(self.cur_time / res) * res + delta = next_breakpoint - self.cur_time + if delta > 0: # just in case + time.sleep(delta) + self.cur_time = float(time.time() - self.start_time) + self.periodic_check(self.cur_time) + # Do things that were scheduled + # in the last seconds + # of the StoryBoard finally: # Kill everything. No more mercy. for client in self.active_clients: client.stop() - for server in self.servers: + for server in self.server_apps.values(): server.stop() def fetch_logs(self, local_dir=None): @@ -452,3 +655,363 @@ class StoryBoard: logs_list = [x for x in logs_list.split('\n') if x != ''] logger.debug('Log list is:\n%s', logs_list) node.fetch_files(logs_list, local_dir) + + +class Event(object): + + cur_id = -1 + + @classmethod + def generate_id(cls): + cls.cur_id += 1 + return "__event_%s" % (cls.cur_id,) + + def __init__(self, action, ev_id=None, ev_time=None, trigger=None): + """ + @param ev_id: (str) id of the event + @param action: (any SBEntity method) action to undertake + when event is activated + @param ev_time: (float) seconds interval to wait before running the event + @param trigger: (Event) Event which must complete before this runs + """ + if ev_time is None and trigger is None: + raise ValueError('No condition specified for event %s.' % + ev_id) + self.id = ev_id if ev_id is not None else self.generate_id() + self.action = action + self.time = ev_time + self._trigger = trigger + self.trigger = trigger.id if trigger is not None else None + self.exception = None + self.done = False + self._repr = "%(prefix)s &%(label)s | %(entity)s %(method)s" % { + 'prefix': self._prefix_repr(), + 'label': self.id, + 'entity': '$' + self.action.func.__self__.get_e_id(), + 'method': self._action_repr() + } + + def _action_repr(self): + if isinstance(self.action, functools.partial): + name = self.action.func.__name__ + args = ' '.join(self._action_arg_repr(x) for x in self.action.args) + else: + name = self.action.__name__ + args = '' + return ' '.join((name, args)) + + @staticmethod + def _action_arg_repr(arg): + if hasattr(arg, 'get_e_id'): + return '$' + arg.get_e_id() + elif isinstance(arg, str): + return "'" + arg + "'" + elif isinstance(arg, float): + return "%.2f" % (arg,) + else: + return str(arg) # Assuming int + + def _prefix_repr(self): + conditions = [] + if self.time is not None: + conditions.append("%.2f" % (self.time,)) + if self.trigger is not None: + conditions.append(self.trigger) + return ", ".join(conditions) + + @property + def failed(self): + return self.exception is not None + + def pre_exec(self): # hook to be overridden + pass + + def post_exec(self): # hook to be overridden + pass + + def _start(self): + self.pre_exec() + if self.done: + raise ValueError('Event %s has already ran' % self.id) + + def run(self): + """ + Run this event's action + """ + self._start() + try: + self.action() + except Exception as e: + self.exception = e + self._done() + + def check(self, cur_time): + """ + Check if this event can be run. + @param cur_time: (float) current time + @return: True if the preconditions are satisfied, False otherwise. + """ + return \ + (self.time is None or cur_time > self.time) \ + and (self._trigger is None or self._trigger.done) + + def _done(self): + self.done = True + if self.exception is not None: + logger.warning('Event %s failed. %s: %s.', + self.id, + type(self.exception).__name__, + str(self.exception)) + self.post_exec() + + def __str__(self): + return self.id + + def __repr__(self): + return self._repr + + +class Script(object): + + def __init__(self, storyboard): + # Brute force 'dump all in memory' approach to avoid + # iterating through the events a lot of times + # at runtime + self.events_by_id = {} + self._waiting_events = {} + self._events_ready = [] + + self._nodes = {} + self._servers = {} + self._clients = {} + self._testbed = None + self._experiment = None + self._storyboard = None + self._entities = {} + self._parse_entities(storyboard) + + def _parse_entities(self, storyboard): + self._storyboard = storyboard + self._experiment = self._storyboard.experiment + self._nodes = self._storyboard.node_map + self._testbed = self._experiment.testbed + self._servers = self._storyboard.server_apps + self._clients = self._storyboard.client_apps + self._processes = {} + self._entities = { + 'sb': self._storyboard, + 'storyboard': self._storyboard, + 'testbed': self._testbed, + 'experiment': self._experiment, + 'Node': self._nodes, + 'Server': self._servers, + 'Client': self._clients, + 'ClientProcess': self._processes + } + + def add_process(self, process): + self._processes[process.id] = process + + def add_event(self, event): + """ + Add an event to this script + @param event: (Event or str) the event to add + """ + if isinstance(event, str): + self._parse_line(event) + return + ev_id = event.id + self.events_by_id[ev_id] = event + self._waiting_events[ev_id] = event + + def del_event(self, event): + """ + Remove an event from this script + @param event: (Event or str) the event (or id thereof) to remove + """ + if isinstance(event, Event): + event = event.id + del self.events_by_id[event] + del self._events_ready[event] + + def check_for_ready_ev(self, cur_time): + """ + Check which events are ready to run, and mark them + @param (float) current time + """ + new_wait = {} + for i, e in self._waiting_events.items(): + if e.check(cur_time): + self._events_ready.append(e) + else: + new_wait[i] = e + self._waiting_events = new_wait + + def run_ready(self): + """ + Run events marked as ready by a previous inspection + + (see @check_for_ready_ev) + """ + while len(self._events_ready): + event = self._events_ready.pop() + logger.debug("Running event %s.", event.id) + event.run() + + def _nested_iterator(self, d): + for t in ((x, self.events_by_id[x]) + for e_lst in d.values() for x in e_lst): + yield t + + def _resolve_entity(self, e_id): + e_key = e_id.split('.') + result = self._entities + while len(e_key) != 0: + key = e_key.pop(0) + result = result.get(key) + if result is None: + raise ValueError('Invalid entity key %s at %s' + % (e_id, key)) + return result + + def _parse_action(self, entity, action_l): + method_n = action_l[0] + method = getattr(entity, method_n, None) + if method is None: + raise ValueError('No method called "%s" for entity %s.' + % (method_n, entity.get_e_id())) + args_l = action_l[1:] + args = self._parse_action_arguments(args_l) + return functools.partial(method, *args) + # TODO maybe some argument checking? + + def _parse_action_arguments(self, args_l): + args = [] + quotes_part = None + while len(args_l) > 0: + arg_s = args_l.pop(0) + if quotes_part is not None: + if arg_s.endswith("'"): + arg_s = arg_s[:-1] # Strip final ' + quotes_part += ' ' + arg_s + args.append(quotes_part) + quotes_part = None + elif arg_s.startswith("'"): + arg_s = arg_s[1:] # Strip initial ' + if arg_s.endswith("'"): + arg_s = arg_s[:-1] # Strip final ' + args.append(arg_s) + else: + quotes_part = arg_s + else: + if arg_s.startswith('$'): + args.append(self._resolve_entity(arg_s[1:])) + else: + try: + args.append(float(arg_s)) + except ValueError: + raise ValueError('Syntax error: %s is not a float. ' + 'If it is supposed to be a string, ' + 'enclose it in single quotes.' + % (arg_s,)) + return args + + def _parse_prefix(self, prefix): + prefix = prefix.strip().split('&') + if len(prefix) > 2: + raise ValueError('Syntax error: multiple "&" in prefix') + conditions = prefix[0].strip().split(',') + if len(prefix) == 2: + label = prefix[1].strip() + else: + label = None + if len(conditions) == 0: + raise ValueError("Syntax error: expected at least one condition") + elif len(conditions) > 2: + raise ValueError("Syntax error: expected at most two condition") + t, trigger = self._parse_conditions(*[x.strip() for x in conditions]) + return label, t, trigger + + def _parse_suffix(self, suffix): + action_l = suffix.strip().split(' ') + if action_l[0].startswith('$'): + entity_id = action_l[0][1:] + action_l = action_l[1:] + if len(action_l) == 0: + raise ValueError('Syntax error: missing action.') + else: + entity_id = 'sb' + entity = self._resolve_entity(entity_id) + action = self._parse_action(entity, action_l) + return action + + def _parse_line(self, line): + parts = line.split('|') + if len(parts) != 2: + raise ValueError("Syntax error: expected exactly one '|'") + label, t, trigger = self._parse_prefix(parts[0]) + action = self._parse_suffix(parts[1]) + event = Event(action, ev_id=label, ev_time=t, trigger=trigger) + self.add_event(event) + + def parse(self, str_iterable): + for index, line in enumerate(str_iterable): + if line.startswith('#'): + continue + if line.strip('\n').strip(' ') == '': + continue + try: + self._parse_line(line) + except ValueError as e: + raise ValueError(str(e) + ' -> @ line %s' % (index,)) + + def parse_file(self, filename): + with open(filename, 'r') as f: + self.parse(f) + + def parse_string(self, s): + buffer = StringIO(s) + self.parse(buffer) + + def write(self, buffer): + ev_list = list(self.events_by_id.values()) + ev_list.sort(key=lambda x: x.time if x.time is not None else float('+inf')) + for event in ev_list: + buffer.write(repr(event) + '\n') + + def write_to_file(self, filename): + with open(filename, 'w') as f: + self.write(f) + + def write_string(self): + s = StringIO() + self.write(s) + return s + + def _parse_conditions(self, *conditions): + """ + Parses condition strings and returns the conditions + @param conditions: list of strings + @return: (Tuple[float, event]) -> (time, trigger) + """ + t, trigger = None, None + for cond in conditions: + if t is None: + try: + temp = float(cond) + if temp > 24 * 3600: # seconds in a day + raise ValueError + t = temp + continue + except ValueError: + pass + if trigger is None: + try: + trigger = self.events_by_id[cond] + continue + except KeyError: + pass + raise ValueError('Syntax error: cannot parse condition {}. ' + 'Either the condition is malformed, or there are ' + 'multiple triggers of the same type.') + return t, trigger -- cgit v1.2.3