diff options
-rw-r--r-- | examples/example-script.rsb | 4 | ||||
-rwxr-xr-x | examples/script-example.py | 17 | ||||
-rw-r--r-- | rumba/model.py | 12 | ||||
-rw-r--r-- | rumba/ssh_support.py | 8 | ||||
-rw-r--r-- | rumba/storyboard.py | 325 | ||||
-rw-r--r-- | rumba/testbeds/qemu.py | 2 | ||||
-rw-r--r-- | tools/scriptgenerator.py | 2 |
7 files changed, 272 insertions, 98 deletions
diff --git a/examples/example-script.rsb b/examples/example-script.rsb index 8b5c714..f00ff42 100644 --- a/examples/example-script.rsb +++ b/examples/example-script.rsb @@ -33,7 +33,9 @@ echo2, 18 &ev4| $sb run_client_of $Server.server_b 1.2 &ev5| run_client_of $Server.server_c # Events need _not_ be in temporal order # if no object ($ handle) is provided, the storyboard -# is assumed as the object/ +# is assumed as the object + +14 | $Node.node_a set_link_state $ShimEthDIF.e1 'up' diff --git a/examples/script-example.py b/examples/script-example.py index 316a1d1..6cfaf42 100755 --- a/examples/script-example.py +++ b/examples/script-example.py @@ -1,6 +1,7 @@ #!/usr/bin/env python # An example script leveraging the storyboard scripting functionality +from functools import partial from rumba.storyboard import * from rumba.model import * @@ -83,6 +84,11 @@ server_c = Server( s_id='server_c' ) +typewriter = Client( + "touch test_file", + shutdown="" +) + if __name__ == '__main__': story = StoryBoard(30) @@ -93,7 +99,16 @@ if __name__ == '__main__': client_a.add_node(node_b) client_b.add_node(node_b) client_c.add_node(node_b) - story.parse_script_file('example-script.rsb') + script_file = os.path.join( + os.path.dirname(__file__), + 'example-script.rsb' + ) + story.parse_script_file(script_file) + cb = partial(story.run_command, node_a, "ls -l test_file") + # This will return a file, because it will run after + # Triggering_event + action = partial(story.run_client, typewriter, 0.2, node=node_a, callback=cb) + story.add_event(Event(action, ev_time=12, ev_id='Triggering_event')) log.flush_log() with ExperimentManager(exp): exp.swap_in() diff --git a/rumba/model.py b/rumba/model.py index 20f8215..d46cff9 100644 --- a/rumba/model.py +++ b/rumba/model.py @@ -145,6 +145,10 @@ class ShimUDPDIF(DIF): # @link_speed [int] Speed of the Ethernet network, in Mbps # class ShimEthDIF(DIF): + + def get_e_id(self): + return "ShimEthDIF." + self.name + def __init__(self, name, members=None, link_speed=0): DIF.__init__(self, name, members) self.link_speed = int(link_speed) @@ -239,6 +243,7 @@ class Node(object): self.ssh_config = SSHConfig(name) self.ipcps = [] self.policies = dict() + self.has_tcpdump = False if policies is None: policies = dict() for dif in self.difs: @@ -358,9 +363,10 @@ class Node(object): self.executor.fetch_files(self, paths, destination, sudo) def fetch_file(self, path, destination, sudo=False): - self.executor.fetch_files(self, path, destination, sudo) + self.executor.fetch_file(self, path, destination, sudo) - def set_link_state(self, ipcp, state): + def set_link_state(self, dif, state): + ipcp = self.get_ipcp_by_dif(dif) self.execute_command('ip link set dev ' + ipcp.ifname + ' ' + state, as_root=True) @@ -893,4 +899,4 @@ class Executor: def fetch_files(self, node, paths, destination, sudo=False): for path in paths: - self.fetch_file(node, path, destination, sudo)
\ No newline at end of file + self.fetch_file(node, path, destination, sudo) diff --git a/rumba/ssh_support.py b/rumba/ssh_support.py index b1492e7..a9dff28 100644 --- a/rumba/ssh_support.py +++ b/rumba/ssh_support.py @@ -336,8 +336,10 @@ def copy_files_from_testbed(testbed, ssh_config, paths, destination = destination + '/' if sudo: - execute_command(testbed, ssh_config, - 'sudo chmod a+rw %s' % (" ".join(paths))) + cmd = 'chmod a+rw %s' % (" ".join(paths)) + if ssh_config.username != 'root': + cmd = "sudo %s" % command + execute_command(testbed, ssh_config, cmd) if ssh_config.client is None: client, proxy_client = ssh_connect(ssh_config.hostname, ssh_config.port, @@ -436,4 +438,4 @@ def aptitude_install(testbed, node, packages): "while ! " + sudo("apt-get update") + "; do sleep 1; done", "while ! " + sudo(package_install) + "; do sleep 1; done"] - execute_proxy_commands(testbed, node.ssh_config, cmds, time_out=None)
\ No newline at end of file + execute_proxy_commands(testbed, node.ssh_config, cmds, time_out=None) diff --git a/rumba/storyboard.py b/rumba/storyboard.py index edf4f0c..0dc4fba 100644 --- a/rumba/storyboard.py +++ b/rumba/storyboard.py @@ -34,6 +34,7 @@ import math import os import random import time +import uuid import rumba.model as model import rumba.ssh_support as ssh_support @@ -82,7 +83,7 @@ class Client(SBEntity): def __init__(self, ap, nodes=None, options=None, shutdown="kill <pid>", c_id=None): self.ap = ap - e_id = c_id if c_id is not None else self.ap + e_id = c_id if c_id is not None else self.ap.replace(' ', '_') super(Client, self).__init__(e_id) self.startup = (ap + ((" " + options) if options is not None else "")) if isinstance(nodes, model.Node): @@ -99,7 +100,7 @@ class Client(SBEntity): def process(self, duration, node=None, proc_id=None): if proc_id is None: - proc_id = "%s_%s" % (self.ap, self.get_id()) + proc_id = "%s_%s" % (self.id, self.get_id()) if node is None: if len(self.nodes) == 0: raise ValueError('No nodes for client %s' @@ -200,11 +201,19 @@ class ClientProcess(SBEntity): # @nodes: Specific nodes to start this server on # class Server(SBEntity): + + current_id = -1 + + @classmethod + def get_id(cls): + cls.current_id += 1 + return cls.current_id + def __init__(self, ap, arrival_rate, mean_duration, options=None, max_clients=float('inf'), clients=None, nodes=None, min_duration=2, s_id=None): self.ap = ap - e_id = s_id if s_id is not None else self.ap + e_id = s_id if s_id is not None else self.ap.replace(' ', '_') super(Server, self).__init__(e_id) self.options = options if options is not None else "" self.max_clients = max_clients @@ -274,21 +283,16 @@ class Server(SBEntity): def run(self): for node in self.nodes: - logfile = "/tmp/%s_server.log" % self.ap - script = r'nohup "$@" > %s 2>&1 & echo "$!"' % (logfile,) run_cmd = self.ap + ( (" " + self.options) if self.options is not None else "" ) - cmd_1 = "echo '%s' > startup.sh && chmod a+x startup.sh" \ - % (script,) - cmd_2 = "./startup.sh %s" % (run_cmd,) + cmd_2 = "./startup.sh %s %s" % ('server_' + self.id, run_cmd) logger.debug( - 'Starting server %s on node %s with logfile %s.', - self.id, node.name, logfile + 'Starting server %s on node %s.', + self.id, node.name ) try: - node.execute_command(cmd_1) - self.pids[node] = (node.execute_command(cmd_2)) + self.pids[node] = node.execute_command(cmd_2) except ssh_support.SSHException: logger.warn('Could not start server %s on node %s.', self.id, node.name) @@ -338,9 +342,12 @@ class StoryBoard(SBEntity): self.active_clients = self.process_dict.values() self.start_time = None self.commands_list = {} - self.script = script self.node_map = {} + self.shims = {} self._build_nodes_lists() + # The following must be last, because it needs the info from + # _build_nodes_list + self._script = script if script is not None else _Script(self) def _build_nodes_lists(self): """Populates server_nodes and client_nodes lists""" @@ -353,6 +360,9 @@ class StoryBoard(SBEntity): if self.experiment is not None: for node in self.experiment.nodes: self.node_map[node.name] = node + for dif in self.experiment.dif_ordering: + if isinstance(dif, model.ShimEthDIF): + self.shims[dif.name] = dif def _validate_and_add_server(self, s, n=None): if len(s.clients) == 0: @@ -373,6 +383,11 @@ class StoryBoard(SBEntity): self._build_nodes_lists() def set_experiment(self, experiment): + """ + Set the storyboard's underlying experiment instance + + @param experiment: the experiment instance + """ if not isinstance(experiment, model.Experiment): raise TypeError('Experiment instance required.') self.experiment = experiment @@ -391,6 +406,9 @@ class StoryBoard(SBEntity): """ Utility method to simultaneously add a server to a sb and a node to the server. + + @param server: the server to be added to the storyboard + @param node: the node upon which the server should run """ if self.experiment is None: raise ValueError("Cannot add a server before " @@ -416,8 +434,8 @@ class StoryBoard(SBEntity): """ if self.experiment is None: raise ValueError("An experiment is needed to schedule commands.") - if self.script is None: - self.script = Script(self) + if self._script is None: + self._script = _Script(self) if isinstance(node, str): node = self.node_map[node] if node not in self.experiment.nodes: @@ -426,7 +444,7 @@ class StoryBoard(SBEntity): if isinstance(command, str): command = [command] action = functools.partial(self.run_command, node, command) - self.script.add_event(Event(action, ev_time=t)) + self._script.add_event(Event(action, ev_time=t)) def run_command(self, node, command): """ @@ -446,7 +464,31 @@ class StoryBoard(SBEntity): command = [command] node.execute_commands(command) - def run_client_of(self, server, duration=None, node=None, proc_id=None): + def add_event(self, event): + """ + Add an event to this script, provided either as an Event + instance or as a string as read from a .rsb script. + + :param event: the event to add + :type event: (Event or str) + """ + self._script.add_event(event) + + def del_event(self, event): + """ + Remove an event from this storyboard + + :param event: the event (or id thereof) to remove + :type event: (Event or str) + """ + self._script.del_event(event) + + def run_client_of(self, + server, + duration=None, + node=None, + proc_id=None, + callback=None): """ Runs a random client of the specified server with the specified parameters. @@ -459,15 +501,22 @@ class StoryBoard(SBEntity): @param duration: the duration of the client process @param node: (Node or str) the node on which the client should be run @param proc_id: the entity ID to use for the process + @param callback: callable or list thereof to be run + after client termination """ if isinstance(server, str): server = self.server_apps[server] if duration is None: duration = server.get_duration() client = random.choice(server.clients) - self.run_client(client, duration, node, proc_id) - - def run_client(self, client, duration, node=None, proc_id=None): + self.run_client(client, duration, node, proc_id, callback) + + def run_client(self, + client, + duration, + node=None, + proc_id=None, + callback=None): """ Runs the specified client app with the specified parameters. @@ -478,6 +527,8 @@ class StoryBoard(SBEntity): @param duration: the duration of the client process @param node: (Node or str) the node on which the client should be run @param proc_id: the entity ID to use for the process + @param callback: callable or list thereof to be run + after client termination """ if isinstance(client, str): client = self.client_apps[client] @@ -488,11 +539,19 @@ class StoryBoard(SBEntity): node = random.choice(client.nodes) elif isinstance(node, str): node = self.node_map[node] + if callback is None: + callback = [] + elif not hasattr(callback, '__len__'): + callback = [callback] process = client.process(duration, node, proc_id) self.process_dict[process.id] = process process.run() action = functools.partial(self.kill_process, process.id) - self.script.add_event(Event(action, ev_time=(self.cur_time + duration))) + term_ev = Event(action, ev_time=(self.cur_time + duration)) + self.add_event(term_ev) + for cb in callback: + cb_event = Event(action=cb, trigger=term_ev) + self.add_event(cb_event) def start_client_of(self, server, duration=None, node=None, proc_id=None): """ @@ -503,10 +562,10 @@ class StoryBoard(SBEntity): is not specified, it will be randomly generated according to the server parameters (client apps and their nodes). - If the client app must be shutdown manually or the duration - parameter is None, the client process will _not_ be stopped - automatically, and will continue running unless otherwise - killed. + Note that this method, as opposed to + :meth:`rumba.storyboard.run_client_of`, will not generate an event + to stop the client after the duration is expired. In most cases, + :meth:`rumba.storyboard.run_client_of` is the way to go. @param server: the server of which one client should be run @param duration: the duration of the client process @@ -525,10 +584,10 @@ class StoryBoard(SBEntity): If the node parameter is not given, it will be chosen at random among the client default nodes. - If the app must be shutdown manually or the duration - parameter is None, it will _not_ be stopped - automatically, and will continue running unless otherwise - killed. + Note that this method, as opposed to + :meth:`rumba.storyboard.run_client`, will not generate an event + to stop the client after the duration is expired. In most cases, + :meth:`rumba.storyboard.run_client` is the way to go. @param client: the client which should be run @param duration: the duration of the client process @@ -551,21 +610,21 @@ class StoryBoard(SBEntity): del self.process_dict[proc_id] def periodic_check(self, t): - self.script.check_for_ready_ev(t) - self.script.run_ready() - - def parse_script(self, buffer): - self.script = Script(self) - self.script.parse(buffer) + self._script.check_for_ready_ev(t) + self._script.run_ready() - def parse_script_file(self, filename): - self.script = Script(self) - self.script.parse_file(filename) + def generate_script(self, clean=True): + """ + Randomly generate a script for this experiment based on the + parameters provided to the instances of servers, nodes and clients. - def generate_script(self): + @param clean: if True, discard the current script before + generating a new one. + """ if self.experiment is None: raise ValueError('Cannot generate script without an experiment') - self.script = Script(self) + if clean: + self._script = _Script(self) t = self.SCRIPT_RESOLUTION marker = 5 last_marker = 0 @@ -585,7 +644,7 @@ class StoryBoard(SBEntity): p, c ) - self.script.add_event(start) + self._script.add_event(start) t += self.SCRIPT_RESOLUTION def _make_process_events(self, t, d, n, p, c): @@ -602,14 +661,12 @@ class StoryBoard(SBEntity): def start(self): if self.experiment is None: raise ValueError("Cannot run sb with no experiment.") - if self.script is None: + if self._script is None: self.generate_script() logger.info('Starting storyboard execution') self._build_nodes_lists() logger.debug('Server nodes are: %s.', [x.name for x in self.server_nodes]) - logger.debug('Client nodes are: %s.', - [x.name for x in self.client_nodes]) logger.debug('Command list is: %s.', {x: [(y.name, z) for y, z in t] for (x, t) @@ -617,8 +674,8 @@ class StoryBoard(SBEntity): self.start_time = time.time() script = r'logname="$1"; shift; nohup "${@}" ' \ r'> /tmp/${logname}.rumba.log 2>&1 & echo "$!"' - logger.debug("Writing utility startup script on client nodes.") - for node in self.client_nodes: + logger.debug("Writing utility startup script on nodes.") + for node in self.node_map.values(): node.execute_command( "echo '%s' > startup.sh && chmod a+x startup.sh" % (script,) ) @@ -674,12 +731,12 @@ class StoryBoard(SBEntity): if not isinstance(dif, model.ShimEthDIF): raise Exception("Not a Shim Ethernet DIF.") - if self.script is None: - self.script = Script(self) + if self._script is None: + self._script = _Script(self) - for ipcp in dif.ipcps: - action = functools.partial(ipcp.node.set_link_state, ipcp, state) - self.script.add_event(Event(action, ev_time=t)) + for node in dif.members: + action = functools.partial(node.set_link_state, dif, state) + self._script.add_event(Event(action, ev_time=t)) def set_link_up(self, t, dif): self.set_link_state(t, dif, 'up') @@ -691,14 +748,14 @@ class StoryBoard(SBEntity): if self.experiment is None: raise ValueError("An experiment is needed to schedule commands.") - if self.script is None: - self.script = Script(self) + if self._script is None: + self._script = _Script(self) - for ipcp in node.ipcps: - if not isinstance(ipcp, model.ShimEthIPCP): + for dif in node.difs: + if not isinstance(dif, model.ShimEthDIF): continue - action = functools.partial(ipcp.node.set_link_state, ipcp, state) - self.script.add_event(Event(action, ev_time=t)) + action = functools.partial(node.set_link_state, dif, state) + self._script.add_event(Event(action, ev_time=t)) def set_node_up(self, t, node): self.set_node_state(t, node, 'up') @@ -706,6 +763,105 @@ class StoryBoard(SBEntity): def set_node_down(self, t, node): self.set_node_state(t, node, 'down') + def write_script(self, buffer): + """ + Writes the script on a string buffer, at the current position + + @param buffer: a string buffer. + """ + self._script.write(buffer) + + def write_script_to_file(self, filename, clean=True): + """ + Writes the script to a file, overwriting content. + + @param filename: the name of the destination file + @param clean: if True, current file contents will be overwritten. + """ + mode = 'w' + if not clean: + mode += '+' + with open(filename, mode) as f: + self.write_script(f) + + def write_script_string(self): + """ + Writes the script into a string and returns it. + + @return: the script as a string. + """ + s = StringIO() + self.write_script(s) + return s + + def parse_script(self, buffer, clean=True): + """ + Reads a script from a buffer, at the current position. + + @param buffer: the buffer to read from. + @param clean: if True, discard the current script before reading. + """ + if clean: + self._script = _Script(self) + self._script.parse(buffer) + + def parse_script_file(self, filename, clean=True): + """ + Reads a script from a file. + + @param filename: the file to read from. + @param clean: if True, discard the current script before reading. + """ + if clean: + self._script = _Script(self) + with open(filename, 'r') as f: + self.parse_script(f, clean) + + def parse_script_string(self, string, clean=True): + """ + Reads a script from a string. + + @param string: the string to read from. + @param clean: if True, discard the current script before reading. + """ + if clean: + self._script = _Script(self) + buffer = StringIO(string) + self.parse_script(buffer, clean) + + def capture_traffic(self, start, end, node, dif): + """ + Captures the traffic of an interface on a certain node. + + :param start: The time to start capturing. + :param end: The time to stop capturing. + :param node: The node to capture on. + :param dif: The Shim Ethernet DIF of the node, Rumba + automatically resolves the correct interface. + """ + for ipcp in dif.ipcps: + if ipcp.node is not node: + continue + # In case tcpdump is not present, this assumes a testbed + # with Ubuntu/Debian just like the rest of installation + if not node.has_tcpdump: + ssh_support.aptitude_install(self.experiment.testbed, + node, ["tcpdump"]) + node.has_tcpdump = True + + # Create random string + pcap_file = node.name + '_' + dif.name + '_' + \ + str(uuid.uuid4())[0:4] + ".pcap" + + tcpd_client = Client(ap="tcpdump", options="-i %s -w %s" \ + % (ipcp.ifname, pcap_file)) + duration = end - start + cb = functools.partial(node.fetch_file, pcap_file, + self.experiment.log_dir, sudo=True) + action = functools.partial(self.run_client, tcpd_client, + duration=duration, node=node, + callback = cb) + self._script.add_event(Event(action, ev_time=start)) class Event(object): @@ -724,10 +880,10 @@ class Event(object): @param ev_time: (float) seconds to wait before running the event @param trigger: (Event) Event which must complete before this runs """ + self.id = ev_id if ev_id is not None else self.generate_id() if ev_time is None and trigger is None: raise ValueError('No condition specified for event %s.' % - ev_id) - self.id = ev_id if ev_id is not None else self.generate_id() + self.id) self.action = action self.time = ev_time self._trigger = trigger @@ -821,12 +977,11 @@ class Event(object): return self._repr -class Script(object): +class _Script(object): def __init__(self, storyboard): - # Brute force 'dump all in memory' approach to avoid - # iterating through the events a lot of times - # at runtime + if storyboard is None: + raise ValueError("storyboard must not be None") self.events_by_id = {} self._waiting_events = {} self._events_ready = [] @@ -834,20 +989,28 @@ class Script(object): self._nodes = {} self._servers = {} self._clients = {} - self._testbed = None - self._experiment = None - self._storyboard = None + self._storyboard = storyboard self._entities = {} - self._parse_entities(storyboard) + self._parse_entities() - def _parse_entities(self, storyboard): - self._storyboard = storyboard - self._experiment = self._storyboard.experiment + @property + def _experiment(self): + return self._storyboard.experiment + + @property + def _testbed(self): + exp = self._experiment + if exp is None: + return None + else: + return exp.testbed + + def _parse_entities(self): self._nodes = self._storyboard.node_map - self._testbed = self._experiment.testbed self._servers = self._storyboard.server_apps self._clients = self._storyboard.client_apps self._processes = {} + self._shims = self._storyboard.shims self._entities = { 'sb': self._storyboard, 'storyboard': self._storyboard, @@ -856,7 +1019,8 @@ class Script(object): 'Node': self._nodes, 'Server': self._servers, 'Client': self._clients, - 'ClientProcess': self._processes + 'ClientProcess': self._processes, + 'ShimEthDIF': self._shims } def add_process(self, process): @@ -1015,29 +1179,12 @@ class Script(object): except ValueError as e: raise ValueError(str(e) + ' -> @ line %s' % (index,)) - def parse_file(self, filename): - with open(filename, 'r') as f: - self.parse(f) - - def parse_string(self, s): - buffer = StringIO(s) - self.parse(buffer) - def write(self, buffer): ev_list = list(self.events_by_id.values()) ev_list.sort(key=lambda x: x.time if x.time is not None else float('+inf')) for event in ev_list: buffer.write(repr(event) + '\n') - def write_to_file(self, filename): - with open(filename, 'w') as f: - self.write(f) - - def write_string(self): - s = StringIO() - self.write(s) - return s - def _parse_conditions(self, *conditions): """ Parses condition strings and returns the conditions diff --git a/rumba/testbeds/qemu.py b/rumba/testbeds/qemu.py index f3daefd..df80b7e 100644 --- a/rumba/testbeds/qemu.py +++ b/rumba/testbeds/qemu.py @@ -230,6 +230,8 @@ class Testbed(mod.Testbed): # End of shim/node parsing block ## # + for node in experiment.nodes: + node.has_tcpdump = True def executor(list_of_commands): for cmd in list_of_commands: diff --git a/tools/scriptgenerator.py b/tools/scriptgenerator.py index 059163e..cc3e1ea 100644 --- a/tools/scriptgenerator.py +++ b/tools/scriptgenerator.py @@ -45,7 +45,7 @@ def main(duration, exp, run=False, script='generated_script.txt'): f.write('################################################\n') f.write('# SCRIPT GENERATED WITH RUMBA SCRIPT GENERATOR #\n') f.write('################################################\n') - story.script.write(f) + story.write_script(f) if run: with ExperimentManager(exp, swap_out_strategy=PAUSE_SWAPOUT): |