aboutsummaryrefslogtreecommitdiff
path: root/rumba/storyboard.py
diff options
context:
space:
mode:
authorMarco Capitani <m.capitani@nextworks.it>2018-02-22 10:18:10 +0100
committerMarco Capitani <m.capitani@nextworks.it>2018-03-16 14:07:24 +0100
commite174aaf3650c23331c757921b1af9b152f53c6e5 (patch)
tree281a859419b20e34605e310c9572668d6f545734 /rumba/storyboard.py
parentade6bd4cda44c88b555f521641c6e01326ab0060 (diff)
downloadrumba-e174aaf3650c23331c757921b1af9b152f53c6e5.tar.gz
rumba-e174aaf3650c23331c757921b1af9b152f53c6e5.zip
storyboard: add replayability
implements #27
Diffstat (limited to 'rumba/storyboard.py')
-rw-r--r--rumba/storyboard.py759
1 files changed, 661 insertions, 98 deletions
diff --git a/rumba/storyboard.py b/rumba/storyboard.py
index ca0cfb5..fa6413d 100644
--- a/rumba/storyboard.py
+++ b/rumba/storyboard.py
@@ -29,6 +29,8 @@
# @ap: Application Process binary
# @options: Options to pass to the binary
#
+import functools
+import math
import os
import random
import time
@@ -37,6 +39,11 @@ import rumba.model as model
import rumba.ssh_support as ssh_support
import rumba.log as log
+try:
+ from io import StringIO
+except ImportError:
+ import StringIO # Python 2 here
+
logger = log.get_logger(__name__)
try:
@@ -53,18 +60,29 @@ except ImportError:
# PROBLEM! These logs will almost never be printed...
# But we might not care
-current_id = -1
+
+class SBEntity(object):
+
+ def __init__(self, e_id):
+ self.id = e_id
+
+ def get_e_id(self):
+ return type(self).__name__ + '.' + self.id
-def get_id():
- global current_id
- current_id += 1
- return current_id
+class Client(SBEntity):
+ current_id = -1
-class Client(object):
- def __init__(self, ap, nodes=None, options=None, shutdown="kill <pid>"):
+ @classmethod
+ def get_id(cls):
+ cls.current_id += 1
+ return cls.current_id
+
+ def __init__(self, ap, nodes=None, options=None, shutdown="kill <pid>", c_id=None):
self.ap = ap
+ e_id = c_id if c_id is not None else self.ap
+ super(Client, self).__init__(e_id)
self.startup = (ap + ((" " + options) if options is not None else ""))
if isinstance(nodes, model.Node):
nodes = [nodes]
@@ -78,10 +96,16 @@ class Client(object):
raise Exception("A Node is required.")
self.nodes.append(node)
- def process(self, duration):
- node = random.choice(self.nodes) if len(self.nodes) > 0 else None
+ def process(self, duration, node=None, proc_id=None):
+ if proc_id is None:
+ proc_id = "%s_%s" % (self.ap, self.get_id())
+ if node is None:
+ if len(self.nodes) == 0:
+ raise ValueError('No nodes for client %s'
+ % (self.id,))
+ node = random.choice(self.nodes)
return ClientProcess(
- get_id(),
+ proc_id,
self.ap,
self.startup,
duration,
@@ -97,13 +121,13 @@ class Client(object):
# @start_time: The time at which this process is started.
# @options: Options to pass to the binary
#
-class ClientProcess(object):
- def __init__(self, client_id, ap, startup, duration,
- node, shutdown="<kill <pid>"):
- self.id = client_id
+class ClientProcess(SBEntity):
+ def __init__(self, proc_id, ap, startup, duration,
+ node, shutdown="kill <pid>"):
+ super(ClientProcess, self).__init__(proc_id)
self.ap = ap
self.startup = startup
- self.duration = duration
+ self.duration = duration if duration is not None else -1
self.start_time = None
self.running = False
self.node = node
@@ -120,8 +144,7 @@ class ClientProcess(object):
self.ap, self.node.name, self.duration
)
- start_cmd = "./startup.sh %s_%s %s" % (
- self.ap,
+ start_cmd = "./startup.sh %s %s" % (
self.id,
self.startup.replace("<duration>", str(self.duration)),
)
@@ -152,7 +175,7 @@ class ClientProcess(object):
self.ap, self.node.name
)
- def kill_check(self):
+ def check_and_kill(self):
"""Check if the process should keep running, stop it if not,
and return true if and only if it is still running."""
now = time.time()
@@ -175,11 +198,13 @@ class ClientProcess(object):
# @clients: Client binaries that will use this server
# @nodes: Specific nodes to start this server on
#
-class Server:
+class Server(SBEntity):
def __init__(self, ap, arrival_rate, mean_duration,
options=None, max_clients=float('inf'),
- clients=None, nodes=None, min_duration=2):
+ clients=None, nodes=None, min_duration=2, s_id=None):
self.ap = ap
+ e_id = s_id if s_id is not None else self.ap
+ super(Server, self).__init__(e_id)
self.options = options if options is not None else ""
self.max_clients = max_clients
if clients is None:
@@ -216,15 +241,31 @@ class Server:
"""
number = poisson(self.arrival_rate * interval)
number = int(min(number, self.max_clients))
- return [self.make_client_process() for _ in range(number)]
+ return [self._make_process_arguments() for _ in range(number)]
- def make_client_process(self):
- """Returns a client of this server"""
+ def get_duration(self):
+ return exponential(self.actual_parameter) + self.min_duration
+
+ def _make_process_arguments(self, duration=None, node=None, proc_id=None, client=None):
if len(self.clients) == 0:
raise Exception("Server %s has empty client list." % (self,))
- duration = exponential(self.actual_parameter) + self.min_duration
- return random.choice(self.clients).process(
- duration=float("%.2f" % (duration,))
+ if duration is None:
+ duration = self.get_duration()
+ if client is None:
+ client = random.choice(self.clients)
+ if node is None:
+ node = random.choice(client.nodes)
+ if proc_id is None:
+ proc_id = "%s_%s" % (client.ap, client.get_id())
+ return duration, node, proc_id, client
+
+ def make_client_process(self, duration=None, node=None, proc_id=None, client=None):
+ """Returns a client of this server"""
+ (d, n, p, c) = self._make_process_arguments(duration, node, proc_id, client)
+ return c.process(
+ duration=float("%.2f" % (d,)),
+ node=n,
+ proc_id=p
)
def run(self):
@@ -239,26 +280,26 @@ class Server:
cmd_2 = "./startup.sh %s" % (run_cmd,)
logger.debug(
'Starting server %s on node %s with logfile %s.',
- self.ap, node.name, logfile
+ self.id, node.name, logfile
)
try:
node.execute_command(cmd_1)
self.pids[node] = (node.execute_command(cmd_2))
except ssh_support.SSHException:
logger.warn('Could not start server %s on node %s.',
- self.ap, node.name)
+ self.id, node.name)
def stop(self):
for node, pid in self.pids.items():
logger.debug(
'Killing server %s on node %s.',
- self.ap, node.name
+ self.id, node.name
)
try:
node.execute_command("kill %s" % pid)
except ssh_support.SSHException:
logger.warn('Could not kill server %s on node %s.',
- self.ap, node.name)
+ self.id, node.name)
# Base class for ARCFIRE storyboards
@@ -268,52 +309,70 @@ class Server:
# @servers: App servers available in the network.
# Type == Server or Type == List[Tuple[Server, Node]]
#
-class StoryBoard:
+class StoryBoard(SBEntity):
- DEFAULT_INTERVAL = 2.5 # in seconds (may be a float)
+ SCRIPT_RESOLUTION = 0.1
+ EXECUTION_RESOLUTION = 2.5 # in seconds (may be a float)
- def __init__(self, duration, experiment=None, servers=None):
+ def get_e_id(self):
+ return 'storyboard'
+
+ def __init__(self, duration, experiment=None, servers=None, script=None):
+ self.id = 'storyboard'
self.experiment = experiment
self.duration = duration
- self.servers = list()
+ self.cur_time = 0.0
+ self.server_apps = {}
+ self.client_apps = {}
if servers is None:
servers = list()
for s in servers:
self._validate_and_add_server(s)
self.client_nodes = set()
self.server_nodes = set()
- self.active_clients = []
+ self.process_dict = {}
+ self.active_clients = self.process_dict.values()
self.start_time = None
self.commands_list = {}
+ self.script = script
+ self.node_map = {}
+ self._build_nodes_lists()
def _build_nodes_lists(self):
"""Populates server_nodes and client_nodes lists"""
- for server in self.servers:
+ for server in self.server_apps.values():
for node in server.nodes:
self.server_nodes.add(node)
for client in server.clients:
for node in client.nodes:
self.client_nodes.add(node)
+ if self.experiment is not None:
+ for node in self.experiment.nodes:
+ self.node_map[node.name] = node
def _validate_and_add_server(self, s, n=None):
if len(s.clients) == 0:
logger.warning("'%s' server has no registered clients.", s.ap)
+ else:
+ for client in s.clients:
+ self.client_apps[client.id] = client
if n is not None:
s.add_node(n)
- self.servers.append(s)
else:
if len(s.nodes) == 0:
logger.warning("'%s' server has no registered nodes.", s.ap)
- self.servers.append(s)
for node in s.nodes:
if node not in self.experiment.nodes:
raise ValueError('Cannot run server on node %s, '
'not in experiment.' % (node.name,))
+ self.server_apps[s.id] = s
+ self._build_nodes_lists()
def set_experiment(self, experiment):
if not isinstance(experiment, model.Experiment):
raise TypeError('Experiment instance required.')
self.experiment = experiment
+ self._build_nodes_lists()
def add_server(self, server):
"""Register a server node to the sb."""
@@ -335,74 +394,212 @@ class StoryBoard:
if not isinstance(server, Server):
raise TypeError('First argument must be of type Server')
if not isinstance(node, model.Node):
- raise TypeError('Second argument must be of type Server')
+ raise TypeError('Second argument must be of type Node')
self._validate_and_add_server(server, node)
def del_server(self, server):
- self.servers.remove(server)
+ del self.server_apps[server.id]
+ self._build_nodes_lists()
- def run_command(self, t, node, command):
+ def schedule_command(self, t, node, command):
"""
- Schedule the given command to be run at t seconds from the start.
+ Schedules the given command to be run at t seconds from the start.
The commands are run in no particular order, so take care
- :param t: (float) seconds to wait before running the command
- :param node: (Node) the node on which the command should be run
- :param command: (str or list[str]) the command(s) to be run,
+ @param t: (float) seconds to wait before running the command
+ @param node: (Node or str) the node on which the command should be run
+ @param command: (str or list[str]) the command(s) to be run,
"""
if self.experiment is None:
- raise ValueError("Cannot add a server before "
- "setting the experiment.")
+ raise ValueError("An experiment is needed to schedul commands.")
+ if self.script is None:
+ self.script = Script(self)
+ if isinstance(node, str):
+ node = self.node_map[node]
if node not in self.experiment.nodes:
raise ValueError('Cannot run command on node %s, '
'not in experiment.' % (node.name,))
if isinstance(command, str):
- self.commands_list.setdefault(t, []).append((node, command))
- else: # Hope it's an Iterable[str]. Otherwise, errors will happen.
- for cmd in command:
- self.commands_list.setdefault(t, []).append((node, cmd))
-
- def periodic_check(self):
- # Spawn new clients
- for server in self.servers:
- clients = server.get_new_clients(self.DEFAULT_INTERVAL)
- for new_client in clients:
- new_client.duration = min(
- new_client.duration,
- self.duration - (time.time() - self.start_time)
- )
- # Make sure the duration of the client does not
- # go beyond the storyboard lifetime
- if new_client.duration < server.min_duration:
- continue
- # Do not start clients that would not run for
- # at least the minimum duration
- # (due to sb constraints)
- new_client.run()
- self.active_clients.append(new_client)
-
- # Kill expired clients
- surviving = []
-
- for x in self.active_clients:
- if x.kill_check(): #
- surviving.append(x)
- self.active_clients = surviving
-
- # Do run_command instructions
- unexpired_commands = {}
- for t in self.commands_list:
- if (time.time() - self.start_time) > t:
- for node, command in self.commands_list[t]:
- node.execute_command(command)
- else:
- unexpired_commands[t] = self.commands_list[t]
- self.commands_list = unexpired_commands
+ command = [command]
+ action = functools.partial(self.run_command, node, command)
+ self.script.add_event(Event(action, ev_time=t))
+
+ def run_command(self, node, command):
+ """
+ Runs a command (or several) on a given node, immediately.
+
+ @param node: (Node or str) the node on which the command should be run
+ @param command: (str or list[str]) the command(s) to be run
+ """
+ if self.experiment is None:
+ raise ValueError("Experiment needed to run commands.")
+ if isinstance(node, str):
+ node = self.node_map[node]
+ if node not in self.experiment.nodes:
+ raise ValueError('Cannot run command on node %s, '
+ 'not in experiment.' % (node.name,))
+ if isinstance(command, str):
+ command = [command]
+ node.execute_commands(command)
+
+ def run_client_of(self, server, duration=None, node=None, proc_id=None):
+ """
+ Runs a random client of the specified server
+ with the specified parameters.
+
+ Except for the server, if a parameter is not specified,
+ it will be randomly generated according to the server
+ parameters (mean duration, client apps and their nodes)
+
+ @param server: the server of which one client should be run
+ @param duration: the duration of the client process
+ @param node: (Node or str) the node on which the client should be run
+ @param proc_id: the entity ID to use for the process
+ """
+ if isinstance(server, str):
+ server = self.server_apps[server]
+ if duration is None:
+ duration = server.get_duration()
+ client = random.choice(server.clients)
+ self.run_client(client, duration, node, proc_id)
+
+ def run_client(self, client, duration, node=None, proc_id=None):
+ """
+ Runs the specified client app with the specified parameters.
+
+ If the node parameter is not given, it will be chosen at random
+ among the client default nodes.
+
+ @param client: the client which should be run
+ @param duration: the duration of the client process
+ @param node: (Node or str) the node on which the client should be run
+ @param proc_id: the entity ID to use for the process
+ """
+ if isinstance(client, str):
+ client = self.client_apps[client]
+ if node is None:
+ if len(client.nodes) == 0:
+ raise ValueError('No nodes registered for client %s',
+ client.id)
+ node = random.choice(client.nodes)
+ elif isinstance(node, str):
+ node = self.node_map[node]
+ process = client.process(duration, node, proc_id)
+ self.process_dict[process.id] = process
+ process.run()
+ action = functools.partial(self.kill_process, process.id)
+ self.script.add_event(Event(action, ev_time=(self.cur_time + duration)))
+
+ def start_client_of(self, server, duration=None, node=None, proc_id=None):
+ """
+ Starts a random client of the specified server
+ with the specified parameters.
+
+ Except for the server and the duration, if a parameter
+ is not specified, it will be randomly generated according
+ to the server parameters (client apps and their nodes).
+
+ If the client app must be shutdown manually or the duration
+ parameter is None, the client process will _not_ be stopped
+ automatically, and will continue running unless otherwise
+ killed.
+
+ @param server: the server of which one client should be run
+ @param duration: the duration of the client process
+ @param node: the node on which the client should be run
+ @param proc_id: the entity ID to use for the process
+ """
+ if isinstance(server, str):
+ server = self.server_apps[server]
+ client = random.choice(server.clients)
+ self.start_client(client, duration, node, proc_id)
+
+ def start_client(self, client, duration=None, node=None, proc_id=None):
+ """
+ Starts the specified client app with the specified parameters.
+
+ If the node parameter is not given, it will be chosen at random
+ among the client default nodes.
+
+ If the app must be shutdown manually or the duration
+ parameter is None, it will _not_ be stopped
+ automatically, and will continue running unless otherwise
+ killed.
+
+ @param client: the client which should be run
+ @param duration: the duration of the client process
+ @param node: the node on which the client should be run
+ @param proc_id: the entity ID to use for the process
+ """
+ if isinstance(client, str):
+ client = self.client_apps[client]
+ if node is None:
+ node = random.choice(client.nodes)
+ process = client.process(duration, node, proc_id)
+ self.process_dict[process.id] = process
+ process.run()
+
+ def kill_process(self, proc_id):
+ process = self.process_dict.get(proc_id, None)
+ if process is None:
+ raise ValueError('No process named %s' % (proc_id,))
+ process.stop()
+ del self.process_dict[proc_id]
+
+ def periodic_check(self, t):
+ self.script.check_for_ready_ev(t)
+ self.script.run_ready()
+
+ def parse_script(self, buffer):
+ self.script = Script(self)
+ self.script.parse(buffer)
+
+ def parse_script_file(self, filename):
+ self.script = Script(self)
+ self.script.parse_file(filename)
+
+ def generate_script(self):
+ if self.experiment is None:
+ raise ValueError('Cannot generate script without an experiment')
+ self.script = Script(self)
+ t = self.SCRIPT_RESOLUTION
+ marker = 5
+ last_marker = 0
+ while t < self.duration:
+ if int(t) >= (last_marker+1)*marker:
+ last_marker += 1
+ logger.debug('Passed the %s seconds mark', last_marker*marker)
+ for server in self.server_apps.values():
+ c_l = server.get_new_clients(self.SCRIPT_RESOLUTION)
+ for d, n, p, c in c_l:
+ if d > self.duration - t: # would outlast the experiment
+ continue
+ start = self._make_process_events(
+ t,
+ d,
+ n,
+ p,
+ c
+ )
+ self.script.add_event(start)
+ t += self.SCRIPT_RESOLUTION
+
+ def _make_process_events(self, t, d, n, p, c):
+ start_action = functools.partial(
+ self.run_client,
+ c,
+ d,
+ n,
+ p
+ )
+ start_event = Event(start_action, ev_time=t)
+ return start_event
def start(self):
if self.experiment is None:
- raise ValueError("Cannot add a server before "
- "setting the experiment.")
+ raise ValueError("Cannot run sb with no experiment.")
+ if self.script is None:
+ self.generate_script()
logger.info('Starting storyboard execution')
self._build_nodes_lists()
logger.debug('Server nodes are: %s.', [x.name for x in self.server_nodes])
@@ -420,18 +617,24 @@ class StoryBoard:
"echo '%s' > startup.sh && chmod a+x startup.sh" % (script,)
)
try:
- for server in self.servers:
+ for server in self.server_apps.values():
server.run()
- while time.time() - self.start_time < self.duration:
- self.periodic_check()
- time.sleep(self.DEFAULT_INTERVAL)
- self.periodic_check() # Do things that were scheduled
- # in the last "INTERVAL" seconds
- # of the StoryBoard
+ res = self.SCRIPT_RESOLUTION # for brevity
+ while self.cur_time < self.duration:
+ self.periodic_check(self.cur_time)
+ next_breakpoint = math.ceil(self.cur_time / res) * res
+ delta = next_breakpoint - self.cur_time
+ if delta > 0: # just in case
+ time.sleep(delta)
+ self.cur_time = float(time.time() - self.start_time)
+ self.periodic_check(self.cur_time)
+ # Do things that were scheduled
+ # in the last seconds
+ # of the StoryBoard
finally: # Kill everything. No more mercy.
for client in self.active_clients:
client.stop()
- for server in self.servers:
+ for server in self.server_apps.values():
server.stop()
def fetch_logs(self, local_dir=None):
@@ -452,3 +655,363 @@ class StoryBoard:
logs_list = [x for x in logs_list.split('\n') if x != '']
logger.debug('Log list is:\n%s', logs_list)
node.fetch_files(logs_list, local_dir)
+
+
+class Event(object):
+
+ cur_id = -1
+
+ @classmethod
+ def generate_id(cls):
+ cls.cur_id += 1
+ return "__event_%s" % (cls.cur_id,)
+
+ def __init__(self, action, ev_id=None, ev_time=None, trigger=None):
+ """
+ @param ev_id: (str) id of the event
+ @param action: (any SBEntity method) action to undertake
+ when event is activated
+ @param ev_time: (float) seconds interval to wait before running the event
+ @param trigger: (Event) Event which must complete before this runs
+ """
+ if ev_time is None and trigger is None:
+ raise ValueError('No condition specified for event %s.' %
+ ev_id)
+ self.id = ev_id if ev_id is not None else self.generate_id()
+ self.action = action
+ self.time = ev_time
+ self._trigger = trigger
+ self.trigger = trigger.id if trigger is not None else None
+ self.exception = None
+ self.done = False
+ self._repr = "%(prefix)s &%(label)s | %(entity)s %(method)s" % {
+ 'prefix': self._prefix_repr(),
+ 'label': self.id,
+ 'entity': '$' + self.action.func.__self__.get_e_id(),
+ 'method': self._action_repr()
+ }
+
+ def _action_repr(self):
+ if isinstance(self.action, functools.partial):
+ name = self.action.func.__name__
+ args = ' '.join(self._action_arg_repr(x) for x in self.action.args)
+ else:
+ name = self.action.__name__
+ args = ''
+ return ' '.join((name, args))
+
+ @staticmethod
+ def _action_arg_repr(arg):
+ if hasattr(arg, 'get_e_id'):
+ return '$' + arg.get_e_id()
+ elif isinstance(arg, str):
+ return "'" + arg + "'"
+ elif isinstance(arg, float):
+ return "%.2f" % (arg,)
+ else:
+ return str(arg) # Assuming int
+
+ def _prefix_repr(self):
+ conditions = []
+ if self.time is not None:
+ conditions.append("%.2f" % (self.time,))
+ if self.trigger is not None:
+ conditions.append(self.trigger)
+ return ", ".join(conditions)
+
+ @property
+ def failed(self):
+ return self.exception is not None
+
+ def pre_exec(self): # hook to be overridden
+ pass
+
+ def post_exec(self): # hook to be overridden
+ pass
+
+ def _start(self):
+ self.pre_exec()
+ if self.done:
+ raise ValueError('Event %s has already ran' % self.id)
+
+ def run(self):
+ """
+ Run this event's action
+ """
+ self._start()
+ try:
+ self.action()
+ except Exception as e:
+ self.exception = e
+ self._done()
+
+ def check(self, cur_time):
+ """
+ Check if this event can be run.
+ @param cur_time: (float) current time
+ @return: True if the preconditions are satisfied, False otherwise.
+ """
+ return \
+ (self.time is None or cur_time > self.time) \
+ and (self._trigger is None or self._trigger.done)
+
+ def _done(self):
+ self.done = True
+ if self.exception is not None:
+ logger.warning('Event %s failed. %s: %s.',
+ self.id,
+ type(self.exception).__name__,
+ str(self.exception))
+ self.post_exec()
+
+ def __str__(self):
+ return self.id
+
+ def __repr__(self):
+ return self._repr
+
+
+class Script(object):
+
+ def __init__(self, storyboard):
+ # Brute force 'dump all in memory' approach to avoid
+ # iterating through the events a lot of times
+ # at runtime
+ self.events_by_id = {}
+ self._waiting_events = {}
+ self._events_ready = []
+
+ self._nodes = {}
+ self._servers = {}
+ self._clients = {}
+ self._testbed = None
+ self._experiment = None
+ self._storyboard = None
+ self._entities = {}
+ self._parse_entities(storyboard)
+
+ def _parse_entities(self, storyboard):
+ self._storyboard = storyboard
+ self._experiment = self._storyboard.experiment
+ self._nodes = self._storyboard.node_map
+ self._testbed = self._experiment.testbed
+ self._servers = self._storyboard.server_apps
+ self._clients = self._storyboard.client_apps
+ self._processes = {}
+ self._entities = {
+ 'sb': self._storyboard,
+ 'storyboard': self._storyboard,
+ 'testbed': self._testbed,
+ 'experiment': self._experiment,
+ 'Node': self._nodes,
+ 'Server': self._servers,
+ 'Client': self._clients,
+ 'ClientProcess': self._processes
+ }
+
+ def add_process(self, process):
+ self._processes[process.id] = process
+
+ def add_event(self, event):
+ """
+ Add an event to this script
+ @param event: (Event or str) the event to add
+ """
+ if isinstance(event, str):
+ self._parse_line(event)
+ return
+ ev_id = event.id
+ self.events_by_id[ev_id] = event
+ self._waiting_events[ev_id] = event
+
+ def del_event(self, event):
+ """
+ Remove an event from this script
+ @param event: (Event or str) the event (or id thereof) to remove
+ """
+ if isinstance(event, Event):
+ event = event.id
+ del self.events_by_id[event]
+ del self._events_ready[event]
+
+ def check_for_ready_ev(self, cur_time):
+ """
+ Check which events are ready to run, and mark them
+ @param (float) current time
+ """
+ new_wait = {}
+ for i, e in self._waiting_events.items():
+ if e.check(cur_time):
+ self._events_ready.append(e)
+ else:
+ new_wait[i] = e
+ self._waiting_events = new_wait
+
+ def run_ready(self):
+ """
+ Run events marked as ready by a previous inspection
+
+ (see @check_for_ready_ev)
+ """
+ while len(self._events_ready):
+ event = self._events_ready.pop()
+ logger.debug("Running event %s.", event.id)
+ event.run()
+
+ def _nested_iterator(self, d):
+ for t in ((x, self.events_by_id[x])
+ for e_lst in d.values() for x in e_lst):
+ yield t
+
+ def _resolve_entity(self, e_id):
+ e_key = e_id.split('.')
+ result = self._entities
+ while len(e_key) != 0:
+ key = e_key.pop(0)
+ result = result.get(key)
+ if result is None:
+ raise ValueError('Invalid entity key %s at %s'
+ % (e_id, key))
+ return result
+
+ def _parse_action(self, entity, action_l):
+ method_n = action_l[0]
+ method = getattr(entity, method_n, None)
+ if method is None:
+ raise ValueError('No method called "%s" for entity %s.'
+ % (method_n, entity.get_e_id()))
+ args_l = action_l[1:]
+ args = self._parse_action_arguments(args_l)
+ return functools.partial(method, *args)
+ # TODO maybe some argument checking?
+
+ def _parse_action_arguments(self, args_l):
+ args = []
+ quotes_part = None
+ while len(args_l) > 0:
+ arg_s = args_l.pop(0)
+ if quotes_part is not None:
+ if arg_s.endswith("'"):
+ arg_s = arg_s[:-1] # Strip final '
+ quotes_part += ' ' + arg_s
+ args.append(quotes_part)
+ quotes_part = None
+ elif arg_s.startswith("'"):
+ arg_s = arg_s[1:] # Strip initial '
+ if arg_s.endswith("'"):
+ arg_s = arg_s[:-1] # Strip final '
+ args.append(arg_s)
+ else:
+ quotes_part = arg_s
+ else:
+ if arg_s.startswith('$'):
+ args.append(self._resolve_entity(arg_s[1:]))
+ else:
+ try:
+ args.append(float(arg_s))
+ except ValueError:
+ raise ValueError('Syntax error: %s is not a float. '
+ 'If it is supposed to be a string, '
+ 'enclose it in single quotes.'
+ % (arg_s,))
+ return args
+
+ def _parse_prefix(self, prefix):
+ prefix = prefix.strip().split('&')
+ if len(prefix) > 2:
+ raise ValueError('Syntax error: multiple "&" in prefix')
+ conditions = prefix[0].strip().split(',')
+ if len(prefix) == 2:
+ label = prefix[1].strip()
+ else:
+ label = None
+ if len(conditions) == 0:
+ raise ValueError("Syntax error: expected at least one condition")
+ elif len(conditions) > 2:
+ raise ValueError("Syntax error: expected at most two condition")
+ t, trigger = self._parse_conditions(*[x.strip() for x in conditions])
+ return label, t, trigger
+
+ def _parse_suffix(self, suffix):
+ action_l = suffix.strip().split(' ')
+ if action_l[0].startswith('$'):
+ entity_id = action_l[0][1:]
+ action_l = action_l[1:]
+ if len(action_l) == 0:
+ raise ValueError('Syntax error: missing action.')
+ else:
+ entity_id = 'sb'
+ entity = self._resolve_entity(entity_id)
+ action = self._parse_action(entity, action_l)
+ return action
+
+ def _parse_line(self, line):
+ parts = line.split('|')
+ if len(parts) != 2:
+ raise ValueError("Syntax error: expected exactly one '|'")
+ label, t, trigger = self._parse_prefix(parts[0])
+ action = self._parse_suffix(parts[1])
+ event = Event(action, ev_id=label, ev_time=t, trigger=trigger)
+ self.add_event(event)
+
+ def parse(self, str_iterable):
+ for index, line in enumerate(str_iterable):
+ if line.startswith('#'):
+ continue
+ if line.strip('\n').strip(' ') == '':
+ continue
+ try:
+ self._parse_line(line)
+ except ValueError as e:
+ raise ValueError(str(e) + ' -> @ line %s' % (index,))
+
+ def parse_file(self, filename):
+ with open(filename, 'r') as f:
+ self.parse(f)
+
+ def parse_string(self, s):
+ buffer = StringIO(s)
+ self.parse(buffer)
+
+ def write(self, buffer):
+ ev_list = list(self.events_by_id.values())
+ ev_list.sort(key=lambda x: x.time if x.time is not None else float('+inf'))
+ for event in ev_list:
+ buffer.write(repr(event) + '\n')
+
+ def write_to_file(self, filename):
+ with open(filename, 'w') as f:
+ self.write(f)
+
+ def write_string(self):
+ s = StringIO()
+ self.write(s)
+ return s
+
+ def _parse_conditions(self, *conditions):
+ """
+ Parses condition strings and returns the conditions
+ @param conditions: list of strings
+ @return: (Tuple[float, event]) -> (time, trigger)
+ """
+ t, trigger = None, None
+ for cond in conditions:
+ if t is None:
+ try:
+ temp = float(cond)
+ if temp > 24 * 3600: # seconds in a day
+ raise ValueError
+ t = temp
+ continue
+ except ValueError:
+ pass
+ if trigger is None:
+ try:
+ trigger = self.events_by_id[cond]
+ continue
+ except KeyError:
+ pass
+ raise ValueError('Syntax error: cannot parse condition {}. '
+ 'Either the condition is malformed, or there are '
+ 'multiple triggers of the same type.')
+ return t, trigger