aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--doc/log.rst2
-rw-r--r--doc/utils.rst2
-rw-r--r--doc/workflow.rst6
-rw-r--r--rumba/log.py70
-rw-r--r--rumba/model.py48
-rw-r--r--rumba/storyboard.py602
-rw-r--r--rumba/utils.py22
7 files changed, 550 insertions, 202 deletions
diff --git a/doc/log.rst b/doc/log.rst
index 1fd3b6f..d19bc13 100644
--- a/doc/log.rst
+++ b/doc/log.rst
@@ -8,3 +8,5 @@ Logging
.. autofunction:: rumba.log.flush_and_kill_logging
.. autofunction:: rumba.log.flush_log
+
+.. autoclass:: rumba.log.LogOptions
diff --git a/doc/utils.rst b/doc/utils.rst
index 6df8a0c..cc3db41 100644
--- a/doc/utils.rst
+++ b/doc/utils.rst
@@ -8,6 +8,8 @@ Example usage of the class:
.. code-block:: python
+ from rumba.utils import ExperimentManager, PROMPT_SWAPOUT
+
with ExperimentManager(exp, swap_out_strategy=PROMPT_SWAPOUT):
exp.swap_in()
exp.bootstrap_prototype()
diff --git a/doc/workflow.rst b/doc/workflow.rst
index 04f23b1..672c33c 100644
--- a/doc/workflow.rst
+++ b/doc/workflow.rst
@@ -34,11 +34,9 @@ Workflow
Accessing nodes after swap-in
-----------------------------
-To access a node once the experiment swapped in, use the following
+To access a node once the experiment is swapped in, use the following
command (in the same terminal where ssh-agent was run in case of jFed): ::
$ rumba-access $NODE_NAME
-Where $NODE_NAME is the name of the node to access. In case of the
-QEMU testbed, the password of the downloaded buildroot images is
-'root'.
+Where $NODE_NAME is the name of the node to access. \ No newline at end of file
diff --git a/rumba/log.py b/rumba/log.py
index 67070a6..2ee871c 100644
--- a/rumba/log.py
+++ b/rumba/log.py
@@ -170,7 +170,7 @@ except ImportError:
class RumbaFormatter(logging.Formatter):
"""
- The logging.Formatter subclass used by Rumba
+ The `logging.Formatter` subclass used by Rumba
"""
level_name_table = {
@@ -178,7 +178,14 @@ class RumbaFormatter(logging.Formatter):
'ERROR': 'ERR',
'WARNING': 'WRN',
'INFO': 'INF',
- 'DEBUG': 'DBG'
+ 'DEBUG': 'DBG',
+ # handlers beyond the first will get the
+ # modified levelname
+ 'CRT': 'CRT',
+ 'ERR': 'ERR',
+ 'WRN': 'WRN',
+ 'INF': 'INF',
+ 'DBG': 'DBG'
}
def __init__(self):
@@ -202,15 +209,6 @@ def setup():
logging.getLogger('').setLevel(logging.ERROR)
logging.getLogger('rumba').setLevel(logging.INFO)
- handler = logging.StreamHandler(sys.stdout)
- handler.setLevel(logging.DEBUG)
- formatter = RumbaFormatter()
- handler.setFormatter(formatter)
- listener = QueueListener(mq, handler)
- global logging_listener
- logging_listener = listener
- listener.start()
-
# Used for the first call, in order to configure logging
def _get_logger_with_setup(name):
@@ -231,11 +229,14 @@ _get_logger = _get_logger_with_setup
def get_logger(name):
"""
- Returns the logger named <name>.
- <name> should be the module name, for consistency. If setup has not been
- called yet, it will call it first.
+ Returns the logger named `name`.
+
+ `name` should be the module name, for consistency.
+
+ If setup has not been called yet, it will call it first.
:param name: the name of the desired logger
+ :type name: `str`
:return: The logger
"""
return _get_logger(name)
@@ -243,11 +244,13 @@ def get_logger(name):
def set_logging_level(level, name=None):
"""
- Set the current logging level to <level> for logger named <name>.
- If name is not specified, sets the logging level for all rumba loggers.
+ Set the current logging level to `level` for the logger named `name`.
+ If `name` is not specified, sets the logging level for all rumba loggers.
- :param level: the desired logging level.
+ :param level: the desired logging level, in string or int form.
+ :type level: `str` or `int`
:param name: The name of the logger to configure
+ :type name: `str`
.. note:: Accepted levels are:
@@ -274,8 +277,8 @@ def set_logging_level(level, name=None):
def reset_logging_level():
"""
- Resets the current logging levels to the defaults. For Rumba the
- default is INFO.
+ Resets the current logging level of all loggers to the default.
+ For the Rumba library the default is INFO.
"""
# Un-sets every logger previously set
for logger in loggers_set:
@@ -303,6 +306,15 @@ def flush_and_kill_logging():
class LogOptions(object):
+ """Class holding the logging configuration"""
+
+ def __init__(self):
+ global logging_listener
+ global mq
+
+ logging_listener = QueueListener(mq)
+ self.log_to_console()
+ logging_listener.start()
@staticmethod
def _get_handlers():
@@ -316,19 +328,39 @@ class LogOptions(object):
def _add_handler(self, handler):
handler.setFormatter(RumbaFormatter())
+ handler.setLevel(DEBUG)
handlers = self._get_handlers() + (handler,)
self._set_handlers(*handlers)
return self
def log_to_file(self, path='rumba.log'):
+ """
+ Set the logging framework to log to file on top of the other
+ logging facilities.
+
+ :param path: logging file filename
+ :type path: `str`
+ :return: this `.LogOptions` instance
+ """
new_handler = logging.handlers.RotatingFileHandler(path)
return self._add_handler(new_handler)
def reset_logging(self):
+ """
+ Disable all logging facilities
+
+ :return: this `.LogOptions` instance
+ """
self._set_handlers(*tuple())
return self
def log_to_console(self):
+ """
+ Set the logging framework to log to stdout on top of the
+ other configured logging facilities
+
+ :return: this `.LogOptions` instance
+ """
new_handler = logging.StreamHandler(sys.stdout)
return self._add_handler(new_handler)
diff --git a/rumba/model.py b/rumba/model.py
index f9a0d3c..a6925b3 100644
--- a/rumba/model.py
+++ b/rumba/model.py
@@ -59,7 +59,6 @@ except OSError:
pass
-
class Testbed(object):
"""
Base class for every testbed plugin.
@@ -124,6 +123,7 @@ class Testbed(object):
def _swap_out(self, experiment):
logger.info("swap_out(): nothing to do")
+
class DIF(object):
"""
Base class for DIFs.
@@ -231,7 +231,7 @@ class ShimEthDIF(DIF):
distribution=None):
"""
Set the delay parameters of the underlying link.
- Parameters as in :py:meth:`.Delay.__init__`
+ Parameters as in :py:class:`.Delay`
:param delay: average delay in ms
:type delay: :py:class:`int`
@@ -252,7 +252,7 @@ class ShimEthDIF(DIF):
correlation=None):
"""
Set the loss parameter of the underlying link.
- Parameters as in :py:meth:`.Loss.__init__`
+ Parameters as in :py:class:`.Loss`
:param loss: loss in percentage
:type loss: :py:class:`int` or :py:class:`float`
@@ -321,11 +321,11 @@ class NormalDIF(DIF):
:param comp: Component name.
:param pol: Policy name
"""
- self.policy.del_policy(comp, policy_name)
+ self.policy.del_policy(comp, pol)
def show(self):
"""
- :return: A string of the policies in the DIF.
+ :return: A string representing the policies in the DIF.
"""
s = DIF.__repr__(self)
for comp, pol_dict in self.policy.get_policies().items():
@@ -338,6 +338,14 @@ class NormalDIF(DIF):
class Distribution(Enum):
"""
An enum holding different statistical distributions.
+
+ **Values:**
+
+ `NORMAL = 1`
+
+ `PARETO = 2`
+
+ `PARETONORMAL = 3`
"""
NORMAL = 1
PARETO = 2
@@ -463,9 +471,9 @@ class LinkQuality(object):
Clone old_quality, updating it with the provided parameters
if present.
- :param old_quality: A :py:class`.LinkQuality` instance to
+ :param old_quality: A :py:class:`.LinkQuality` instance to
use as a base
- :type old_quality: :py:class`.LinkQuality`
+ :type old_quality: :py:class:`.LinkQuality`
:param delay: Delay object holding delay configuration
or number corresponding to delay in ms
:type delay: :py:class:`.Delay` or :py:class:`int`
@@ -474,8 +482,8 @@ class LinkQuality(object):
:type loss: :py:class:`.Loss` or :py:class:`float`
:param rate: The rate of the link in mbit
:type rate: :py:class:`int`
- :return: a new :py:class`.LinkQuality` instance.
- :rtype: py:class`LinkQuality`
+ :return: a new :py:class:`.LinkQuality` instance.
+ :rtype: :py:class:`.LinkQuality`
"""
if delay is None:
delay = old_quality.delay
@@ -568,6 +576,7 @@ class LinkQuality(object):
"netem" % ipcp.ifname, as_root=True)
LinkQuality._active.remove(ipcp)
+
class SSHConfig(object):
def __init__(self, hostname, port=22, proxy_server=None):
self.username = None
@@ -588,6 +597,7 @@ class SSHConfig(object):
def set_http_proxy(self, proxy):
self.http_proxy = proxy
+
class Node(object):
"""
A node in the experiment.
@@ -624,8 +634,8 @@ class Node(object):
if hasattr(dif, 'policy'): # check if the dif supports policies
self.policies[dif] = policies.get(dif, Policy(dif, self))
- self.executor = None # will be set by testbed on swap_in
- self.startup_command = None # will be set by prototype
+ self.executor = None # will be set by testbed on swap_in
+ self.startup_command = None # will be set by prototype
self._validate()
@@ -742,6 +752,7 @@ class Node(object):
"""
Removes a policy.
+ :param dif: the dif to which the policy should be applied
:param component_name: Name of the component.
:param policy_name: Name of the policy.
"""
@@ -764,7 +775,10 @@ class Node(object):
:param time_out: Seconds before timing out.
:param use_proxy: Use a proxy to execute the commands?
"""
- return self.executor.execute_commands(self, commands, as_root, time_out)
+ return self.executor.execute_commands(self,
+ commands,
+ as_root,
+ time_out)
def execute_command(self, command, as_root=False, time_out=3,
use_proxy=False):
@@ -777,7 +791,10 @@ class Node(object):
:param use_proxy: Use a proxy to execute the commands?
:return: The stdout of the command.
"""
- return self.executor.execute_command(self, command, as_root, time_out)
+ return self.executor.execute_command(self,
+ command,
+ as_root,
+ time_out)
def copy_file(self, path, destination):
"""
@@ -801,7 +818,7 @@ class Node(object):
"""
Fetch file from the node.
- :param paths: Location of the files on the node.
+ :param path: Location of the files on the node.
:param destination: Destination location of the files.
:param sudo: The file is owned by root on the node?
"""
@@ -828,6 +845,7 @@ class Node(object):
self.execute_command('ip link set dev ' + ipcp.ifname + ' ' + state,
as_root=True)
+
class IPCP(object):
def __init__(self, name, node, dif):
self.name = name
@@ -868,6 +886,7 @@ class ShimUDPIPCP(IPCP):
IPCP.__init__(self, name, node, dif)
# TODO: add IP and port
+
class Policy(object):
def __init__(self, dif, node=None, policies=None):
self.dif = dif # type: NormalDIF
@@ -940,6 +959,7 @@ class Policy(object):
s += "\n]\n"
return s
+
class Experiment(object):
"""
Base class for experiments.
diff --git a/rumba/storyboard.py b/rumba/storyboard.py
index 2de21ff..de2bc62 100644
--- a/rumba/storyboard.py
+++ b/rumba/storyboard.py
@@ -62,7 +62,7 @@ except ImportError:
# But we might not care
-class SBEntity(object):
+class _SBEntity(object):
def __init__(self, e_id):
self.id = e_id
@@ -71,17 +71,37 @@ class SBEntity(object):
return type(self).__name__ + '.' + self.id
-class Client(SBEntity):
+class Client(_SBEntity):
+ """
+ Class representing a client application running in the experiment.
+
+ A list of "client nodes" can be specified: if so, when generating a
+ random script, this Client will be run only on those nodes.
+ """
current_id = -1
@classmethod
- def get_id(cls):
+ def _get_id(cls):
cls.current_id += 1
return cls.current_id
def __init__(self, ap, nodes=None, options=None,
shutdown="kill <pid>", c_id=None):
+ """
+ :param ap: the application binary/command to be run
+ :type ap: `str`
+ :param nodes: the list of nodes on which the client should be run
+ :type nodes: `:py:class:`rumba.model.Node` or `list` thereof
+ :param options: the options to be passed to the binary or command
+ :type options: `str`
+ :param shutdown: the command to be run in order to stop the client.
+ The token "<pid>" will be changed into the process'
+ pid.
+ :type shutdown: `str`
+ :param c_id: the ID used to reference to this instance
+ :type c_id: `str`
+ """
self.ap = ap
e_id = c_id if c_id is not None else self.ap.replace(' ', '_')
super(Client, self).__init__(e_id)
@@ -94,13 +114,35 @@ class Client(SBEntity):
self.shutdown = shutdown
def add_node(self, node):
+ """
+ Add a node to this instance's list.
+
+ :param node: the node to add
+ :type node: `rumba.model.Node`
+ """
if not isinstance(node, model.Node):
raise Exception("A Node is required.")
self.nodes.append(node)
def process(self, duration, node=None, proc_id=None):
+ """
+ Generates a `.ClientProcess` of this application.
+
+ :param duration: the duration of the process. It will be
+ passed to the process call in stead of the
+ <duration> token.
+ :type duration: `float`
+ :param node: the node on which the process should be run.
+ If `None`, a random node from this client's list
+ will be picked.
+ :type node: `rumba.model.Node`
+ :param proc_id: the ID used to reference to the generated process
+ :type proc_id: `str`
+ :return: the generated process
+ :rtype: `.ClientProcess`
+ """
if proc_id is None:
- proc_id = "%s_%s" % (self.id, self.get_id())
+ proc_id = "%s_%s" % (self.id, self._get_id())
if node is None:
if len(self.nodes) == 0:
raise ValueError('No nodes for client %s'
@@ -108,7 +150,7 @@ class Client(SBEntity):
node = random.choice(self.nodes)
return ClientProcess(
proc_id,
- self.ap,
+ self.id,
self.startup,
duration,
node,
@@ -116,18 +158,29 @@ class Client(SBEntity):
)
-# Base class for client processes
-#
-# @ap: Application Process binary
-# @duration: The time (in seconds) this process should run
-# @start_time: The time at which this process is started.
-# @options: Options to pass to the binary
-#
-class ClientProcess(SBEntity):
- def __init__(self, proc_id, ap, startup, duration,
+class ClientProcess(_SBEntity):
+ """Class representing a running client application process on a node"""
+ def __init__(self, proc_id, ap_id, startup, duration,
node, shutdown="kill <pid>"):
+ """
+
+ :param proc_id: the ID used to identify this instance
+ :type proc_id: `str`
+ :param ap_id: the ID of the client app that generated this process
+ :type ap_id: `str`
+ :param startup: the full command used to start this process
+ :type startup: `str`
+ :param duration: the intended duration of this process. It will also
+ replace the "<duration>" token in the
+ `startup` parameter
+ :type duration: `int` od `float`
+ :param node: the node on which this process runs
+ :type node: `rumba.model.Node`
+ :param shutdown: the command used to stop this process
+ :type shutdown: `str`
+ """
super(ClientProcess, self).__init__(proc_id)
- self.ap = ap
+ self.ap_id = ap_id
self.startup = startup
self.duration = duration if duration is not None else -1
self.start_time = None
@@ -137,13 +190,14 @@ class ClientProcess(SBEntity):
self.shutdown = shutdown
def run(self):
+ """Starts this process"""
if self.node is None:
- raise Exception('No node specified for client %s' % (self.ap,))
+ raise Exception('No node specified for client %s' % (self.ap_id,))
self.start_time = time.time()
logger.debug(
'Starting client app %s on node %s with duration %s.',
- self.ap, self.node.name, self.duration
+ self.ap_id, self.node.name, self.duration
)
start_cmd = "./startup.sh %s %s" % (
@@ -155,52 +209,32 @@ class ClientProcess(SBEntity):
self.pid = self.node.execute_command(start_cmd)
except ssh_support.SSHException:
logger.warning('Could not start client %s on node %s.',
- self.ap, self.node.name)
+ self.ap_id, self.node.name)
logger.debug('Client app %s on node %s got pid %s.',
- self.ap, self.node.name, self.pid)
+ self.ap_id, self.node.name, self.pid)
def stop(self):
+ """Stops this process"""
if self.shutdown != "":
logger.debug(
'Killing client %s on node %s.',
- self.ap, self.node.name
+ self.ap_id, self.node.name
)
try:
kill_cmd = self.shutdown.replace('<pid>', str(self.pid))
self.node.execute_command(kill_cmd)
except ssh_support.SSHException:
logger.warn('Could not kill client %s on node %s.',
- self.ap, self.node.name)
+ self.ap_id, self.node.name)
else:
logger.debug(
'Client %s on node %s has terminated.',
- self.ap, self.node.name
+ self.ap_id, self.node.name
)
- def check_and_kill(self):
- """Check if the process should keep running, stop it if not,
- and return true if and only if it is still running."""
- now = time.time()
- if not self.running:
- return False
- if now - self.start_time >= self.duration:
- self.stop()
- self.running = False
- return False
- return True
-
-# Base class for server programs
-#
-# @ap: Application Process binary
-# @arrival_rate: Average requests/s to be received by this server
-# @mean_duration: Average duration of a client connection (in seconds)
-# @options: Options to pass to the binary
-# @max_clients: Maximum number of clients to serve
-# @clients: Client binaries that will use this server
-# @nodes: Specific nodes to start this server on
-#
-class Server(SBEntity):
+class Server(_SBEntity):
+ """Class representing a server app running in the experiment"""
current_id = -1
@@ -212,6 +246,33 @@ class Server(SBEntity):
def __init__(self, ap, arrival_rate, mean_duration,
options=None, max_clients=float('inf'),
clients=None, nodes=None, min_duration=2, s_id=None):
+ """
+
+ :param ap: the application binary or command which should be run
+ :type ap: `str`
+ :param arrival_rate:
+ :type arrival_rate: `float`
+ :param mean_duration: the required average lifetime of a client
+ of this server
+ :type mean_duration: `float`
+ :param options: the options to be passed to the binary/command
+ starting this server
+ :type options: `str`
+ :param max_clients: the maximum number of simultaneous clients
+ which can be served by this application
+ :type max_clients: `int`
+ :param clients: the clients applications which will request to
+ be served by this application
+ :type clients: `list` of `.Client`
+ :param nodes: the list of nodes this server application
+ should be run on
+ :type nodes: `list` of `rumba.model.Node`
+ :param min_duration: the minimum lifetime of a client of this
+ server
+ :type min_duration: `float`
+ :param s_id: the ID used to identify this instance
+ :type s_id: `str`
+ """
self.ap = ap
e_id = s_id if s_id is not None else self.ap.replace(' ', '_')
super(Server, self).__init__(e_id)
@@ -230,30 +291,61 @@ class Server(SBEntity):
self.min_duration = min_duration
def add_client(self, client):
+ """
+ Adds a client to this server's list
+
+ :param client: the client to add
+ :type client: `.Client`
+ """
self.clients.append(client)
def del_client(self, client):
+ """
+ Removes a client from this server's list
+
+ :param client: the client to remove
+ :type client: `.Client`
+ """
self.clients.remove(client)
def add_node(self, node):
+ """
+ Adds a node to this server's list
+
+ :param node: the node to add
+ :type node: `rumba.model.Node`
+ """
self.nodes.append(node)
def del_node(self, node):
+ """
+ Removes a node from this server's list
+
+ :param node: the node to remove
+ :type node: `rumba.model.Node`
+ """
self.nodes.remove(node)
- def get_new_clients(self, interval):
+ def _get_new_clients(self, interval):
"""
Returns a list of clients of size appropriate to the server's rate.
- The list's size should be a sample from Poisson(arrival_rate) over
+ The list's size should be a sample ~ Poisson(arrival_rate) over
interval seconds.
Hence, the average size should be interval * arrival_rate.
+
+ :param interval: the time increment for which new clients should be
+ generated
+ :type interval: `float`
+
+ :return: the list of new clients to be started
+ :rtype: `list` of `(duration, node, proc_id, client)` tuples
"""
number = _poisson(self.arrival_rate * interval)
number = int(min(number, self.max_clients))
return [self._make_process_arguments() for _ in range(number)]
- def get_duration(self):
+ def _get_duration(self):
return _exponential(self.actual_parameter) + self.min_duration
def _make_process_arguments(self, duration=None, node=None,
@@ -261,18 +353,36 @@ class Server(SBEntity):
if len(self.clients) == 0:
raise Exception("Server %s has empty client list." % (self,))
if duration is None:
- duration = self.get_duration()
+ duration = self._get_duration()
if client is None:
client = random.choice(self.clients)
if node is None:
node = random.choice(client.nodes)
if proc_id is None:
- proc_id = "%s_%s" % (client.ap, client.get_id())
+ proc_id = "%s_%s" % (client.ap, client._get_id())
return duration, node, proc_id, client
def make_client_process(self, duration=None, node=None,
proc_id=None, client=None):
- """Returns a client of this server"""
+ """
+ Returns a process of a client application of this server.
+
+ Any parameter left as `None` will be randomly generated
+ according to the parameters assigned to this server and its
+ clients.
+
+ :param duration: the lifetime of the process
+ :type duration: `float`
+ :param node: the node on which the process should be run
+ :type node: `rumba.model.Node`
+ :param proc_id: the ID identifying the returned process
+ :type proc_id: `str`
+ :param client: the client of which the returned process
+ should be an instance
+ :type client: `.Client`
+ :return: the process
+ :rtype: `.ClientProcess`
+ """
(d, n, p, c) = self._make_process_arguments(duration, node,
proc_id, client)
return c.process(
@@ -282,6 +392,7 @@ class Server(SBEntity):
)
def run(self):
+ """Starts this server"""
for node in self.nodes:
run_cmd = self.ap + (
(" " + self.options) if self.options is not None else ""
@@ -298,6 +409,7 @@ class Server(SBEntity):
self.id, node.name)
def stop(self):
+ """Stops this server"""
for node, pid in self.pids.items():
logger.debug(
'Killing server %s on node %s.',
@@ -317,7 +429,8 @@ class Server(SBEntity):
# @servers: App servers available in the network.
# Type == Server or Type == List[Tuple[Server, Node]]
#
-class StoryBoard(SBEntity):
+class StoryBoard(_SBEntity):
+ """Class representing the storyboard of an experiment"""
SCRIPT_RESOLUTION = 0.1
EXECUTION_RESOLUTION = 2.5 # in seconds (may be a float)
@@ -325,7 +438,19 @@ class StoryBoard(SBEntity):
def get_e_id(self):
return 'storyboard'
- def __init__(self, duration, experiment=None, servers=None, script=None):
+ def __init__(self, duration, experiment=None, servers=None):
+ """
+
+ :param duration: the required duration of the storyboard (s)
+ :type duration: `float`
+ :param experiment: the experiment data this storyboard should use
+ :type experiment: `rumba.model.Experiment`
+ :param servers: the list of servers this storyboard will use to generate
+ random events through the
+ :meth:`.StoryBoard.generate_script` method
+ :type servers: list of `.Server` or (`.Server`, `rumba.model.Node`)
+ tuples
+ """
self.id = 'storyboard'
self.experiment = experiment
self.duration = duration
@@ -347,7 +472,7 @@ class StoryBoard(SBEntity):
self._build_nodes_lists()
# The following must be last, because it needs the info from
# _build_nodes_list
- self._script = script if script is not None else _Script(self)
+ self._script = _Script(self)
def _build_nodes_lists(self):
"""Populates server_nodes and client_nodes lists"""
@@ -386,7 +511,8 @@ class StoryBoard(SBEntity):
"""
Set the storyboard's underlying experiment instance
- @param experiment: the experiment instance
+ :param experiment: the experiment instance
+ :type experiment: `rumba.model.Experiment`
"""
if not isinstance(experiment, model.Experiment):
raise TypeError('Experiment instance required.')
@@ -394,7 +520,13 @@ class StoryBoard(SBEntity):
self._build_nodes_lists()
def add_server(self, server):
- """Register a server node to the sb."""
+ """
+ Register a server application to the sb for
+ random event generation.
+
+ :param server: the server application
+ :type server: `.Server`
+ """
if self.experiment is None:
raise ValueError("Cannot add a server before "
"setting the experiment.")
@@ -404,11 +536,13 @@ class StoryBoard(SBEntity):
def add_server_on_node(self, server, node):
"""
- Utility method to simultaneously add a server to a sb
+ Simultaneously add a server to this storyboard
and a node to the server.
- @param server: the server to be added to the storyboard
- @param node: the node upon which the server should run
+ :param server: the server to be added to the storyboard
+ :type server: `.Server`
+ :param node: the node upon which the server should run
+ :type node: `rumba.model.Node`
"""
if self.experiment is None:
raise ValueError("Cannot add a server before "
@@ -420,6 +554,12 @@ class StoryBoard(SBEntity):
self._validate_and_add_server(server, node)
def del_server(self, server):
+ """
+ Deregister a server application from this storyboard.
+
+ :param server: the server to remove
+ :type server: `.Server`
+ """
del self.server_apps[server.id]
self._build_nodes_lists()
@@ -431,21 +571,25 @@ class StoryBoard(SBEntity):
trigger=None,
ev_id=None,):
"""
- Calls a function with the specified triggers and arguments.
+ Calls a Python function with the specified arguments as soon as
+ the specified triggers are satisfied.
+
:param call: the function to run
- :type call: function (methods included)
+ :type call: `callable`
+ :param args: arguments to pass to the function
+ :type args: `list`
+ :param kwargs: keyword arguments to be passed
+ :type kwargs: `dict`
:param c_time: the function will not be called before `c_time`
seconds have passed
- :type c_time: :py:class:`float`
+ :type c_time: `float`
:param trigger: the function must not be called before the event
`trigger` has completed
- :type trigger: :py:class:`.Event` or :py:class:`str`
+ :type trigger: `.Event` or `str`
:param ev_id: the ID to assign to the generated event
- :type ev_id: :py:class:`str`
- :param args: arguments to pass to the function
- :param kwargs: keyword arguments to be passed
+ :type ev_id: `str`
:return: the event representing the calling of the function
- :rtype: :py:class:`.Event`
+ :rtype: `.Event`
"""
if args is None:
args = []
@@ -458,12 +602,19 @@ class StoryBoard(SBEntity):
def schedule_command(self, t, node, command):
"""
- Schedules the given command to be run at t seconds from the start.
- The commands are run in no particular order, so take care
-
- @param t: (float) seconds to wait before running the command
- @param node: (Node or str) the node on which the command should be run
- @param command: (str or list[str]) the command(s) to be run,
+ Schedules the given command to be run no sooner than t seconds
+ from the start of the storyboard execution.
+
+ Commands triggering at times very close to each other might be
+ run in any order. Use the :meth:`.StoryBoard.schedule_action` method
+ to force a command to run after another event.
+
+ :param t: seconds to wait before running the command
+ :type t: `float`
+ :param node: the node on which the command should be run
+ :type node: `rumba.model.Node` or `str`
+ :param command: the command(s) to be run
+ :type command: `str` or `list` of `str`
"""
if self.experiment is None:
raise ValueError("An experiment is needed to schedule commands.")
@@ -483,8 +634,10 @@ class StoryBoard(SBEntity):
"""
Runs a command (or several) on a given node, immediately.
- @param node: (Node or str) the node on which the command should be run
- @param command: (str or list[str]) the command(s) to be run
+ :param node: the node on which the command should be run
+ :type node: `rumba.model.Node` or `str`
+ :param command: the command(s) to be run
+ :type command: `str` or `list` of `str`
"""
if self.experiment is None:
raise ValueError("Experiment needed to run commands.")
@@ -499,11 +652,13 @@ class StoryBoard(SBEntity):
def add_event(self, event):
"""
- Add an event to this script, provided either as an Event
- instance or as a string as read from a .rsb script.
+ Adds an event to this script.
+
+ The event acan be passed either as an `.Event`
+ instance or as a string as read as from a .rsb script.
:param event: the event to add
- :type event: (Event or str)
+ :type event: `.Event` or `str`
"""
self._script.add_event(event)
@@ -512,7 +667,7 @@ class StoryBoard(SBEntity):
Remove an event from this storyboard
:param event: the event (or id thereof) to remove
- :type event: (Event or str)
+ :type event: `.Event` or `str`
"""
self._script.del_event(event)
@@ -528,19 +683,25 @@ class StoryBoard(SBEntity):
Except for the server, if a parameter is not specified,
it will be randomly generated according to the server
- parameters (mean duration, client apps and their nodes)
-
- @param server: the server of which one client should be run
- @param duration: the duration of the client process
- @param node: (Node or str) the node on which the client should be run
- @param proc_id: the entity ID to use for the process
- @param callback: callable or list thereof to be run
+ parameters (mean duration, registered client apps
+ and their nodes)
+
+ :param server: the server of which one client should be run
+ :type server: `.Server`
+ :param duration: the duration of the client process
+ :type duration: `float`
+ :param node: the node on which the client should be run
+ :type node: `rumba.model.Node` or `str`
+ :param proc_id: the ID identifying the generated process
+ :type proc_id: `str`
+ :param callback: callable or list thereof to be run
after client termination
+ :type callback: `callable` or `list` of `callable`
"""
if isinstance(server, str):
server = self.server_apps[server]
if duration is None:
- duration = server.get_duration()
+ duration = server._get_duration()
client = random.choice(server.clients)
self.run_client(client, duration, node, proc_id, callback)
@@ -556,12 +717,17 @@ class StoryBoard(SBEntity):
If the node parameter is not given, it will be chosen at random
among the client default nodes.
- @param client: the client which should be run
- @param duration: the duration of the client process
- @param node: (Node or str) the node on which the client should be run
- @param proc_id: the entity ID to use for the process
- @param callback: callable or list thereof to be run
+ :param client: the client which should be run
+ :type client: `.Client`
+ :param duration: the duration of the client process
+ :type duration: `float`
+ :param node: the node on which the client should be run
+ :type node: `rumba.model.Node` or `str`
+ :param proc_id: the entity ID to use for the process
+ :type proc_id: `str`
+ :param callback: callable or list thereof to be run
after client termination
+ :type callback: `callable` or `list` of `callable`
"""
if isinstance(client, str):
client = self.client_apps[client]
@@ -596,14 +762,18 @@ class StoryBoard(SBEntity):
to the server parameters (client apps and their nodes).
Note that this method, as opposed to
- :meth:`rumba.storyboard.run_client_of`, will not generate an event
- to stop the client after the duration is expired. In most cases,
- :meth:`rumba.storyboard.run_client_of` is the way to go.
-
- @param server: the server of which one client should be run
- @param duration: the duration of the client process
- @param node: the node on which the client should be run
- @param proc_id: the entity ID to use for the process
+ :meth:`.StoryBoard.run_client_of`, will not automatically generate
+ an event stopping the client after the duration is expired.
+ In most cases, :meth:`.StoryBoard.run_client_of` is the way to go.
+
+ :param server: the server of which one client should be run
+ :type server: `.Server`
+ :param duration: the duration of the client process
+ :type duration: `float`
+ :param node: the node on which the client should be run
+ :type node: `rumba.model.Node` or `str`
+ :param proc_id: the ID identifying the generated process
+ :type proc_id: `str`
"""
if isinstance(server, str):
server = self.server_apps[server]
@@ -618,14 +788,18 @@ class StoryBoard(SBEntity):
among the client default nodes.
Note that this method, as opposed to
- :meth:`rumba.storyboard.run_client`, will not generate an event
- to stop the client after the duration is expired. In most cases,
- :meth:`rumba.storyboard.run_client` is the way to go.
-
- @param client: the client which should be run
- @param duration: the duration of the client process
- @param node: the node on which the client should be run
- @param proc_id: the entity ID to use for the process
+ :meth:`.StoryBoard.run_client`, will not automatically generate
+ an event stopping the client after the duration is expired.
+ In most cases, :meth:`.StoryBoard.run_client` is the way to go.
+
+ :param client: the client which should be run
+ :type client: `.Client`
+ :param duration: the duration of the client process
+ :type duration: `float`
+ :param node: the node on which the client should be run
+ :type node: `rumba.model.Node` or `str`
+ :param proc_id: the entity ID to use for the process
+ :type proc_id: `str`
"""
if isinstance(client, str):
client = self.client_apps[client]
@@ -636,23 +810,31 @@ class StoryBoard(SBEntity):
process.run()
def kill_process(self, proc_id):
+ """
+ Stops the `.ClientProcess` with the specified ID.
+
+ :param proc_id: the ID of the process to kill
+ :type proc_id: `str`
+ """
process = self.process_dict.get(proc_id, None)
if process is None:
raise ValueError('No process named %s' % (proc_id,))
process.stop()
del self.process_dict[proc_id]
- def periodic_check(self, t):
+ def _periodic_check(self, t):
self._script.check_for_ready_ev(t)
self._script.run_ready()
def generate_script(self, clean=True):
"""
Randomly generate a script for this experiment based on the
- parameters provided to the instances of servers, nodes and clients.
+ `.Server` instances registered to this storyboard,
+ their nodes, thier clients and the nodes of their clients.
- @param clean: if True, discard the current script before
+ :param clean: if `True`, discard the current script before
generating a new one.
+ :type clean: `bool`
"""
if self.experiment is None:
raise ValueError('Cannot generate script without an experiment')
@@ -666,7 +848,7 @@ class StoryBoard(SBEntity):
last_marker += 1
logger.debug('Passed the %s seconds mark', last_marker*marker)
for server in self.server_apps.values():
- c_l = server.get_new_clients(self.SCRIPT_RESOLUTION)
+ c_l = server._get_new_clients(self.SCRIPT_RESOLUTION)
for d, n, p, c in c_l:
if d > self.duration - t: # would outlast the experiment
continue
@@ -692,6 +874,7 @@ class StoryBoard(SBEntity):
return start_event
def start(self):
+ """Start the storyboard execution."""
if self.experiment is None:
raise ValueError("Cannot run sb with no experiment.")
if self._script is None:
@@ -717,13 +900,13 @@ class StoryBoard(SBEntity):
server.run()
res = self.SCRIPT_RESOLUTION # for brevity
while self.cur_time < self.duration:
- self.periodic_check(self.cur_time)
+ self._periodic_check(self.cur_time)
next_breakpoint = math.ceil(self.cur_time / res) * res
delta = next_breakpoint - self.cur_time
if delta > 0: # just in case
time.sleep(delta)
self.cur_time = float(time.time() - self.start_time)
- self.periodic_check(self.cur_time)
+ self._periodic_check(self.cur_time)
# Do things that were scheduled
# in the last seconds
# of the StoryBoard
@@ -734,6 +917,15 @@ class StoryBoard(SBEntity):
server.stop()
def fetch_logs(self, local_dir=None):
+ """
+ Fetch all server application and client application logs from
+ the different nodes, and put them into `local_dir`
+
+ :param local_dir: the local directory in which the logs should
+ be stored. If `None`, `/tmp/rumba/<exp_name>`
+ will be used
+ :type local_dir: `str`
+ """
if local_dir is None:
local_dir = self.experiment.log_dir
if not os.path.isdir(local_dir):
@@ -750,11 +942,25 @@ class StoryBoard(SBEntity):
logger.debug('Log list is:\n%s', logs_list)
node.fetch_files(logs_list, dst_dir)
- def set_link_state(self, t, dif, state):
+ def schedule_link_state(self, t, dif, state):
+ """
+ Schedules a link's (`rumba.model.ShimEthDIF`) state to go
+ up or down at the specified time.
+
+ :param t: the time in the storyboard at which the state
+ change should happen
+ :type t: `float`
+ :param dif: the DIF which should be reconfigured
+ :type dif: `rumba.model.ShimEthDIF`
+ :param state: the desired state
+ :type state: `str` -- either `up` or `down`
+ """
if self.experiment is None:
raise ValueError("An experiment is needed to schedule commands.")
if not isinstance(dif, model.ShimEthDIF):
- raise Exception("Not a Shim Ethernet DIF.")
+ raise ValueError("Not a Shim Ethernet DIF.")
+ if state not in ['up', 'down']:
+ raise ValueError('Only possible states are "up" and "down"')
if self._script is None:
self._script = _Script(self)
@@ -763,13 +969,46 @@ class StoryBoard(SBEntity):
action = functools.partial(node.set_link_state, dif, state)
self._script.add_event(Event(action, ev_time=t))
- def set_link_up(self, t, dif):
- self.set_link_state(t, dif, 'up')
+ def schedule_link_up(self, t, dif):
+ """
+ Schedules a link's (`rumba.model.ShimEthDIF`) state to go
+ up at the specified time.
+
+ :param t: the time in the storyboard at which the state
+ change should happen
+ :type t: `float`
+ :param dif: the DIF which should be reconfigured
+ :type dif: `rumba.model.ShimEthDIF`
+ """
+ self.schedule_link_state(t, dif, 'up')
+
+ def schedule_link_down(self, t, dif):
+ """
+ Schedules a link's (`rumba.model.ShimEthDIF`) state to go
+ down at the specified time.
+
+ :param t: the time in the storyboard at which the state
+ change should happen
+ :type t: `float`
+ :param dif: the DIF which should be reconfigured
+ :type dif: `rumba.model.ShimEthDIF`
+ """
+ self.schedule_link_state(t, dif, 'down')
+
+ def schedule_node_state(self, t, node, state):
+ """
+ Schedules a node's state to go up or down at the specified time.
- def set_link_down(self, t, dif):
- self.set_link_state(t, dif, 'down')
+ When a node is down all of its links are set to `down`.
- def set_node_state(self, t, node, state):
+ :param t: the time in the storyboard at which the state
+ change should happen
+ :type t: `float`
+ :param node: the node which should be reconfigured
+ :type node: `rumba.model.Node`
+ :param state: the desired state
+ :type state: `str` -- either `up` or `down`
+ """
if self.experiment is None:
raise ValueError("An experiment is needed to schedule commands.")
@@ -779,29 +1018,54 @@ class StoryBoard(SBEntity):
for dif in node.difs:
if not isinstance(dif, model.ShimEthDIF):
continue
- action = functools.partial(node.set_link_state, dif, state)
+ action = functools.partial(node.schedule_link_state, dif, state)
self._script.add_event(Event(action, ev_time=t))
- def set_node_up(self, t, node):
- self.set_node_state(t, node, 'up')
+ def schedule_node_up(self, t, node):
+ """
+ Schedules a node's state to go up at the specified time.
- def set_node_down(self, t, node):
- self.set_node_state(t, node, 'down')
+ :param t: the time in the storyboard at which the state
+ change should happen
+ :type t: `float`
+ :param node: the node which should be reconfigured
+ :type node: `rumba.model.Node`
+ """
+ self.schedule_node_state(t, node, 'up')
+
+ def schedule_node_down(self, t, node):
+ """
+ Schedules a node's state to go down at the specified time.
+
+ When a node is down all of its links are set to `down`.
+
+ :param t: the time in the storyboard at which the state
+ change should happen
+ :type t: `float`
+ :param node: the node which should be reconfigured
+ :type node: `rumba.model.Node`
+ """
+ self.schedule_node_state(t, node, 'down')
def write_script(self, buffer):
"""
- Writes the script on a string buffer, at the current position
+ Writes the script on a (string-oriented) buffer,
+ at the current position
- @param buffer: a string buffer.
+ :param buffer: a string buffer.
+ :type buffer: string-oriented `file-like` object
"""
self._script.write(buffer)
def write_script_to_file(self, filename, clean=True):
"""
- Writes the script to a file, overwriting content.
+ Writes the script to a file.
- @param filename: the name of the destination file
- @param clean: if True, current file contents will be overwritten.
+ :param filename: the name of the destination file
+ :type filename: `str`
+ :param clean: if True, current file's contents will be overwritten.
+ If False, the script will be appended to the file.
+ :type clean: `bool`
"""
mode = 'w'
if not clean:
@@ -811,9 +1075,10 @@ class StoryBoard(SBEntity):
def write_script_string(self):
"""
- Writes the script into a string and returns it.
+ Writes the script as a string and returns it.
- @return: the script as a string.
+ :return: the script as a string.
+ :rtype: `str`
"""
s = StringIO()
self.write_script(s)
@@ -823,8 +1088,10 @@ class StoryBoard(SBEntity):
"""
Reads a script from a buffer, at the current position.
- @param buffer: the buffer to read from.
- @param clean: if True, discard the current script before reading.
+ :param buffer: the buffer to read from.
+ :type buffer: string-oriented `file-like` object
+ :param clean: if True, discard the current script before reading.
+ :type clean: `bool`
"""
if clean:
self._script = _Script(self)
@@ -834,8 +1101,10 @@ class StoryBoard(SBEntity):
"""
Reads a script from a file.
- @param filename: the file to read from.
- @param clean: if True, discard the current script before reading.
+ :param filename: the file to read from.
+ :type filename: `str`
+ :param clean: if True, discard the current script before reading.
+ :type clean: `bool`
"""
if clean:
self._script = _Script(self)
@@ -844,10 +1113,12 @@ class StoryBoard(SBEntity):
def parse_script_string(self, string, clean=True):
"""
- Reads a script from a string.
+ Reads a string as a script.
- @param string: the string to read from.
- @param clean: if True, discard the current script before reading.
+ :param string: the string to read from.
+ :type string: `str`
+ :param clean: if True, discard the current script before reading.
+ :type clean: `bool`
"""
if clean:
self._script = _Script(self)
@@ -856,13 +1127,17 @@ class StoryBoard(SBEntity):
def capture_traffic(self, start, end, node, dif):
"""
- Captures the traffic of an interface on a certain node.
-
- :param start: The time to start capturing.
- :param end: The time to stop capturing.
- :param node: The node to capture on.
- :param dif: The Shim Ethernet DIF of the node, Rumba
- automatically resolves the correct interface.
+ Captures the traffic of an interface on a node.
+
+ :param start: the time to start capturing.
+ :type start: `float`
+ :param end: the time to stop capturing.
+ :type end: `float`
+ :param node: the node to capture on.
+ :type node: `rumba.model.Node`
+ :param dif: the node's Shim Ethernet DIF whose interface
+ will be used for the capture.
+ :type dif: `rumba.model.ShimEthDIF`
"""
for ipcp in dif.ipcps:
if ipcp.node is not node:
@@ -894,6 +1169,7 @@ class StoryBoard(SBEntity):
class Event(object):
+ """Class representing an event in a `.StoryBoard`"""
cur_id = -1
@@ -904,11 +1180,15 @@ class Event(object):
def __init__(self, action, ev_id=None, ev_time=None, trigger=None):
"""
- @param ev_id: (str) id of the event
- @param action: (any SBEntity method) action to undertake
- when event is activated
- @param ev_time: (float) seconds to wait before running the event
- @param trigger: (Event) Event which must complete before this runs
+ :param ev_id: ID of the event
+ :type ev_id: `str`
+ :param action: action to undertake when event is activated
+ :type action: nullary `callable`
+ :param ev_time: seconds to wait before running the event
+ :type ev_time: `float`
+ :param trigger: Event which must complete before
+ this event runs
+ :type trigger: `.Event`
"""
self.id = ev_id if ev_id is not None else self.generate_id()
if ev_time is None and trigger is None:
@@ -957,6 +1237,11 @@ class Event(object):
@property
def failed(self):
+ """
+
+ :return: True if this event's execution failed
+ :rtype: `bool`
+ """
return self.exception is not None
def pre_exec(self): # hook to be overridden
@@ -971,9 +1256,7 @@ class Event(object):
raise ValueError('Event %s has already ran' % self.id)
def run(self):
- """
- Run this event's action
- """
+ """Run this event's action"""
self._start()
try:
self.action()
@@ -983,9 +1266,12 @@ class Event(object):
def check(self, cur_time):
"""
- Check if this event can be run.
- @param cur_time: (float) current time
- @return: True if the preconditions are satisfied, False otherwise.
+ Check if this event can be run, i.e. it's prerequisites are satisfied.
+
+ :param cur_time: current elapsed time from storyboard's start.
+ :type cur_time: `float`
+ :return: True if the preconditions are satisfied, False otherwise.
+ :rtype: `bool`
"""
return \
(self.time is None or cur_time > self.time) \
diff --git a/rumba/utils.py b/rumba/utils.py
index f6d6c1c..fd12333 100644
--- a/rumba/utils.py
+++ b/rumba/utils.py
@@ -88,19 +88,27 @@ class ExperimentManager(object):
syslogs_strategy=NO_SYSLOGS,
syslogs=None):
"""
- Initializes the ExperimentManager.
-
:param experiment: The experiment name.
+ :type experiment: `rumba.model.Experiment`
:param swap_out_strategy: What action to perform on swap-out.
- :param syslog_strategy: What system and prototype logs to retrieve
+ :param syslogs_strategy: What system and prototype logs to retrieve
before swap-out.
:param syslogs: The location of the syslogs in case of custom syslogs.
+ :type syslogs: `str`
+
+ .. note:: Options for swap_out_strategy are
+
+ - NO_SWAPOUT == 0,
+ - AUTO_SWAPOUT == 1,
+ - PAUSE_SWAPOUT == 2,
+ - PROMPT_SWAPOUT == 3.
- .. note:: Options for swap_out_strategy are NO_SWAPOUT, AUTO_SWAPOUT,
- PAUSE_SWAPOUT, PROMPT_SWAPOUT.
+ .. note:: Options for syslog_strategy are
- .. note:: Options for syslog_strategy are NO_SYSLOGS, DEFAULT_SYSLOGS,
- DMESG_SYSLOGS, CUSTOM_SYSLOGS.
+ - NO_SYSLOGS == 0,
+ - DEFAULT_SYSLOGS == 1,
+ - DMESG_SYSLOGS == 2,
+ - CUSTOM_SYSLOGS == 3.
"""
assert isinstance(experiment, model.Experiment), \
'An experiment instance is required.'