aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--examples/example-script.rsb4
-rwxr-xr-xexamples/script-example.py17
-rw-r--r--rumba/model.py12
-rw-r--r--rumba/ssh_support.py8
-rw-r--r--rumba/storyboard.py325
-rw-r--r--rumba/testbeds/qemu.py2
-rw-r--r--tools/scriptgenerator.py2
7 files changed, 272 insertions, 98 deletions
diff --git a/examples/example-script.rsb b/examples/example-script.rsb
index 8b5c714..f00ff42 100644
--- a/examples/example-script.rsb
+++ b/examples/example-script.rsb
@@ -33,7 +33,9 @@ echo2, 18 &ev4| $sb run_client_of $Server.server_b
1.2 &ev5| run_client_of $Server.server_c
# Events need _not_ be in temporal order
# if no object ($ handle) is provided, the storyboard
-# is assumed as the object/
+# is assumed as the object
+
+14 | $Node.node_a set_link_state $ShimEthDIF.e1 'up'
diff --git a/examples/script-example.py b/examples/script-example.py
index 316a1d1..6cfaf42 100755
--- a/examples/script-example.py
+++ b/examples/script-example.py
@@ -1,6 +1,7 @@
#!/usr/bin/env python
# An example script leveraging the storyboard scripting functionality
+from functools import partial
from rumba.storyboard import *
from rumba.model import *
@@ -83,6 +84,11 @@ server_c = Server(
s_id='server_c'
)
+typewriter = Client(
+ "touch test_file",
+ shutdown=""
+)
+
if __name__ == '__main__':
story = StoryBoard(30)
@@ -93,7 +99,16 @@ if __name__ == '__main__':
client_a.add_node(node_b)
client_b.add_node(node_b)
client_c.add_node(node_b)
- story.parse_script_file('example-script.rsb')
+ script_file = os.path.join(
+ os.path.dirname(__file__),
+ 'example-script.rsb'
+ )
+ story.parse_script_file(script_file)
+ cb = partial(story.run_command, node_a, "ls -l test_file")
+ # This will return a file, because it will run after
+ # Triggering_event
+ action = partial(story.run_client, typewriter, 0.2, node=node_a, callback=cb)
+ story.add_event(Event(action, ev_time=12, ev_id='Triggering_event'))
log.flush_log()
with ExperimentManager(exp):
exp.swap_in()
diff --git a/rumba/model.py b/rumba/model.py
index 20f8215..d46cff9 100644
--- a/rumba/model.py
+++ b/rumba/model.py
@@ -145,6 +145,10 @@ class ShimUDPDIF(DIF):
# @link_speed [int] Speed of the Ethernet network, in Mbps
#
class ShimEthDIF(DIF):
+
+ def get_e_id(self):
+ return "ShimEthDIF." + self.name
+
def __init__(self, name, members=None, link_speed=0):
DIF.__init__(self, name, members)
self.link_speed = int(link_speed)
@@ -239,6 +243,7 @@ class Node(object):
self.ssh_config = SSHConfig(name)
self.ipcps = []
self.policies = dict()
+ self.has_tcpdump = False
if policies is None:
policies = dict()
for dif in self.difs:
@@ -358,9 +363,10 @@ class Node(object):
self.executor.fetch_files(self, paths, destination, sudo)
def fetch_file(self, path, destination, sudo=False):
- self.executor.fetch_files(self, path, destination, sudo)
+ self.executor.fetch_file(self, path, destination, sudo)
- def set_link_state(self, ipcp, state):
+ def set_link_state(self, dif, state):
+ ipcp = self.get_ipcp_by_dif(dif)
self.execute_command('ip link set dev ' + ipcp.ifname + ' ' + state,
as_root=True)
@@ -893,4 +899,4 @@ class Executor:
def fetch_files(self, node, paths, destination, sudo=False):
for path in paths:
- self.fetch_file(node, path, destination, sudo) \ No newline at end of file
+ self.fetch_file(node, path, destination, sudo)
diff --git a/rumba/ssh_support.py b/rumba/ssh_support.py
index b1492e7..a9dff28 100644
--- a/rumba/ssh_support.py
+++ b/rumba/ssh_support.py
@@ -336,8 +336,10 @@ def copy_files_from_testbed(testbed, ssh_config, paths,
destination = destination + '/'
if sudo:
- execute_command(testbed, ssh_config,
- 'sudo chmod a+rw %s' % (" ".join(paths)))
+ cmd = 'chmod a+rw %s' % (" ".join(paths))
+ if ssh_config.username != 'root':
+ cmd = "sudo %s" % command
+ execute_command(testbed, ssh_config, cmd)
if ssh_config.client is None:
client, proxy_client = ssh_connect(ssh_config.hostname, ssh_config.port,
@@ -436,4 +438,4 @@ def aptitude_install(testbed, node, packages):
"while ! " + sudo("apt-get update") + "; do sleep 1; done",
"while ! " + sudo(package_install) + "; do sleep 1; done"]
- execute_proxy_commands(testbed, node.ssh_config, cmds, time_out=None) \ No newline at end of file
+ execute_proxy_commands(testbed, node.ssh_config, cmds, time_out=None)
diff --git a/rumba/storyboard.py b/rumba/storyboard.py
index edf4f0c..0dc4fba 100644
--- a/rumba/storyboard.py
+++ b/rumba/storyboard.py
@@ -34,6 +34,7 @@ import math
import os
import random
import time
+import uuid
import rumba.model as model
import rumba.ssh_support as ssh_support
@@ -82,7 +83,7 @@ class Client(SBEntity):
def __init__(self, ap, nodes=None, options=None,
shutdown="kill <pid>", c_id=None):
self.ap = ap
- e_id = c_id if c_id is not None else self.ap
+ e_id = c_id if c_id is not None else self.ap.replace(' ', '_')
super(Client, self).__init__(e_id)
self.startup = (ap + ((" " + options) if options is not None else ""))
if isinstance(nodes, model.Node):
@@ -99,7 +100,7 @@ class Client(SBEntity):
def process(self, duration, node=None, proc_id=None):
if proc_id is None:
- proc_id = "%s_%s" % (self.ap, self.get_id())
+ proc_id = "%s_%s" % (self.id, self.get_id())
if node is None:
if len(self.nodes) == 0:
raise ValueError('No nodes for client %s'
@@ -200,11 +201,19 @@ class ClientProcess(SBEntity):
# @nodes: Specific nodes to start this server on
#
class Server(SBEntity):
+
+ current_id = -1
+
+ @classmethod
+ def get_id(cls):
+ cls.current_id += 1
+ return cls.current_id
+
def __init__(self, ap, arrival_rate, mean_duration,
options=None, max_clients=float('inf'),
clients=None, nodes=None, min_duration=2, s_id=None):
self.ap = ap
- e_id = s_id if s_id is not None else self.ap
+ e_id = s_id if s_id is not None else self.ap.replace(' ', '_')
super(Server, self).__init__(e_id)
self.options = options if options is not None else ""
self.max_clients = max_clients
@@ -274,21 +283,16 @@ class Server(SBEntity):
def run(self):
for node in self.nodes:
- logfile = "/tmp/%s_server.log" % self.ap
- script = r'nohup "$@" > %s 2>&1 & echo "$!"' % (logfile,)
run_cmd = self.ap + (
(" " + self.options) if self.options is not None else ""
)
- cmd_1 = "echo '%s' > startup.sh && chmod a+x startup.sh" \
- % (script,)
- cmd_2 = "./startup.sh %s" % (run_cmd,)
+ cmd_2 = "./startup.sh %s %s" % ('server_' + self.id, run_cmd)
logger.debug(
- 'Starting server %s on node %s with logfile %s.',
- self.id, node.name, logfile
+ 'Starting server %s on node %s.',
+ self.id, node.name
)
try:
- node.execute_command(cmd_1)
- self.pids[node] = (node.execute_command(cmd_2))
+ self.pids[node] = node.execute_command(cmd_2)
except ssh_support.SSHException:
logger.warn('Could not start server %s on node %s.',
self.id, node.name)
@@ -338,9 +342,12 @@ class StoryBoard(SBEntity):
self.active_clients = self.process_dict.values()
self.start_time = None
self.commands_list = {}
- self.script = script
self.node_map = {}
+ self.shims = {}
self._build_nodes_lists()
+ # The following must be last, because it needs the info from
+ # _build_nodes_list
+ self._script = script if script is not None else _Script(self)
def _build_nodes_lists(self):
"""Populates server_nodes and client_nodes lists"""
@@ -353,6 +360,9 @@ class StoryBoard(SBEntity):
if self.experiment is not None:
for node in self.experiment.nodes:
self.node_map[node.name] = node
+ for dif in self.experiment.dif_ordering:
+ if isinstance(dif, model.ShimEthDIF):
+ self.shims[dif.name] = dif
def _validate_and_add_server(self, s, n=None):
if len(s.clients) == 0:
@@ -373,6 +383,11 @@ class StoryBoard(SBEntity):
self._build_nodes_lists()
def set_experiment(self, experiment):
+ """
+ Set the storyboard's underlying experiment instance
+
+ @param experiment: the experiment instance
+ """
if not isinstance(experiment, model.Experiment):
raise TypeError('Experiment instance required.')
self.experiment = experiment
@@ -391,6 +406,9 @@ class StoryBoard(SBEntity):
"""
Utility method to simultaneously add a server to a sb
and a node to the server.
+
+ @param server: the server to be added to the storyboard
+ @param node: the node upon which the server should run
"""
if self.experiment is None:
raise ValueError("Cannot add a server before "
@@ -416,8 +434,8 @@ class StoryBoard(SBEntity):
"""
if self.experiment is None:
raise ValueError("An experiment is needed to schedule commands.")
- if self.script is None:
- self.script = Script(self)
+ if self._script is None:
+ self._script = _Script(self)
if isinstance(node, str):
node = self.node_map[node]
if node not in self.experiment.nodes:
@@ -426,7 +444,7 @@ class StoryBoard(SBEntity):
if isinstance(command, str):
command = [command]
action = functools.partial(self.run_command, node, command)
- self.script.add_event(Event(action, ev_time=t))
+ self._script.add_event(Event(action, ev_time=t))
def run_command(self, node, command):
"""
@@ -446,7 +464,31 @@ class StoryBoard(SBEntity):
command = [command]
node.execute_commands(command)
- def run_client_of(self, server, duration=None, node=None, proc_id=None):
+ def add_event(self, event):
+ """
+ Add an event to this script, provided either as an Event
+ instance or as a string as read from a .rsb script.
+
+ :param event: the event to add
+ :type event: (Event or str)
+ """
+ self._script.add_event(event)
+
+ def del_event(self, event):
+ """
+ Remove an event from this storyboard
+
+ :param event: the event (or id thereof) to remove
+ :type event: (Event or str)
+ """
+ self._script.del_event(event)
+
+ def run_client_of(self,
+ server,
+ duration=None,
+ node=None,
+ proc_id=None,
+ callback=None):
"""
Runs a random client of the specified server
with the specified parameters.
@@ -459,15 +501,22 @@ class StoryBoard(SBEntity):
@param duration: the duration of the client process
@param node: (Node or str) the node on which the client should be run
@param proc_id: the entity ID to use for the process
+ @param callback: callable or list thereof to be run
+ after client termination
"""
if isinstance(server, str):
server = self.server_apps[server]
if duration is None:
duration = server.get_duration()
client = random.choice(server.clients)
- self.run_client(client, duration, node, proc_id)
-
- def run_client(self, client, duration, node=None, proc_id=None):
+ self.run_client(client, duration, node, proc_id, callback)
+
+ def run_client(self,
+ client,
+ duration,
+ node=None,
+ proc_id=None,
+ callback=None):
"""
Runs the specified client app with the specified parameters.
@@ -478,6 +527,8 @@ class StoryBoard(SBEntity):
@param duration: the duration of the client process
@param node: (Node or str) the node on which the client should be run
@param proc_id: the entity ID to use for the process
+ @param callback: callable or list thereof to be run
+ after client termination
"""
if isinstance(client, str):
client = self.client_apps[client]
@@ -488,11 +539,19 @@ class StoryBoard(SBEntity):
node = random.choice(client.nodes)
elif isinstance(node, str):
node = self.node_map[node]
+ if callback is None:
+ callback = []
+ elif not hasattr(callback, '__len__'):
+ callback = [callback]
process = client.process(duration, node, proc_id)
self.process_dict[process.id] = process
process.run()
action = functools.partial(self.kill_process, process.id)
- self.script.add_event(Event(action, ev_time=(self.cur_time + duration)))
+ term_ev = Event(action, ev_time=(self.cur_time + duration))
+ self.add_event(term_ev)
+ for cb in callback:
+ cb_event = Event(action=cb, trigger=term_ev)
+ self.add_event(cb_event)
def start_client_of(self, server, duration=None, node=None, proc_id=None):
"""
@@ -503,10 +562,10 @@ class StoryBoard(SBEntity):
is not specified, it will be randomly generated according
to the server parameters (client apps and their nodes).
- If the client app must be shutdown manually or the duration
- parameter is None, the client process will _not_ be stopped
- automatically, and will continue running unless otherwise
- killed.
+ Note that this method, as opposed to
+ :meth:`rumba.storyboard.run_client_of`, will not generate an event
+ to stop the client after the duration is expired. In most cases,
+ :meth:`rumba.storyboard.run_client_of` is the way to go.
@param server: the server of which one client should be run
@param duration: the duration of the client process
@@ -525,10 +584,10 @@ class StoryBoard(SBEntity):
If the node parameter is not given, it will be chosen at random
among the client default nodes.
- If the app must be shutdown manually or the duration
- parameter is None, it will _not_ be stopped
- automatically, and will continue running unless otherwise
- killed.
+ Note that this method, as opposed to
+ :meth:`rumba.storyboard.run_client`, will not generate an event
+ to stop the client after the duration is expired. In most cases,
+ :meth:`rumba.storyboard.run_client` is the way to go.
@param client: the client which should be run
@param duration: the duration of the client process
@@ -551,21 +610,21 @@ class StoryBoard(SBEntity):
del self.process_dict[proc_id]
def periodic_check(self, t):
- self.script.check_for_ready_ev(t)
- self.script.run_ready()
-
- def parse_script(self, buffer):
- self.script = Script(self)
- self.script.parse(buffer)
+ self._script.check_for_ready_ev(t)
+ self._script.run_ready()
- def parse_script_file(self, filename):
- self.script = Script(self)
- self.script.parse_file(filename)
+ def generate_script(self, clean=True):
+ """
+ Randomly generate a script for this experiment based on the
+ parameters provided to the instances of servers, nodes and clients.
- def generate_script(self):
+ @param clean: if True, discard the current script before
+ generating a new one.
+ """
if self.experiment is None:
raise ValueError('Cannot generate script without an experiment')
- self.script = Script(self)
+ if clean:
+ self._script = _Script(self)
t = self.SCRIPT_RESOLUTION
marker = 5
last_marker = 0
@@ -585,7 +644,7 @@ class StoryBoard(SBEntity):
p,
c
)
- self.script.add_event(start)
+ self._script.add_event(start)
t += self.SCRIPT_RESOLUTION
def _make_process_events(self, t, d, n, p, c):
@@ -602,14 +661,12 @@ class StoryBoard(SBEntity):
def start(self):
if self.experiment is None:
raise ValueError("Cannot run sb with no experiment.")
- if self.script is None:
+ if self._script is None:
self.generate_script()
logger.info('Starting storyboard execution')
self._build_nodes_lists()
logger.debug('Server nodes are: %s.',
[x.name for x in self.server_nodes])
- logger.debug('Client nodes are: %s.',
- [x.name for x in self.client_nodes])
logger.debug('Command list is: %s.', {x: [(y.name, z)
for y, z in t]
for (x, t)
@@ -617,8 +674,8 @@ class StoryBoard(SBEntity):
self.start_time = time.time()
script = r'logname="$1"; shift; nohup "${@}" ' \
r'> /tmp/${logname}.rumba.log 2>&1 & echo "$!"'
- logger.debug("Writing utility startup script on client nodes.")
- for node in self.client_nodes:
+ logger.debug("Writing utility startup script on nodes.")
+ for node in self.node_map.values():
node.execute_command(
"echo '%s' > startup.sh && chmod a+x startup.sh" % (script,)
)
@@ -674,12 +731,12 @@ class StoryBoard(SBEntity):
if not isinstance(dif, model.ShimEthDIF):
raise Exception("Not a Shim Ethernet DIF.")
- if self.script is None:
- self.script = Script(self)
+ if self._script is None:
+ self._script = _Script(self)
- for ipcp in dif.ipcps:
- action = functools.partial(ipcp.node.set_link_state, ipcp, state)
- self.script.add_event(Event(action, ev_time=t))
+ for node in dif.members:
+ action = functools.partial(node.set_link_state, dif, state)
+ self._script.add_event(Event(action, ev_time=t))
def set_link_up(self, t, dif):
self.set_link_state(t, dif, 'up')
@@ -691,14 +748,14 @@ class StoryBoard(SBEntity):
if self.experiment is None:
raise ValueError("An experiment is needed to schedule commands.")
- if self.script is None:
- self.script = Script(self)
+ if self._script is None:
+ self._script = _Script(self)
- for ipcp in node.ipcps:
- if not isinstance(ipcp, model.ShimEthIPCP):
+ for dif in node.difs:
+ if not isinstance(dif, model.ShimEthDIF):
continue
- action = functools.partial(ipcp.node.set_link_state, ipcp, state)
- self.script.add_event(Event(action, ev_time=t))
+ action = functools.partial(node.set_link_state, dif, state)
+ self._script.add_event(Event(action, ev_time=t))
def set_node_up(self, t, node):
self.set_node_state(t, node, 'up')
@@ -706,6 +763,105 @@ class StoryBoard(SBEntity):
def set_node_down(self, t, node):
self.set_node_state(t, node, 'down')
+ def write_script(self, buffer):
+ """
+ Writes the script on a string buffer, at the current position
+
+ @param buffer: a string buffer.
+ """
+ self._script.write(buffer)
+
+ def write_script_to_file(self, filename, clean=True):
+ """
+ Writes the script to a file, overwriting content.
+
+ @param filename: the name of the destination file
+ @param clean: if True, current file contents will be overwritten.
+ """
+ mode = 'w'
+ if not clean:
+ mode += '+'
+ with open(filename, mode) as f:
+ self.write_script(f)
+
+ def write_script_string(self):
+ """
+ Writes the script into a string and returns it.
+
+ @return: the script as a string.
+ """
+ s = StringIO()
+ self.write_script(s)
+ return s
+
+ def parse_script(self, buffer, clean=True):
+ """
+ Reads a script from a buffer, at the current position.
+
+ @param buffer: the buffer to read from.
+ @param clean: if True, discard the current script before reading.
+ """
+ if clean:
+ self._script = _Script(self)
+ self._script.parse(buffer)
+
+ def parse_script_file(self, filename, clean=True):
+ """
+ Reads a script from a file.
+
+ @param filename: the file to read from.
+ @param clean: if True, discard the current script before reading.
+ """
+ if clean:
+ self._script = _Script(self)
+ with open(filename, 'r') as f:
+ self.parse_script(f, clean)
+
+ def parse_script_string(self, string, clean=True):
+ """
+ Reads a script from a string.
+
+ @param string: the string to read from.
+ @param clean: if True, discard the current script before reading.
+ """
+ if clean:
+ self._script = _Script(self)
+ buffer = StringIO(string)
+ self.parse_script(buffer, clean)
+
+ def capture_traffic(self, start, end, node, dif):
+ """
+ Captures the traffic of an interface on a certain node.
+
+ :param start: The time to start capturing.
+ :param end: The time to stop capturing.
+ :param node: The node to capture on.
+ :param dif: The Shim Ethernet DIF of the node, Rumba
+ automatically resolves the correct interface.
+ """
+ for ipcp in dif.ipcps:
+ if ipcp.node is not node:
+ continue
+ # In case tcpdump is not present, this assumes a testbed
+ # with Ubuntu/Debian just like the rest of installation
+ if not node.has_tcpdump:
+ ssh_support.aptitude_install(self.experiment.testbed,
+ node, ["tcpdump"])
+ node.has_tcpdump = True
+
+ # Create random string
+ pcap_file = node.name + '_' + dif.name + '_' + \
+ str(uuid.uuid4())[0:4] + ".pcap"
+
+ tcpd_client = Client(ap="tcpdump", options="-i %s -w %s" \
+ % (ipcp.ifname, pcap_file))
+ duration = end - start
+ cb = functools.partial(node.fetch_file, pcap_file,
+ self.experiment.log_dir, sudo=True)
+ action = functools.partial(self.run_client, tcpd_client,
+ duration=duration, node=node,
+ callback = cb)
+ self._script.add_event(Event(action, ev_time=start))
class Event(object):
@@ -724,10 +880,10 @@ class Event(object):
@param ev_time: (float) seconds to wait before running the event
@param trigger: (Event) Event which must complete before this runs
"""
+ self.id = ev_id if ev_id is not None else self.generate_id()
if ev_time is None and trigger is None:
raise ValueError('No condition specified for event %s.' %
- ev_id)
- self.id = ev_id if ev_id is not None else self.generate_id()
+ self.id)
self.action = action
self.time = ev_time
self._trigger = trigger
@@ -821,12 +977,11 @@ class Event(object):
return self._repr
-class Script(object):
+class _Script(object):
def __init__(self, storyboard):
- # Brute force 'dump all in memory' approach to avoid
- # iterating through the events a lot of times
- # at runtime
+ if storyboard is None:
+ raise ValueError("storyboard must not be None")
self.events_by_id = {}
self._waiting_events = {}
self._events_ready = []
@@ -834,20 +989,28 @@ class Script(object):
self._nodes = {}
self._servers = {}
self._clients = {}
- self._testbed = None
- self._experiment = None
- self._storyboard = None
+ self._storyboard = storyboard
self._entities = {}
- self._parse_entities(storyboard)
+ self._parse_entities()
- def _parse_entities(self, storyboard):
- self._storyboard = storyboard
- self._experiment = self._storyboard.experiment
+ @property
+ def _experiment(self):
+ return self._storyboard.experiment
+
+ @property
+ def _testbed(self):
+ exp = self._experiment
+ if exp is None:
+ return None
+ else:
+ return exp.testbed
+
+ def _parse_entities(self):
self._nodes = self._storyboard.node_map
- self._testbed = self._experiment.testbed
self._servers = self._storyboard.server_apps
self._clients = self._storyboard.client_apps
self._processes = {}
+ self._shims = self._storyboard.shims
self._entities = {
'sb': self._storyboard,
'storyboard': self._storyboard,
@@ -856,7 +1019,8 @@ class Script(object):
'Node': self._nodes,
'Server': self._servers,
'Client': self._clients,
- 'ClientProcess': self._processes
+ 'ClientProcess': self._processes,
+ 'ShimEthDIF': self._shims
}
def add_process(self, process):
@@ -1015,29 +1179,12 @@ class Script(object):
except ValueError as e:
raise ValueError(str(e) + ' -> @ line %s' % (index,))
- def parse_file(self, filename):
- with open(filename, 'r') as f:
- self.parse(f)
-
- def parse_string(self, s):
- buffer = StringIO(s)
- self.parse(buffer)
-
def write(self, buffer):
ev_list = list(self.events_by_id.values())
ev_list.sort(key=lambda x: x.time if x.time is not None else float('+inf'))
for event in ev_list:
buffer.write(repr(event) + '\n')
- def write_to_file(self, filename):
- with open(filename, 'w') as f:
- self.write(f)
-
- def write_string(self):
- s = StringIO()
- self.write(s)
- return s
-
def _parse_conditions(self, *conditions):
"""
Parses condition strings and returns the conditions
diff --git a/rumba/testbeds/qemu.py b/rumba/testbeds/qemu.py
index f3daefd..df80b7e 100644
--- a/rumba/testbeds/qemu.py
+++ b/rumba/testbeds/qemu.py
@@ -230,6 +230,8 @@ class Testbed(mod.Testbed):
# End of shim/node parsing block
##
#
+ for node in experiment.nodes:
+ node.has_tcpdump = True
def executor(list_of_commands):
for cmd in list_of_commands:
diff --git a/tools/scriptgenerator.py b/tools/scriptgenerator.py
index 059163e..cc3e1ea 100644
--- a/tools/scriptgenerator.py
+++ b/tools/scriptgenerator.py
@@ -45,7 +45,7 @@ def main(duration, exp, run=False, script='generated_script.txt'):
f.write('################################################\n')
f.write('# SCRIPT GENERATED WITH RUMBA SCRIPT GENERATOR #\n')
f.write('################################################\n')
- story.script.write(f)
+ story.write_script(f)
if run:
with ExperimentManager(exp, swap_out_strategy=PAUSE_SWAPOUT):