diff options
author | Marco Capitani <m.capitani@nextworks.it> | 2018-02-22 10:18:10 +0100 |
---|---|---|
committer | Marco Capitani <m.capitani@nextworks.it> | 2018-03-16 14:07:24 +0100 |
commit | e174aaf3650c23331c757921b1af9b152f53c6e5 (patch) | |
tree | 281a859419b20e34605e310c9572668d6f545734 | |
parent | ade6bd4cda44c88b555f521641c6e01326ab0060 (diff) | |
download | rumba-e174aaf3650c23331c757921b1af9b152f53c6e5.tar.gz rumba-e174aaf3650c23331c757921b1af9b152f53c6e5.zip |
storyboard: add replayability
implements #27
-rw-r--r-- | examples/example-script.rsb | 40 | ||||
-rwxr-xr-x | examples/script-example.py | 101 | ||||
-rwxr-xr-x | examples/vpn.py | 3 | ||||
-rw-r--r-- | rumba/log.py | 1 | ||||
-rw-r--r-- | rumba/model.py | 4 | ||||
-rw-r--r-- | rumba/storyboard.py | 759 | ||||
-rw-r--r-- | rumba/utils.py | 2 | ||||
-rw-r--r-- | tools/scriptgenerator.py | 88 |
8 files changed, 897 insertions, 101 deletions
diff --git a/examples/example-script.rsb b/examples/example-script.rsb new file mode 100644 index 0000000..8b5c714 --- /dev/null +++ b/examples/example-script.rsb @@ -0,0 +1,40 @@ +# This is a comment (no in-line comments) + +5.0 &ev1| $sb run_client_of 'server_a' +# 5.0 is the time trigger (after 5 seconds). Its a float +# $sb is the storyboard object +# run_client_of is the method to invoke + +10.3 &echo_cmd|$sb run_command 'node_a' 'echo "test"' +# methods can have multiple space-delimited arguments. +# single-quoted strings are used to enclose a single (string) argument +# which must be preceded and followed by a space. +# &echo_cmd is a label, it can be used to reference this +# event from other events + +10.5 &ev2|$sb run_client_of 'server_b' 12 'node_a' +# And optional arguments (same syntax as the method) +# also, whitespace tolerant around the | token + +# NOTE: in the absence of optional arguments, they will be randomly +# generated, so you might not get the exact same experiment every time + +13 &ev3| $Node.node_a execute_command 'echo "test"' +# $Node.<node_id> is a Node object. +# This command is actually equivalent to echo_cmd + +echo_cmd &echo2 | $sb run_command $Node.node_b 'echo "test2"' +# events can be triggered by the completion of other events + +echo2, 18 &ev4| $sb run_client_of $Server.server_b +# or both time AND other events +# Moreover, one can use entity syntax in arguments too + +1.2 &ev5| run_client_of $Server.server_c +# Events need _not_ be in temporal order +# if no object ($ handle) is provided, the storyboard +# is assumed as the object/ + + + + diff --git a/examples/script-example.py b/examples/script-example.py new file mode 100755 index 0000000..316a1d1 --- /dev/null +++ b/examples/script-example.py @@ -0,0 +1,101 @@ +#!/usr/bin/env python + +# An example script leveraging the storyboard scripting functionality + +from rumba.storyboard import * +from rumba.model import * +import rumba.log as log +import rumba.testbeds.qemu as qemu +import rumba.prototypes.rlite as rl +from rumba.utils import ExperimentManager +import rumba.utils as utils + +log.set_logging_level(log.DEBUG) + + +n1 = NormalDIF("n1") + +n1.add_policy("rmt.pff", "lfa") +n1.add_policy("security-manager", "passwd") + +e1 = ShimEthDIF("e1") + +node_a = Node("node_a", + difs=[n1, e1], + dif_registrations={n1: [e1]}) + +node_b = Node("node_b", + difs=[e1, n1], + dif_registrations={n1: [e1]}) + +tb = qemu.Testbed(exp_name="script_test", + username="root", + password="root") + +exp = rl.Experiment(tb, nodes=[node_a, node_b]) + + +client_a = Client( + "rinaperf", + options="-t perf -s 1000 -D <duration>", + shutdown="", + c_id='rinaperf_a' +) + +client_b = Client( + "rinaperf", + options="-t perf -s 1000 -D <duration> -z rinaperfb", + shutdown="", + c_id='rinaperf_b' +) + +client_c = Client( + "rinaperf", + options="-t perf -s 1000 -D <duration> -z rinaperfc", + shutdown="", + c_id='rinaperf_c' +) + +server_a = Server( + "rinaperf", + options="-l", + arrival_rate=1, + mean_duration=5, + clients=[client_a], + s_id='server_a' +) + +server_b = Server( + "rinaperf", + options="-l -z rinaperfb", + arrival_rate=0.5, + mean_duration=10, + clients=[client_b], + s_id='server_b' +) + +server_c = Server( + "rinaperf", + options="-l -z rinaperfc", + arrival_rate=1.6, + mean_duration=3, + clients=[client_c], + s_id='server_c' +) + + +if __name__ == '__main__': + story = StoryBoard(30) + story.set_experiment(exp) + story.add_server_on_node(server_a, node_a) + story.add_server_on_node(server_b, node_a) + story.add_server_on_node(server_c, node_a) + client_a.add_node(node_b) + client_b.add_node(node_b) + client_c.add_node(node_b) + story.parse_script_file('example-script.rsb') + log.flush_log() + with ExperimentManager(exp): + exp.swap_in() + exp.bootstrap_prototype() + story.start() diff --git a/examples/vpn.py b/examples/vpn.py index b2f3c81..28de09e 100755 --- a/examples/vpn.py +++ b/examples/vpn.py @@ -44,7 +44,8 @@ d = Node("d", dif_registrations = {n1 : [e3], n2 : [n1]}) tb = qemu.Testbed(exp_name = 'example1', - username = 'sander') + username = 'root', + password = 'root') exp = our.Experiment(tb, nodes = [a, b, c, d]) diff --git a/rumba/log.py b/rumba/log.py index 6c9cd10..9abddce 100644 --- a/rumba/log.py +++ b/rumba/log.py @@ -27,7 +27,6 @@ import logging import logging.handlers import multiprocessing -import os import sys import time diff --git a/rumba/model.py b/rumba/model.py index 42112be..1f47861 100644 --- a/rumba/model.py +++ b/rumba/model.py @@ -220,6 +220,10 @@ class SSHConfig(object): # # class Node(object): + + def get_e_id(self): + return "Node." + self.name + def __init__(self, name, difs=None, dif_registrations=None, policies=None, machine_type=None): self.name = name diff --git a/rumba/storyboard.py b/rumba/storyboard.py index ca0cfb5..fa6413d 100644 --- a/rumba/storyboard.py +++ b/rumba/storyboard.py @@ -29,6 +29,8 @@ # @ap: Application Process binary # @options: Options to pass to the binary # +import functools +import math import os import random import time @@ -37,6 +39,11 @@ import rumba.model as model import rumba.ssh_support as ssh_support import rumba.log as log +try: + from io import StringIO +except ImportError: + import StringIO # Python 2 here + logger = log.get_logger(__name__) try: @@ -53,18 +60,29 @@ except ImportError: # PROBLEM! These logs will almost never be printed... # But we might not care -current_id = -1 + +class SBEntity(object): + + def __init__(self, e_id): + self.id = e_id + + def get_e_id(self): + return type(self).__name__ + '.' + self.id -def get_id(): - global current_id - current_id += 1 - return current_id +class Client(SBEntity): + current_id = -1 -class Client(object): - def __init__(self, ap, nodes=None, options=None, shutdown="kill <pid>"): + @classmethod + def get_id(cls): + cls.current_id += 1 + return cls.current_id + + def __init__(self, ap, nodes=None, options=None, shutdown="kill <pid>", c_id=None): self.ap = ap + e_id = c_id if c_id is not None else self.ap + super(Client, self).__init__(e_id) self.startup = (ap + ((" " + options) if options is not None else "")) if isinstance(nodes, model.Node): nodes = [nodes] @@ -78,10 +96,16 @@ class Client(object): raise Exception("A Node is required.") self.nodes.append(node) - def process(self, duration): - node = random.choice(self.nodes) if len(self.nodes) > 0 else None + def process(self, duration, node=None, proc_id=None): + if proc_id is None: + proc_id = "%s_%s" % (self.ap, self.get_id()) + if node is None: + if len(self.nodes) == 0: + raise ValueError('No nodes for client %s' + % (self.id,)) + node = random.choice(self.nodes) return ClientProcess( - get_id(), + proc_id, self.ap, self.startup, duration, @@ -97,13 +121,13 @@ class Client(object): # @start_time: The time at which this process is started. # @options: Options to pass to the binary # -class ClientProcess(object): - def __init__(self, client_id, ap, startup, duration, - node, shutdown="<kill <pid>"): - self.id = client_id +class ClientProcess(SBEntity): + def __init__(self, proc_id, ap, startup, duration, + node, shutdown="kill <pid>"): + super(ClientProcess, self).__init__(proc_id) self.ap = ap self.startup = startup - self.duration = duration + self.duration = duration if duration is not None else -1 self.start_time = None self.running = False self.node = node @@ -120,8 +144,7 @@ class ClientProcess(object): self.ap, self.node.name, self.duration ) - start_cmd = "./startup.sh %s_%s %s" % ( - self.ap, + start_cmd = "./startup.sh %s %s" % ( self.id, self.startup.replace("<duration>", str(self.duration)), ) @@ -152,7 +175,7 @@ class ClientProcess(object): self.ap, self.node.name ) - def kill_check(self): + def check_and_kill(self): """Check if the process should keep running, stop it if not, and return true if and only if it is still running.""" now = time.time() @@ -175,11 +198,13 @@ class ClientProcess(object): # @clients: Client binaries that will use this server # @nodes: Specific nodes to start this server on # -class Server: +class Server(SBEntity): def __init__(self, ap, arrival_rate, mean_duration, options=None, max_clients=float('inf'), - clients=None, nodes=None, min_duration=2): + clients=None, nodes=None, min_duration=2, s_id=None): self.ap = ap + e_id = s_id if s_id is not None else self.ap + super(Server, self).__init__(e_id) self.options = options if options is not None else "" self.max_clients = max_clients if clients is None: @@ -216,15 +241,31 @@ class Server: """ number = poisson(self.arrival_rate * interval) number = int(min(number, self.max_clients)) - return [self.make_client_process() for _ in range(number)] + return [self._make_process_arguments() for _ in range(number)] - def make_client_process(self): - """Returns a client of this server""" + def get_duration(self): + return exponential(self.actual_parameter) + self.min_duration + + def _make_process_arguments(self, duration=None, node=None, proc_id=None, client=None): if len(self.clients) == 0: raise Exception("Server %s has empty client list." % (self,)) - duration = exponential(self.actual_parameter) + self.min_duration - return random.choice(self.clients).process( - duration=float("%.2f" % (duration,)) + if duration is None: + duration = self.get_duration() + if client is None: + client = random.choice(self.clients) + if node is None: + node = random.choice(client.nodes) + if proc_id is None: + proc_id = "%s_%s" % (client.ap, client.get_id()) + return duration, node, proc_id, client + + def make_client_process(self, duration=None, node=None, proc_id=None, client=None): + """Returns a client of this server""" + (d, n, p, c) = self._make_process_arguments(duration, node, proc_id, client) + return c.process( + duration=float("%.2f" % (d,)), + node=n, + proc_id=p ) def run(self): @@ -239,26 +280,26 @@ class Server: cmd_2 = "./startup.sh %s" % (run_cmd,) logger.debug( 'Starting server %s on node %s with logfile %s.', - self.ap, node.name, logfile + self.id, node.name, logfile ) try: node.execute_command(cmd_1) self.pids[node] = (node.execute_command(cmd_2)) except ssh_support.SSHException: logger.warn('Could not start server %s on node %s.', - self.ap, node.name) + self.id, node.name) def stop(self): for node, pid in self.pids.items(): logger.debug( 'Killing server %s on node %s.', - self.ap, node.name + self.id, node.name ) try: node.execute_command("kill %s" % pid) except ssh_support.SSHException: logger.warn('Could not kill server %s on node %s.', - self.ap, node.name) + self.id, node.name) # Base class for ARCFIRE storyboards @@ -268,52 +309,70 @@ class Server: # @servers: App servers available in the network. # Type == Server or Type == List[Tuple[Server, Node]] # -class StoryBoard: +class StoryBoard(SBEntity): - DEFAULT_INTERVAL = 2.5 # in seconds (may be a float) + SCRIPT_RESOLUTION = 0.1 + EXECUTION_RESOLUTION = 2.5 # in seconds (may be a float) - def __init__(self, duration, experiment=None, servers=None): + def get_e_id(self): + return 'storyboard' + + def __init__(self, duration, experiment=None, servers=None, script=None): + self.id = 'storyboard' self.experiment = experiment self.duration = duration - self.servers = list() + self.cur_time = 0.0 + self.server_apps = {} + self.client_apps = {} if servers is None: servers = list() for s in servers: self._validate_and_add_server(s) self.client_nodes = set() self.server_nodes = set() - self.active_clients = [] + self.process_dict = {} + self.active_clients = self.process_dict.values() self.start_time = None self.commands_list = {} + self.script = script + self.node_map = {} + self._build_nodes_lists() def _build_nodes_lists(self): """Populates server_nodes and client_nodes lists""" - for server in self.servers: + for server in self.server_apps.values(): for node in server.nodes: self.server_nodes.add(node) for client in server.clients: for node in client.nodes: self.client_nodes.add(node) + if self.experiment is not None: + for node in self.experiment.nodes: + self.node_map[node.name] = node def _validate_and_add_server(self, s, n=None): if len(s.clients) == 0: logger.warning("'%s' server has no registered clients.", s.ap) + else: + for client in s.clients: + self.client_apps[client.id] = client if n is not None: s.add_node(n) - self.servers.append(s) else: if len(s.nodes) == 0: logger.warning("'%s' server has no registered nodes.", s.ap) - self.servers.append(s) for node in s.nodes: if node not in self.experiment.nodes: raise ValueError('Cannot run server on node %s, ' 'not in experiment.' % (node.name,)) + self.server_apps[s.id] = s + self._build_nodes_lists() def set_experiment(self, experiment): if not isinstance(experiment, model.Experiment): raise TypeError('Experiment instance required.') self.experiment = experiment + self._build_nodes_lists() def add_server(self, server): """Register a server node to the sb.""" @@ -335,74 +394,212 @@ class StoryBoard: if not isinstance(server, Server): raise TypeError('First argument must be of type Server') if not isinstance(node, model.Node): - raise TypeError('Second argument must be of type Server') + raise TypeError('Second argument must be of type Node') self._validate_and_add_server(server, node) def del_server(self, server): - self.servers.remove(server) + del self.server_apps[server.id] + self._build_nodes_lists() - def run_command(self, t, node, command): + def schedule_command(self, t, node, command): """ - Schedule the given command to be run at t seconds from the start. + Schedules the given command to be run at t seconds from the start. The commands are run in no particular order, so take care - :param t: (float) seconds to wait before running the command - :param node: (Node) the node on which the command should be run - :param command: (str or list[str]) the command(s) to be run, + @param t: (float) seconds to wait before running the command + @param node: (Node or str) the node on which the command should be run + @param command: (str or list[str]) the command(s) to be run, """ if self.experiment is None: - raise ValueError("Cannot add a server before " - "setting the experiment.") + raise ValueError("An experiment is needed to schedul commands.") + if self.script is None: + self.script = Script(self) + if isinstance(node, str): + node = self.node_map[node] if node not in self.experiment.nodes: raise ValueError('Cannot run command on node %s, ' 'not in experiment.' % (node.name,)) if isinstance(command, str): - self.commands_list.setdefault(t, []).append((node, command)) - else: # Hope it's an Iterable[str]. Otherwise, errors will happen. - for cmd in command: - self.commands_list.setdefault(t, []).append((node, cmd)) - - def periodic_check(self): - # Spawn new clients - for server in self.servers: - clients = server.get_new_clients(self.DEFAULT_INTERVAL) - for new_client in clients: - new_client.duration = min( - new_client.duration, - self.duration - (time.time() - self.start_time) - ) - # Make sure the duration of the client does not - # go beyond the storyboard lifetime - if new_client.duration < server.min_duration: - continue - # Do not start clients that would not run for - # at least the minimum duration - # (due to sb constraints) - new_client.run() - self.active_clients.append(new_client) - - # Kill expired clients - surviving = [] - - for x in self.active_clients: - if x.kill_check(): # - surviving.append(x) - self.active_clients = surviving - - # Do run_command instructions - unexpired_commands = {} - for t in self.commands_list: - if (time.time() - self.start_time) > t: - for node, command in self.commands_list[t]: - node.execute_command(command) - else: - unexpired_commands[t] = self.commands_list[t] - self.commands_list = unexpired_commands + command = [command] + action = functools.partial(self.run_command, node, command) + self.script.add_event(Event(action, ev_time=t)) + + def run_command(self, node, command): + """ + Runs a command (or several) on a given node, immediately. + + @param node: (Node or str) the node on which the command should be run + @param command: (str or list[str]) the command(s) to be run + """ + if self.experiment is None: + raise ValueError("Experiment needed to run commands.") + if isinstance(node, str): + node = self.node_map[node] + if node not in self.experiment.nodes: + raise ValueError('Cannot run command on node %s, ' + 'not in experiment.' % (node.name,)) + if isinstance(command, str): + command = [command] + node.execute_commands(command) + + def run_client_of(self, server, duration=None, node=None, proc_id=None): + """ + Runs a random client of the specified server + with the specified parameters. + + Except for the server, if a parameter is not specified, + it will be randomly generated according to the server + parameters (mean duration, client apps and their nodes) + + @param server: the server of which one client should be run + @param duration: the duration of the client process + @param node: (Node or str) the node on which the client should be run + @param proc_id: the entity ID to use for the process + """ + if isinstance(server, str): + server = self.server_apps[server] + if duration is None: + duration = server.get_duration() + client = random.choice(server.clients) + self.run_client(client, duration, node, proc_id) + + def run_client(self, client, duration, node=None, proc_id=None): + """ + Runs the specified client app with the specified parameters. + + If the node parameter is not given, it will be chosen at random + among the client default nodes. + + @param client: the client which should be run + @param duration: the duration of the client process + @param node: (Node or str) the node on which the client should be run + @param proc_id: the entity ID to use for the process + """ + if isinstance(client, str): + client = self.client_apps[client] + if node is None: + if len(client.nodes) == 0: + raise ValueError('No nodes registered for client %s', + client.id) + node = random.choice(client.nodes) + elif isinstance(node, str): + node = self.node_map[node] + process = client.process(duration, node, proc_id) + self.process_dict[process.id] = process + process.run() + action = functools.partial(self.kill_process, process.id) + self.script.add_event(Event(action, ev_time=(self.cur_time + duration))) + + def start_client_of(self, server, duration=None, node=None, proc_id=None): + """ + Starts a random client of the specified server + with the specified parameters. + + Except for the server and the duration, if a parameter + is not specified, it will be randomly generated according + to the server parameters (client apps and their nodes). + + If the client app must be shutdown manually or the duration + parameter is None, the client process will _not_ be stopped + automatically, and will continue running unless otherwise + killed. + + @param server: the server of which one client should be run + @param duration: the duration of the client process + @param node: the node on which the client should be run + @param proc_id: the entity ID to use for the process + """ + if isinstance(server, str): + server = self.server_apps[server] + client = random.choice(server.clients) + self.start_client(client, duration, node, proc_id) + + def start_client(self, client, duration=None, node=None, proc_id=None): + """ + Starts the specified client app with the specified parameters. + + If the node parameter is not given, it will be chosen at random + among the client default nodes. + + If the app must be shutdown manually or the duration + parameter is None, it will _not_ be stopped + automatically, and will continue running unless otherwise + killed. + + @param client: the client which should be run + @param duration: the duration of the client process + @param node: the node on which the client should be run + @param proc_id: the entity ID to use for the process + """ + if isinstance(client, str): + client = self.client_apps[client] + if node is None: + node = random.choice(client.nodes) + process = client.process(duration, node, proc_id) + self.process_dict[process.id] = process + process.run() + + def kill_process(self, proc_id): + process = self.process_dict.get(proc_id, None) + if process is None: + raise ValueError('No process named %s' % (proc_id,)) + process.stop() + del self.process_dict[proc_id] + + def periodic_check(self, t): + self.script.check_for_ready_ev(t) + self.script.run_ready() + + def parse_script(self, buffer): + self.script = Script(self) + self.script.parse(buffer) + + def parse_script_file(self, filename): + self.script = Script(self) + self.script.parse_file(filename) + + def generate_script(self): + if self.experiment is None: + raise ValueError('Cannot generate script without an experiment') + self.script = Script(self) + t = self.SCRIPT_RESOLUTION + marker = 5 + last_marker = 0 + while t < self.duration: + if int(t) >= (last_marker+1)*marker: + last_marker += 1 + logger.debug('Passed the %s seconds mark', last_marker*marker) + for server in self.server_apps.values(): + c_l = server.get_new_clients(self.SCRIPT_RESOLUTION) + for d, n, p, c in c_l: + if d > self.duration - t: # would outlast the experiment + continue + start = self._make_process_events( + t, + d, + n, + p, + c + ) + self.script.add_event(start) + t += self.SCRIPT_RESOLUTION + + def _make_process_events(self, t, d, n, p, c): + start_action = functools.partial( + self.run_client, + c, + d, + n, + p + ) + start_event = Event(start_action, ev_time=t) + return start_event def start(self): if self.experiment is None: - raise ValueError("Cannot add a server before " - "setting the experiment.") + raise ValueError("Cannot run sb with no experiment.") + if self.script is None: + self.generate_script() logger.info('Starting storyboard execution') self._build_nodes_lists() logger.debug('Server nodes are: %s.', [x.name for x in self.server_nodes]) @@ -420,18 +617,24 @@ class StoryBoard: "echo '%s' > startup.sh && chmod a+x startup.sh" % (script,) ) try: - for server in self.servers: + for server in self.server_apps.values(): server.run() - while time.time() - self.start_time < self.duration: - self.periodic_check() - time.sleep(self.DEFAULT_INTERVAL) - self.periodic_check() # Do things that were scheduled - # in the last "INTERVAL" seconds - # of the StoryBoard + res = self.SCRIPT_RESOLUTION # for brevity + while self.cur_time < self.duration: + self.periodic_check(self.cur_time) + next_breakpoint = math.ceil(self.cur_time / res) * res + delta = next_breakpoint - self.cur_time + if delta > 0: # just in case + time.sleep(delta) + self.cur_time = float(time.time() - self.start_time) + self.periodic_check(self.cur_time) + # Do things that were scheduled + # in the last seconds + # of the StoryBoard finally: # Kill everything. No more mercy. for client in self.active_clients: client.stop() - for server in self.servers: + for server in self.server_apps.values(): server.stop() def fetch_logs(self, local_dir=None): @@ -452,3 +655,363 @@ class StoryBoard: logs_list = [x for x in logs_list.split('\n') if x != ''] logger.debug('Log list is:\n%s', logs_list) node.fetch_files(logs_list, local_dir) + + +class Event(object): + + cur_id = -1 + + @classmethod + def generate_id(cls): + cls.cur_id += 1 + return "__event_%s" % (cls.cur_id,) + + def __init__(self, action, ev_id=None, ev_time=None, trigger=None): + """ + @param ev_id: (str) id of the event + @param action: (any SBEntity method) action to undertake + when event is activated + @param ev_time: (float) seconds interval to wait before running the event + @param trigger: (Event) Event which must complete before this runs + """ + if ev_time is None and trigger is None: + raise ValueError('No condition specified for event %s.' % + ev_id) + self.id = ev_id if ev_id is not None else self.generate_id() + self.action = action + self.time = ev_time + self._trigger = trigger + self.trigger = trigger.id if trigger is not None else None + self.exception = None + self.done = False + self._repr = "%(prefix)s &%(label)s | %(entity)s %(method)s" % { + 'prefix': self._prefix_repr(), + 'label': self.id, + 'entity': '$' + self.action.func.__self__.get_e_id(), + 'method': self._action_repr() + } + + def _action_repr(self): + if isinstance(self.action, functools.partial): + name = self.action.func.__name__ + args = ' '.join(self._action_arg_repr(x) for x in self.action.args) + else: + name = self.action.__name__ + args = '' + return ' '.join((name, args)) + + @staticmethod + def _action_arg_repr(arg): + if hasattr(arg, 'get_e_id'): + return '$' + arg.get_e_id() + elif isinstance(arg, str): + return "'" + arg + "'" + elif isinstance(arg, float): + return "%.2f" % (arg,) + else: + return str(arg) # Assuming int + + def _prefix_repr(self): + conditions = [] + if self.time is not None: + conditions.append("%.2f" % (self.time,)) + if self.trigger is not None: + conditions.append(self.trigger) + return ", ".join(conditions) + + @property + def failed(self): + return self.exception is not None + + def pre_exec(self): # hook to be overridden + pass + + def post_exec(self): # hook to be overridden + pass + + def _start(self): + self.pre_exec() + if self.done: + raise ValueError('Event %s has already ran' % self.id) + + def run(self): + """ + Run this event's action + """ + self._start() + try: + self.action() + except Exception as e: + self.exception = e + self._done() + + def check(self, cur_time): + """ + Check if this event can be run. + @param cur_time: (float) current time + @return: True if the preconditions are satisfied, False otherwise. + """ + return \ + (self.time is None or cur_time > self.time) \ + and (self._trigger is None or self._trigger.done) + + def _done(self): + self.done = True + if self.exception is not None: + logger.warning('Event %s failed. %s: %s.', + self.id, + type(self.exception).__name__, + str(self.exception)) + self.post_exec() + + def __str__(self): + return self.id + + def __repr__(self): + return self._repr + + +class Script(object): + + def __init__(self, storyboard): + # Brute force 'dump all in memory' approach to avoid + # iterating through the events a lot of times + # at runtime + self.events_by_id = {} + self._waiting_events = {} + self._events_ready = [] + + self._nodes = {} + self._servers = {} + self._clients = {} + self._testbed = None + self._experiment = None + self._storyboard = None + self._entities = {} + self._parse_entities(storyboard) + + def _parse_entities(self, storyboard): + self._storyboard = storyboard + self._experiment = self._storyboard.experiment + self._nodes = self._storyboard.node_map + self._testbed = self._experiment.testbed + self._servers = self._storyboard.server_apps + self._clients = self._storyboard.client_apps + self._processes = {} + self._entities = { + 'sb': self._storyboard, + 'storyboard': self._storyboard, + 'testbed': self._testbed, + 'experiment': self._experiment, + 'Node': self._nodes, + 'Server': self._servers, + 'Client': self._clients, + 'ClientProcess': self._processes + } + + def add_process(self, process): + self._processes[process.id] = process + + def add_event(self, event): + """ + Add an event to this script + @param event: (Event or str) the event to add + """ + if isinstance(event, str): + self._parse_line(event) + return + ev_id = event.id + self.events_by_id[ev_id] = event + self._waiting_events[ev_id] = event + + def del_event(self, event): + """ + Remove an event from this script + @param event: (Event or str) the event (or id thereof) to remove + """ + if isinstance(event, Event): + event = event.id + del self.events_by_id[event] + del self._events_ready[event] + + def check_for_ready_ev(self, cur_time): + """ + Check which events are ready to run, and mark them + @param (float) current time + """ + new_wait = {} + for i, e in self._waiting_events.items(): + if e.check(cur_time): + self._events_ready.append(e) + else: + new_wait[i] = e + self._waiting_events = new_wait + + def run_ready(self): + """ + Run events marked as ready by a previous inspection + + (see @check_for_ready_ev) + """ + while len(self._events_ready): + event = self._events_ready.pop() + logger.debug("Running event %s.", event.id) + event.run() + + def _nested_iterator(self, d): + for t in ((x, self.events_by_id[x]) + for e_lst in d.values() for x in e_lst): + yield t + + def _resolve_entity(self, e_id): + e_key = e_id.split('.') + result = self._entities + while len(e_key) != 0: + key = e_key.pop(0) + result = result.get(key) + if result is None: + raise ValueError('Invalid entity key %s at %s' + % (e_id, key)) + return result + + def _parse_action(self, entity, action_l): + method_n = action_l[0] + method = getattr(entity, method_n, None) + if method is None: + raise ValueError('No method called "%s" for entity %s.' + % (method_n, entity.get_e_id())) + args_l = action_l[1:] + args = self._parse_action_arguments(args_l) + return functools.partial(method, *args) + # TODO maybe some argument checking? + + def _parse_action_arguments(self, args_l): + args = [] + quotes_part = None + while len(args_l) > 0: + arg_s = args_l.pop(0) + if quotes_part is not None: + if arg_s.endswith("'"): + arg_s = arg_s[:-1] # Strip final ' + quotes_part += ' ' + arg_s + args.append(quotes_part) + quotes_part = None + elif arg_s.startswith("'"): + arg_s = arg_s[1:] # Strip initial ' + if arg_s.endswith("'"): + arg_s = arg_s[:-1] # Strip final ' + args.append(arg_s) + else: + quotes_part = arg_s + else: + if arg_s.startswith('$'): + args.append(self._resolve_entity(arg_s[1:])) + else: + try: + args.append(float(arg_s)) + except ValueError: + raise ValueError('Syntax error: %s is not a float. ' + 'If it is supposed to be a string, ' + 'enclose it in single quotes.' + % (arg_s,)) + return args + + def _parse_prefix(self, prefix): + prefix = prefix.strip().split('&') + if len(prefix) > 2: + raise ValueError('Syntax error: multiple "&" in prefix') + conditions = prefix[0].strip().split(',') + if len(prefix) == 2: + label = prefix[1].strip() + else: + label = None + if len(conditions) == 0: + raise ValueError("Syntax error: expected at least one condition") + elif len(conditions) > 2: + raise ValueError("Syntax error: expected at most two condition") + t, trigger = self._parse_conditions(*[x.strip() for x in conditions]) + return label, t, trigger + + def _parse_suffix(self, suffix): + action_l = suffix.strip().split(' ') + if action_l[0].startswith('$'): + entity_id = action_l[0][1:] + action_l = action_l[1:] + if len(action_l) == 0: + raise ValueError('Syntax error: missing action.') + else: + entity_id = 'sb' + entity = self._resolve_entity(entity_id) + action = self._parse_action(entity, action_l) + return action + + def _parse_line(self, line): + parts = line.split('|') + if len(parts) != 2: + raise ValueError("Syntax error: expected exactly one '|'") + label, t, trigger = self._parse_prefix(parts[0]) + action = self._parse_suffix(parts[1]) + event = Event(action, ev_id=label, ev_time=t, trigger=trigger) + self.add_event(event) + + def parse(self, str_iterable): + for index, line in enumerate(str_iterable): + if line.startswith('#'): + continue + if line.strip('\n').strip(' ') == '': + continue + try: + self._parse_line(line) + except ValueError as e: + raise ValueError(str(e) + ' -> @ line %s' % (index,)) + + def parse_file(self, filename): + with open(filename, 'r') as f: + self.parse(f) + + def parse_string(self, s): + buffer = StringIO(s) + self.parse(buffer) + + def write(self, buffer): + ev_list = list(self.events_by_id.values()) + ev_list.sort(key=lambda x: x.time if x.time is not None else float('+inf')) + for event in ev_list: + buffer.write(repr(event) + '\n') + + def write_to_file(self, filename): + with open(filename, 'w') as f: + self.write(f) + + def write_string(self): + s = StringIO() + self.write(s) + return s + + def _parse_conditions(self, *conditions): + """ + Parses condition strings and returns the conditions + @param conditions: list of strings + @return: (Tuple[float, event]) -> (time, trigger) + """ + t, trigger = None, None + for cond in conditions: + if t is None: + try: + temp = float(cond) + if temp > 24 * 3600: # seconds in a day + raise ValueError + t = temp + continue + except ValueError: + pass + if trigger is None: + try: + trigger = self.events_by_id[cond] + continue + except KeyError: + pass + raise ValueError('Syntax error: cannot parse condition {}. ' + 'Either the condition is malformed, or there are ' + 'multiple triggers of the same type.') + return t, trigger diff --git a/rumba/utils.py b/rumba/utils.py index 33ba71c..a522e94 100644 --- a/rumba/utils.py +++ b/rumba/utils.py @@ -176,7 +176,7 @@ class ExperimentManager(object): if do_swap_out: self.experiment.swap_out() if exc_val is not None: - logger.error('Something went wrong during swap out. ' + logger.error('Something went wrong. ' 'Got %s: %s', type(exc_val).__name__, str(exc_val)) logger.debug('Exception details:', exc_info=exc_val) diff --git a/tools/scriptgenerator.py b/tools/scriptgenerator.py new file mode 100644 index 0000000..059163e --- /dev/null +++ b/tools/scriptgenerator.py @@ -0,0 +1,88 @@ +import argparse +import importlib.machinery as mach + +from rumba.storyboard import * +import rumba.log as log +from rumba.utils import ExperimentManager, PAUSE_SWAPOUT + +client1 = Client( + "rinaperf", + options="-t perf -s 1000 -c 0", + c_id='rinaperf_^C' # To differentiate +) + + +client2 = Client( + "rinaperf", + options="-t perf -s 1000 -D <duration>", + shutdown="", + c_id='rinaperf_-D' # To differentiate +) + + +server = Server( + "rinaperf", + options="-l", + arrival_rate=0.5, + mean_duration=5, + clients=[client1, client2] +) + + +def main(duration, exp, run=False, script='generated_script.txt'): + story = StoryBoard(duration) + story.set_experiment(exp) + story.add_server_on_node(server, exp.nodes[0]) + client1.add_node(exp.nodes[1]) + if len(exp.nodes) == 2: + third_n = exp.nodes[1] + else: + third_n = exp.nodes[2] + client2.add_node(third_n) + story.generate_script() + + with open(script, 'w') as f: + f.write('################################################\n') + f.write('# SCRIPT GENERATED WITH RUMBA SCRIPT GENERATOR #\n') + f.write('################################################\n') + story.script.write(f) + + if run: + with ExperimentManager(exp, swap_out_strategy=PAUSE_SWAPOUT): + exp.swap_in() + exp.bootstrap_prototype() + story.start() + + +if __name__ == '__main__': + + description = "Storyboard generator for Rumba scripts" + epilog = "2018 Marco Capitani <m.capitani@nextworks.it>" + + parser = argparse.ArgumentParser(description=description, + epilog=epilog) + parser.add_argument('exp', metavar='EXPERIMENT', type=str, + help='The experiment file.') + parser.add_argument('-O', '--object', metavar='OBJECT', type=str, + default='exp', + help='The Python variable name of ' + 'the experiment object defined in the file') + parser.add_argument('d', metavar='DURATION', type=float, + help='The required duration of the storyboard') + parser.add_argument('-R', '--run', action='store_true', + help='Run the storyboard after script generation.') + parser.add_argument('-S', '--script', type=str, + default='generated_script.txt', + help='Name of the output script.') + + args = parser.parse_args() + + try: + exp_module = mach.SourceFileLoader('exp', args.exp).load_module() + experiment = getattr(exp_module, args.object) + main(args.d, experiment, args.run, args.script) + except AttributeError: + logger.error('Module %s has no attribute %s', args.exp, args.object) + except Exception as e: + logger.error('Could not load file %s. %s.', args.exp, str(e)) + log.flush_log() |