diff options
-rw-r--r-- | doc/log.rst | 2 | ||||
-rw-r--r-- | doc/utils.rst | 2 | ||||
-rw-r--r-- | doc/workflow.rst | 6 | ||||
-rw-r--r-- | rumba/log.py | 70 | ||||
-rw-r--r-- | rumba/model.py | 48 | ||||
-rw-r--r-- | rumba/storyboard.py | 602 | ||||
-rw-r--r-- | rumba/utils.py | 22 |
7 files changed, 550 insertions, 202 deletions
diff --git a/doc/log.rst b/doc/log.rst index 1fd3b6f..d19bc13 100644 --- a/doc/log.rst +++ b/doc/log.rst @@ -8,3 +8,5 @@ Logging .. autofunction:: rumba.log.flush_and_kill_logging .. autofunction:: rumba.log.flush_log + +.. autoclass:: rumba.log.LogOptions diff --git a/doc/utils.rst b/doc/utils.rst index 6df8a0c..cc3db41 100644 --- a/doc/utils.rst +++ b/doc/utils.rst @@ -8,6 +8,8 @@ Example usage of the class: .. code-block:: python + from rumba.utils import ExperimentManager, PROMPT_SWAPOUT + with ExperimentManager(exp, swap_out_strategy=PROMPT_SWAPOUT): exp.swap_in() exp.bootstrap_prototype() diff --git a/doc/workflow.rst b/doc/workflow.rst index 04f23b1..672c33c 100644 --- a/doc/workflow.rst +++ b/doc/workflow.rst @@ -34,11 +34,9 @@ Workflow Accessing nodes after swap-in ----------------------------- -To access a node once the experiment swapped in, use the following +To access a node once the experiment is swapped in, use the following command (in the same terminal where ssh-agent was run in case of jFed): :: $ rumba-access $NODE_NAME -Where $NODE_NAME is the name of the node to access. In case of the -QEMU testbed, the password of the downloaded buildroot images is -'root'. +Where $NODE_NAME is the name of the node to access.
\ No newline at end of file diff --git a/rumba/log.py b/rumba/log.py index 67070a6..2ee871c 100644 --- a/rumba/log.py +++ b/rumba/log.py @@ -170,7 +170,7 @@ except ImportError: class RumbaFormatter(logging.Formatter): """ - The logging.Formatter subclass used by Rumba + The `logging.Formatter` subclass used by Rumba """ level_name_table = { @@ -178,7 +178,14 @@ class RumbaFormatter(logging.Formatter): 'ERROR': 'ERR', 'WARNING': 'WRN', 'INFO': 'INF', - 'DEBUG': 'DBG' + 'DEBUG': 'DBG', + # handlers beyond the first will get the + # modified levelname + 'CRT': 'CRT', + 'ERR': 'ERR', + 'WRN': 'WRN', + 'INF': 'INF', + 'DBG': 'DBG' } def __init__(self): @@ -202,15 +209,6 @@ def setup(): logging.getLogger('').setLevel(logging.ERROR) logging.getLogger('rumba').setLevel(logging.INFO) - handler = logging.StreamHandler(sys.stdout) - handler.setLevel(logging.DEBUG) - formatter = RumbaFormatter() - handler.setFormatter(formatter) - listener = QueueListener(mq, handler) - global logging_listener - logging_listener = listener - listener.start() - # Used for the first call, in order to configure logging def _get_logger_with_setup(name): @@ -231,11 +229,14 @@ _get_logger = _get_logger_with_setup def get_logger(name): """ - Returns the logger named <name>. - <name> should be the module name, for consistency. If setup has not been - called yet, it will call it first. + Returns the logger named `name`. + + `name` should be the module name, for consistency. + + If setup has not been called yet, it will call it first. :param name: the name of the desired logger + :type name: `str` :return: The logger """ return _get_logger(name) @@ -243,11 +244,13 @@ def get_logger(name): def set_logging_level(level, name=None): """ - Set the current logging level to <level> for logger named <name>. - If name is not specified, sets the logging level for all rumba loggers. + Set the current logging level to `level` for the logger named `name`. + If `name` is not specified, sets the logging level for all rumba loggers. - :param level: the desired logging level. + :param level: the desired logging level, in string or int form. + :type level: `str` or `int` :param name: The name of the logger to configure + :type name: `str` .. note:: Accepted levels are: @@ -274,8 +277,8 @@ def set_logging_level(level, name=None): def reset_logging_level(): """ - Resets the current logging levels to the defaults. For Rumba the - default is INFO. + Resets the current logging level of all loggers to the default. + For the Rumba library the default is INFO. """ # Un-sets every logger previously set for logger in loggers_set: @@ -303,6 +306,15 @@ def flush_and_kill_logging(): class LogOptions(object): + """Class holding the logging configuration""" + + def __init__(self): + global logging_listener + global mq + + logging_listener = QueueListener(mq) + self.log_to_console() + logging_listener.start() @staticmethod def _get_handlers(): @@ -316,19 +328,39 @@ class LogOptions(object): def _add_handler(self, handler): handler.setFormatter(RumbaFormatter()) + handler.setLevel(DEBUG) handlers = self._get_handlers() + (handler,) self._set_handlers(*handlers) return self def log_to_file(self, path='rumba.log'): + """ + Set the logging framework to log to file on top of the other + logging facilities. + + :param path: logging file filename + :type path: `str` + :return: this `.LogOptions` instance + """ new_handler = logging.handlers.RotatingFileHandler(path) return self._add_handler(new_handler) def reset_logging(self): + """ + Disable all logging facilities + + :return: this `.LogOptions` instance + """ self._set_handlers(*tuple()) return self def log_to_console(self): + """ + Set the logging framework to log to stdout on top of the + other configured logging facilities + + :return: this `.LogOptions` instance + """ new_handler = logging.StreamHandler(sys.stdout) return self._add_handler(new_handler) diff --git a/rumba/model.py b/rumba/model.py index f9a0d3c..a6925b3 100644 --- a/rumba/model.py +++ b/rumba/model.py @@ -59,7 +59,6 @@ except OSError: pass - class Testbed(object): """ Base class for every testbed plugin. @@ -124,6 +123,7 @@ class Testbed(object): def _swap_out(self, experiment): logger.info("swap_out(): nothing to do") + class DIF(object): """ Base class for DIFs. @@ -231,7 +231,7 @@ class ShimEthDIF(DIF): distribution=None): """ Set the delay parameters of the underlying link. - Parameters as in :py:meth:`.Delay.__init__` + Parameters as in :py:class:`.Delay` :param delay: average delay in ms :type delay: :py:class:`int` @@ -252,7 +252,7 @@ class ShimEthDIF(DIF): correlation=None): """ Set the loss parameter of the underlying link. - Parameters as in :py:meth:`.Loss.__init__` + Parameters as in :py:class:`.Loss` :param loss: loss in percentage :type loss: :py:class:`int` or :py:class:`float` @@ -321,11 +321,11 @@ class NormalDIF(DIF): :param comp: Component name. :param pol: Policy name """ - self.policy.del_policy(comp, policy_name) + self.policy.del_policy(comp, pol) def show(self): """ - :return: A string of the policies in the DIF. + :return: A string representing the policies in the DIF. """ s = DIF.__repr__(self) for comp, pol_dict in self.policy.get_policies().items(): @@ -338,6 +338,14 @@ class NormalDIF(DIF): class Distribution(Enum): """ An enum holding different statistical distributions. + + **Values:** + + `NORMAL = 1` + + `PARETO = 2` + + `PARETONORMAL = 3` """ NORMAL = 1 PARETO = 2 @@ -463,9 +471,9 @@ class LinkQuality(object): Clone old_quality, updating it with the provided parameters if present. - :param old_quality: A :py:class`.LinkQuality` instance to + :param old_quality: A :py:class:`.LinkQuality` instance to use as a base - :type old_quality: :py:class`.LinkQuality` + :type old_quality: :py:class:`.LinkQuality` :param delay: Delay object holding delay configuration or number corresponding to delay in ms :type delay: :py:class:`.Delay` or :py:class:`int` @@ -474,8 +482,8 @@ class LinkQuality(object): :type loss: :py:class:`.Loss` or :py:class:`float` :param rate: The rate of the link in mbit :type rate: :py:class:`int` - :return: a new :py:class`.LinkQuality` instance. - :rtype: py:class`LinkQuality` + :return: a new :py:class:`.LinkQuality` instance. + :rtype: :py:class:`.LinkQuality` """ if delay is None: delay = old_quality.delay @@ -568,6 +576,7 @@ class LinkQuality(object): "netem" % ipcp.ifname, as_root=True) LinkQuality._active.remove(ipcp) + class SSHConfig(object): def __init__(self, hostname, port=22, proxy_server=None): self.username = None @@ -588,6 +597,7 @@ class SSHConfig(object): def set_http_proxy(self, proxy): self.http_proxy = proxy + class Node(object): """ A node in the experiment. @@ -624,8 +634,8 @@ class Node(object): if hasattr(dif, 'policy'): # check if the dif supports policies self.policies[dif] = policies.get(dif, Policy(dif, self)) - self.executor = None # will be set by testbed on swap_in - self.startup_command = None # will be set by prototype + self.executor = None # will be set by testbed on swap_in + self.startup_command = None # will be set by prototype self._validate() @@ -742,6 +752,7 @@ class Node(object): """ Removes a policy. + :param dif: the dif to which the policy should be applied :param component_name: Name of the component. :param policy_name: Name of the policy. """ @@ -764,7 +775,10 @@ class Node(object): :param time_out: Seconds before timing out. :param use_proxy: Use a proxy to execute the commands? """ - return self.executor.execute_commands(self, commands, as_root, time_out) + return self.executor.execute_commands(self, + commands, + as_root, + time_out) def execute_command(self, command, as_root=False, time_out=3, use_proxy=False): @@ -777,7 +791,10 @@ class Node(object): :param use_proxy: Use a proxy to execute the commands? :return: The stdout of the command. """ - return self.executor.execute_command(self, command, as_root, time_out) + return self.executor.execute_command(self, + command, + as_root, + time_out) def copy_file(self, path, destination): """ @@ -801,7 +818,7 @@ class Node(object): """ Fetch file from the node. - :param paths: Location of the files on the node. + :param path: Location of the files on the node. :param destination: Destination location of the files. :param sudo: The file is owned by root on the node? """ @@ -828,6 +845,7 @@ class Node(object): self.execute_command('ip link set dev ' + ipcp.ifname + ' ' + state, as_root=True) + class IPCP(object): def __init__(self, name, node, dif): self.name = name @@ -868,6 +886,7 @@ class ShimUDPIPCP(IPCP): IPCP.__init__(self, name, node, dif) # TODO: add IP and port + class Policy(object): def __init__(self, dif, node=None, policies=None): self.dif = dif # type: NormalDIF @@ -940,6 +959,7 @@ class Policy(object): s += "\n]\n" return s + class Experiment(object): """ Base class for experiments. diff --git a/rumba/storyboard.py b/rumba/storyboard.py index 2de21ff..de2bc62 100644 --- a/rumba/storyboard.py +++ b/rumba/storyboard.py @@ -62,7 +62,7 @@ except ImportError: # But we might not care -class SBEntity(object): +class _SBEntity(object): def __init__(self, e_id): self.id = e_id @@ -71,17 +71,37 @@ class SBEntity(object): return type(self).__name__ + '.' + self.id -class Client(SBEntity): +class Client(_SBEntity): + """ + Class representing a client application running in the experiment. + + A list of "client nodes" can be specified: if so, when generating a + random script, this Client will be run only on those nodes. + """ current_id = -1 @classmethod - def get_id(cls): + def _get_id(cls): cls.current_id += 1 return cls.current_id def __init__(self, ap, nodes=None, options=None, shutdown="kill <pid>", c_id=None): + """ + :param ap: the application binary/command to be run + :type ap: `str` + :param nodes: the list of nodes on which the client should be run + :type nodes: `:py:class:`rumba.model.Node` or `list` thereof + :param options: the options to be passed to the binary or command + :type options: `str` + :param shutdown: the command to be run in order to stop the client. + The token "<pid>" will be changed into the process' + pid. + :type shutdown: `str` + :param c_id: the ID used to reference to this instance + :type c_id: `str` + """ self.ap = ap e_id = c_id if c_id is not None else self.ap.replace(' ', '_') super(Client, self).__init__(e_id) @@ -94,13 +114,35 @@ class Client(SBEntity): self.shutdown = shutdown def add_node(self, node): + """ + Add a node to this instance's list. + + :param node: the node to add + :type node: `rumba.model.Node` + """ if not isinstance(node, model.Node): raise Exception("A Node is required.") self.nodes.append(node) def process(self, duration, node=None, proc_id=None): + """ + Generates a `.ClientProcess` of this application. + + :param duration: the duration of the process. It will be + passed to the process call in stead of the + <duration> token. + :type duration: `float` + :param node: the node on which the process should be run. + If `None`, a random node from this client's list + will be picked. + :type node: `rumba.model.Node` + :param proc_id: the ID used to reference to the generated process + :type proc_id: `str` + :return: the generated process + :rtype: `.ClientProcess` + """ if proc_id is None: - proc_id = "%s_%s" % (self.id, self.get_id()) + proc_id = "%s_%s" % (self.id, self._get_id()) if node is None: if len(self.nodes) == 0: raise ValueError('No nodes for client %s' @@ -108,7 +150,7 @@ class Client(SBEntity): node = random.choice(self.nodes) return ClientProcess( proc_id, - self.ap, + self.id, self.startup, duration, node, @@ -116,18 +158,29 @@ class Client(SBEntity): ) -# Base class for client processes -# -# @ap: Application Process binary -# @duration: The time (in seconds) this process should run -# @start_time: The time at which this process is started. -# @options: Options to pass to the binary -# -class ClientProcess(SBEntity): - def __init__(self, proc_id, ap, startup, duration, +class ClientProcess(_SBEntity): + """Class representing a running client application process on a node""" + def __init__(self, proc_id, ap_id, startup, duration, node, shutdown="kill <pid>"): + """ + + :param proc_id: the ID used to identify this instance + :type proc_id: `str` + :param ap_id: the ID of the client app that generated this process + :type ap_id: `str` + :param startup: the full command used to start this process + :type startup: `str` + :param duration: the intended duration of this process. It will also + replace the "<duration>" token in the + `startup` parameter + :type duration: `int` od `float` + :param node: the node on which this process runs + :type node: `rumba.model.Node` + :param shutdown: the command used to stop this process + :type shutdown: `str` + """ super(ClientProcess, self).__init__(proc_id) - self.ap = ap + self.ap_id = ap_id self.startup = startup self.duration = duration if duration is not None else -1 self.start_time = None @@ -137,13 +190,14 @@ class ClientProcess(SBEntity): self.shutdown = shutdown def run(self): + """Starts this process""" if self.node is None: - raise Exception('No node specified for client %s' % (self.ap,)) + raise Exception('No node specified for client %s' % (self.ap_id,)) self.start_time = time.time() logger.debug( 'Starting client app %s on node %s with duration %s.', - self.ap, self.node.name, self.duration + self.ap_id, self.node.name, self.duration ) start_cmd = "./startup.sh %s %s" % ( @@ -155,52 +209,32 @@ class ClientProcess(SBEntity): self.pid = self.node.execute_command(start_cmd) except ssh_support.SSHException: logger.warning('Could not start client %s on node %s.', - self.ap, self.node.name) + self.ap_id, self.node.name) logger.debug('Client app %s on node %s got pid %s.', - self.ap, self.node.name, self.pid) + self.ap_id, self.node.name, self.pid) def stop(self): + """Stops this process""" if self.shutdown != "": logger.debug( 'Killing client %s on node %s.', - self.ap, self.node.name + self.ap_id, self.node.name ) try: kill_cmd = self.shutdown.replace('<pid>', str(self.pid)) self.node.execute_command(kill_cmd) except ssh_support.SSHException: logger.warn('Could not kill client %s on node %s.', - self.ap, self.node.name) + self.ap_id, self.node.name) else: logger.debug( 'Client %s on node %s has terminated.', - self.ap, self.node.name + self.ap_id, self.node.name ) - def check_and_kill(self): - """Check if the process should keep running, stop it if not, - and return true if and only if it is still running.""" - now = time.time() - if not self.running: - return False - if now - self.start_time >= self.duration: - self.stop() - self.running = False - return False - return True - -# Base class for server programs -# -# @ap: Application Process binary -# @arrival_rate: Average requests/s to be received by this server -# @mean_duration: Average duration of a client connection (in seconds) -# @options: Options to pass to the binary -# @max_clients: Maximum number of clients to serve -# @clients: Client binaries that will use this server -# @nodes: Specific nodes to start this server on -# -class Server(SBEntity): +class Server(_SBEntity): + """Class representing a server app running in the experiment""" current_id = -1 @@ -212,6 +246,33 @@ class Server(SBEntity): def __init__(self, ap, arrival_rate, mean_duration, options=None, max_clients=float('inf'), clients=None, nodes=None, min_duration=2, s_id=None): + """ + + :param ap: the application binary or command which should be run + :type ap: `str` + :param arrival_rate: + :type arrival_rate: `float` + :param mean_duration: the required average lifetime of a client + of this server + :type mean_duration: `float` + :param options: the options to be passed to the binary/command + starting this server + :type options: `str` + :param max_clients: the maximum number of simultaneous clients + which can be served by this application + :type max_clients: `int` + :param clients: the clients applications which will request to + be served by this application + :type clients: `list` of `.Client` + :param nodes: the list of nodes this server application + should be run on + :type nodes: `list` of `rumba.model.Node` + :param min_duration: the minimum lifetime of a client of this + server + :type min_duration: `float` + :param s_id: the ID used to identify this instance + :type s_id: `str` + """ self.ap = ap e_id = s_id if s_id is not None else self.ap.replace(' ', '_') super(Server, self).__init__(e_id) @@ -230,30 +291,61 @@ class Server(SBEntity): self.min_duration = min_duration def add_client(self, client): + """ + Adds a client to this server's list + + :param client: the client to add + :type client: `.Client` + """ self.clients.append(client) def del_client(self, client): + """ + Removes a client from this server's list + + :param client: the client to remove + :type client: `.Client` + """ self.clients.remove(client) def add_node(self, node): + """ + Adds a node to this server's list + + :param node: the node to add + :type node: `rumba.model.Node` + """ self.nodes.append(node) def del_node(self, node): + """ + Removes a node from this server's list + + :param node: the node to remove + :type node: `rumba.model.Node` + """ self.nodes.remove(node) - def get_new_clients(self, interval): + def _get_new_clients(self, interval): """ Returns a list of clients of size appropriate to the server's rate. - The list's size should be a sample from Poisson(arrival_rate) over + The list's size should be a sample ~ Poisson(arrival_rate) over interval seconds. Hence, the average size should be interval * arrival_rate. + + :param interval: the time increment for which new clients should be + generated + :type interval: `float` + + :return: the list of new clients to be started + :rtype: `list` of `(duration, node, proc_id, client)` tuples """ number = _poisson(self.arrival_rate * interval) number = int(min(number, self.max_clients)) return [self._make_process_arguments() for _ in range(number)] - def get_duration(self): + def _get_duration(self): return _exponential(self.actual_parameter) + self.min_duration def _make_process_arguments(self, duration=None, node=None, @@ -261,18 +353,36 @@ class Server(SBEntity): if len(self.clients) == 0: raise Exception("Server %s has empty client list." % (self,)) if duration is None: - duration = self.get_duration() + duration = self._get_duration() if client is None: client = random.choice(self.clients) if node is None: node = random.choice(client.nodes) if proc_id is None: - proc_id = "%s_%s" % (client.ap, client.get_id()) + proc_id = "%s_%s" % (client.ap, client._get_id()) return duration, node, proc_id, client def make_client_process(self, duration=None, node=None, proc_id=None, client=None): - """Returns a client of this server""" + """ + Returns a process of a client application of this server. + + Any parameter left as `None` will be randomly generated + according to the parameters assigned to this server and its + clients. + + :param duration: the lifetime of the process + :type duration: `float` + :param node: the node on which the process should be run + :type node: `rumba.model.Node` + :param proc_id: the ID identifying the returned process + :type proc_id: `str` + :param client: the client of which the returned process + should be an instance + :type client: `.Client` + :return: the process + :rtype: `.ClientProcess` + """ (d, n, p, c) = self._make_process_arguments(duration, node, proc_id, client) return c.process( @@ -282,6 +392,7 @@ class Server(SBEntity): ) def run(self): + """Starts this server""" for node in self.nodes: run_cmd = self.ap + ( (" " + self.options) if self.options is not None else "" @@ -298,6 +409,7 @@ class Server(SBEntity): self.id, node.name) def stop(self): + """Stops this server""" for node, pid in self.pids.items(): logger.debug( 'Killing server %s on node %s.', @@ -317,7 +429,8 @@ class Server(SBEntity): # @servers: App servers available in the network. # Type == Server or Type == List[Tuple[Server, Node]] # -class StoryBoard(SBEntity): +class StoryBoard(_SBEntity): + """Class representing the storyboard of an experiment""" SCRIPT_RESOLUTION = 0.1 EXECUTION_RESOLUTION = 2.5 # in seconds (may be a float) @@ -325,7 +438,19 @@ class StoryBoard(SBEntity): def get_e_id(self): return 'storyboard' - def __init__(self, duration, experiment=None, servers=None, script=None): + def __init__(self, duration, experiment=None, servers=None): + """ + + :param duration: the required duration of the storyboard (s) + :type duration: `float` + :param experiment: the experiment data this storyboard should use + :type experiment: `rumba.model.Experiment` + :param servers: the list of servers this storyboard will use to generate + random events through the + :meth:`.StoryBoard.generate_script` method + :type servers: list of `.Server` or (`.Server`, `rumba.model.Node`) + tuples + """ self.id = 'storyboard' self.experiment = experiment self.duration = duration @@ -347,7 +472,7 @@ class StoryBoard(SBEntity): self._build_nodes_lists() # The following must be last, because it needs the info from # _build_nodes_list - self._script = script if script is not None else _Script(self) + self._script = _Script(self) def _build_nodes_lists(self): """Populates server_nodes and client_nodes lists""" @@ -386,7 +511,8 @@ class StoryBoard(SBEntity): """ Set the storyboard's underlying experiment instance - @param experiment: the experiment instance + :param experiment: the experiment instance + :type experiment: `rumba.model.Experiment` """ if not isinstance(experiment, model.Experiment): raise TypeError('Experiment instance required.') @@ -394,7 +520,13 @@ class StoryBoard(SBEntity): self._build_nodes_lists() def add_server(self, server): - """Register a server node to the sb.""" + """ + Register a server application to the sb for + random event generation. + + :param server: the server application + :type server: `.Server` + """ if self.experiment is None: raise ValueError("Cannot add a server before " "setting the experiment.") @@ -404,11 +536,13 @@ class StoryBoard(SBEntity): def add_server_on_node(self, server, node): """ - Utility method to simultaneously add a server to a sb + Simultaneously add a server to this storyboard and a node to the server. - @param server: the server to be added to the storyboard - @param node: the node upon which the server should run + :param server: the server to be added to the storyboard + :type server: `.Server` + :param node: the node upon which the server should run + :type node: `rumba.model.Node` """ if self.experiment is None: raise ValueError("Cannot add a server before " @@ -420,6 +554,12 @@ class StoryBoard(SBEntity): self._validate_and_add_server(server, node) def del_server(self, server): + """ + Deregister a server application from this storyboard. + + :param server: the server to remove + :type server: `.Server` + """ del self.server_apps[server.id] self._build_nodes_lists() @@ -431,21 +571,25 @@ class StoryBoard(SBEntity): trigger=None, ev_id=None,): """ - Calls a function with the specified triggers and arguments. + Calls a Python function with the specified arguments as soon as + the specified triggers are satisfied. + :param call: the function to run - :type call: function (methods included) + :type call: `callable` + :param args: arguments to pass to the function + :type args: `list` + :param kwargs: keyword arguments to be passed + :type kwargs: `dict` :param c_time: the function will not be called before `c_time` seconds have passed - :type c_time: :py:class:`float` + :type c_time: `float` :param trigger: the function must not be called before the event `trigger` has completed - :type trigger: :py:class:`.Event` or :py:class:`str` + :type trigger: `.Event` or `str` :param ev_id: the ID to assign to the generated event - :type ev_id: :py:class:`str` - :param args: arguments to pass to the function - :param kwargs: keyword arguments to be passed + :type ev_id: `str` :return: the event representing the calling of the function - :rtype: :py:class:`.Event` + :rtype: `.Event` """ if args is None: args = [] @@ -458,12 +602,19 @@ class StoryBoard(SBEntity): def schedule_command(self, t, node, command): """ - Schedules the given command to be run at t seconds from the start. - The commands are run in no particular order, so take care - - @param t: (float) seconds to wait before running the command - @param node: (Node or str) the node on which the command should be run - @param command: (str or list[str]) the command(s) to be run, + Schedules the given command to be run no sooner than t seconds + from the start of the storyboard execution. + + Commands triggering at times very close to each other might be + run in any order. Use the :meth:`.StoryBoard.schedule_action` method + to force a command to run after another event. + + :param t: seconds to wait before running the command + :type t: `float` + :param node: the node on which the command should be run + :type node: `rumba.model.Node` or `str` + :param command: the command(s) to be run + :type command: `str` or `list` of `str` """ if self.experiment is None: raise ValueError("An experiment is needed to schedule commands.") @@ -483,8 +634,10 @@ class StoryBoard(SBEntity): """ Runs a command (or several) on a given node, immediately. - @param node: (Node or str) the node on which the command should be run - @param command: (str or list[str]) the command(s) to be run + :param node: the node on which the command should be run + :type node: `rumba.model.Node` or `str` + :param command: the command(s) to be run + :type command: `str` or `list` of `str` """ if self.experiment is None: raise ValueError("Experiment needed to run commands.") @@ -499,11 +652,13 @@ class StoryBoard(SBEntity): def add_event(self, event): """ - Add an event to this script, provided either as an Event - instance or as a string as read from a .rsb script. + Adds an event to this script. + + The event acan be passed either as an `.Event` + instance or as a string as read as from a .rsb script. :param event: the event to add - :type event: (Event or str) + :type event: `.Event` or `str` """ self._script.add_event(event) @@ -512,7 +667,7 @@ class StoryBoard(SBEntity): Remove an event from this storyboard :param event: the event (or id thereof) to remove - :type event: (Event or str) + :type event: `.Event` or `str` """ self._script.del_event(event) @@ -528,19 +683,25 @@ class StoryBoard(SBEntity): Except for the server, if a parameter is not specified, it will be randomly generated according to the server - parameters (mean duration, client apps and their nodes) - - @param server: the server of which one client should be run - @param duration: the duration of the client process - @param node: (Node or str) the node on which the client should be run - @param proc_id: the entity ID to use for the process - @param callback: callable or list thereof to be run + parameters (mean duration, registered client apps + and their nodes) + + :param server: the server of which one client should be run + :type server: `.Server` + :param duration: the duration of the client process + :type duration: `float` + :param node: the node on which the client should be run + :type node: `rumba.model.Node` or `str` + :param proc_id: the ID identifying the generated process + :type proc_id: `str` + :param callback: callable or list thereof to be run after client termination + :type callback: `callable` or `list` of `callable` """ if isinstance(server, str): server = self.server_apps[server] if duration is None: - duration = server.get_duration() + duration = server._get_duration() client = random.choice(server.clients) self.run_client(client, duration, node, proc_id, callback) @@ -556,12 +717,17 @@ class StoryBoard(SBEntity): If the node parameter is not given, it will be chosen at random among the client default nodes. - @param client: the client which should be run - @param duration: the duration of the client process - @param node: (Node or str) the node on which the client should be run - @param proc_id: the entity ID to use for the process - @param callback: callable or list thereof to be run + :param client: the client which should be run + :type client: `.Client` + :param duration: the duration of the client process + :type duration: `float` + :param node: the node on which the client should be run + :type node: `rumba.model.Node` or `str` + :param proc_id: the entity ID to use for the process + :type proc_id: `str` + :param callback: callable or list thereof to be run after client termination + :type callback: `callable` or `list` of `callable` """ if isinstance(client, str): client = self.client_apps[client] @@ -596,14 +762,18 @@ class StoryBoard(SBEntity): to the server parameters (client apps and their nodes). Note that this method, as opposed to - :meth:`rumba.storyboard.run_client_of`, will not generate an event - to stop the client after the duration is expired. In most cases, - :meth:`rumba.storyboard.run_client_of` is the way to go. - - @param server: the server of which one client should be run - @param duration: the duration of the client process - @param node: the node on which the client should be run - @param proc_id: the entity ID to use for the process + :meth:`.StoryBoard.run_client_of`, will not automatically generate + an event stopping the client after the duration is expired. + In most cases, :meth:`.StoryBoard.run_client_of` is the way to go. + + :param server: the server of which one client should be run + :type server: `.Server` + :param duration: the duration of the client process + :type duration: `float` + :param node: the node on which the client should be run + :type node: `rumba.model.Node` or `str` + :param proc_id: the ID identifying the generated process + :type proc_id: `str` """ if isinstance(server, str): server = self.server_apps[server] @@ -618,14 +788,18 @@ class StoryBoard(SBEntity): among the client default nodes. Note that this method, as opposed to - :meth:`rumba.storyboard.run_client`, will not generate an event - to stop the client after the duration is expired. In most cases, - :meth:`rumba.storyboard.run_client` is the way to go. - - @param client: the client which should be run - @param duration: the duration of the client process - @param node: the node on which the client should be run - @param proc_id: the entity ID to use for the process + :meth:`.StoryBoard.run_client`, will not automatically generate + an event stopping the client after the duration is expired. + In most cases, :meth:`.StoryBoard.run_client` is the way to go. + + :param client: the client which should be run + :type client: `.Client` + :param duration: the duration of the client process + :type duration: `float` + :param node: the node on which the client should be run + :type node: `rumba.model.Node` or `str` + :param proc_id: the entity ID to use for the process + :type proc_id: `str` """ if isinstance(client, str): client = self.client_apps[client] @@ -636,23 +810,31 @@ class StoryBoard(SBEntity): process.run() def kill_process(self, proc_id): + """ + Stops the `.ClientProcess` with the specified ID. + + :param proc_id: the ID of the process to kill + :type proc_id: `str` + """ process = self.process_dict.get(proc_id, None) if process is None: raise ValueError('No process named %s' % (proc_id,)) process.stop() del self.process_dict[proc_id] - def periodic_check(self, t): + def _periodic_check(self, t): self._script.check_for_ready_ev(t) self._script.run_ready() def generate_script(self, clean=True): """ Randomly generate a script for this experiment based on the - parameters provided to the instances of servers, nodes and clients. + `.Server` instances registered to this storyboard, + their nodes, thier clients and the nodes of their clients. - @param clean: if True, discard the current script before + :param clean: if `True`, discard the current script before generating a new one. + :type clean: `bool` """ if self.experiment is None: raise ValueError('Cannot generate script without an experiment') @@ -666,7 +848,7 @@ class StoryBoard(SBEntity): last_marker += 1 logger.debug('Passed the %s seconds mark', last_marker*marker) for server in self.server_apps.values(): - c_l = server.get_new_clients(self.SCRIPT_RESOLUTION) + c_l = server._get_new_clients(self.SCRIPT_RESOLUTION) for d, n, p, c in c_l: if d > self.duration - t: # would outlast the experiment continue @@ -692,6 +874,7 @@ class StoryBoard(SBEntity): return start_event def start(self): + """Start the storyboard execution.""" if self.experiment is None: raise ValueError("Cannot run sb with no experiment.") if self._script is None: @@ -717,13 +900,13 @@ class StoryBoard(SBEntity): server.run() res = self.SCRIPT_RESOLUTION # for brevity while self.cur_time < self.duration: - self.periodic_check(self.cur_time) + self._periodic_check(self.cur_time) next_breakpoint = math.ceil(self.cur_time / res) * res delta = next_breakpoint - self.cur_time if delta > 0: # just in case time.sleep(delta) self.cur_time = float(time.time() - self.start_time) - self.periodic_check(self.cur_time) + self._periodic_check(self.cur_time) # Do things that were scheduled # in the last seconds # of the StoryBoard @@ -734,6 +917,15 @@ class StoryBoard(SBEntity): server.stop() def fetch_logs(self, local_dir=None): + """ + Fetch all server application and client application logs from + the different nodes, and put them into `local_dir` + + :param local_dir: the local directory in which the logs should + be stored. If `None`, `/tmp/rumba/<exp_name>` + will be used + :type local_dir: `str` + """ if local_dir is None: local_dir = self.experiment.log_dir if not os.path.isdir(local_dir): @@ -750,11 +942,25 @@ class StoryBoard(SBEntity): logger.debug('Log list is:\n%s', logs_list) node.fetch_files(logs_list, dst_dir) - def set_link_state(self, t, dif, state): + def schedule_link_state(self, t, dif, state): + """ + Schedules a link's (`rumba.model.ShimEthDIF`) state to go + up or down at the specified time. + + :param t: the time in the storyboard at which the state + change should happen + :type t: `float` + :param dif: the DIF which should be reconfigured + :type dif: `rumba.model.ShimEthDIF` + :param state: the desired state + :type state: `str` -- either `up` or `down` + """ if self.experiment is None: raise ValueError("An experiment is needed to schedule commands.") if not isinstance(dif, model.ShimEthDIF): - raise Exception("Not a Shim Ethernet DIF.") + raise ValueError("Not a Shim Ethernet DIF.") + if state not in ['up', 'down']: + raise ValueError('Only possible states are "up" and "down"') if self._script is None: self._script = _Script(self) @@ -763,13 +969,46 @@ class StoryBoard(SBEntity): action = functools.partial(node.set_link_state, dif, state) self._script.add_event(Event(action, ev_time=t)) - def set_link_up(self, t, dif): - self.set_link_state(t, dif, 'up') + def schedule_link_up(self, t, dif): + """ + Schedules a link's (`rumba.model.ShimEthDIF`) state to go + up at the specified time. + + :param t: the time in the storyboard at which the state + change should happen + :type t: `float` + :param dif: the DIF which should be reconfigured + :type dif: `rumba.model.ShimEthDIF` + """ + self.schedule_link_state(t, dif, 'up') + + def schedule_link_down(self, t, dif): + """ + Schedules a link's (`rumba.model.ShimEthDIF`) state to go + down at the specified time. + + :param t: the time in the storyboard at which the state + change should happen + :type t: `float` + :param dif: the DIF which should be reconfigured + :type dif: `rumba.model.ShimEthDIF` + """ + self.schedule_link_state(t, dif, 'down') + + def schedule_node_state(self, t, node, state): + """ + Schedules a node's state to go up or down at the specified time. - def set_link_down(self, t, dif): - self.set_link_state(t, dif, 'down') + When a node is down all of its links are set to `down`. - def set_node_state(self, t, node, state): + :param t: the time in the storyboard at which the state + change should happen + :type t: `float` + :param node: the node which should be reconfigured + :type node: `rumba.model.Node` + :param state: the desired state + :type state: `str` -- either `up` or `down` + """ if self.experiment is None: raise ValueError("An experiment is needed to schedule commands.") @@ -779,29 +1018,54 @@ class StoryBoard(SBEntity): for dif in node.difs: if not isinstance(dif, model.ShimEthDIF): continue - action = functools.partial(node.set_link_state, dif, state) + action = functools.partial(node.schedule_link_state, dif, state) self._script.add_event(Event(action, ev_time=t)) - def set_node_up(self, t, node): - self.set_node_state(t, node, 'up') + def schedule_node_up(self, t, node): + """ + Schedules a node's state to go up at the specified time. - def set_node_down(self, t, node): - self.set_node_state(t, node, 'down') + :param t: the time in the storyboard at which the state + change should happen + :type t: `float` + :param node: the node which should be reconfigured + :type node: `rumba.model.Node` + """ + self.schedule_node_state(t, node, 'up') + + def schedule_node_down(self, t, node): + """ + Schedules a node's state to go down at the specified time. + + When a node is down all of its links are set to `down`. + + :param t: the time in the storyboard at which the state + change should happen + :type t: `float` + :param node: the node which should be reconfigured + :type node: `rumba.model.Node` + """ + self.schedule_node_state(t, node, 'down') def write_script(self, buffer): """ - Writes the script on a string buffer, at the current position + Writes the script on a (string-oriented) buffer, + at the current position - @param buffer: a string buffer. + :param buffer: a string buffer. + :type buffer: string-oriented `file-like` object """ self._script.write(buffer) def write_script_to_file(self, filename, clean=True): """ - Writes the script to a file, overwriting content. + Writes the script to a file. - @param filename: the name of the destination file - @param clean: if True, current file contents will be overwritten. + :param filename: the name of the destination file + :type filename: `str` + :param clean: if True, current file's contents will be overwritten. + If False, the script will be appended to the file. + :type clean: `bool` """ mode = 'w' if not clean: @@ -811,9 +1075,10 @@ class StoryBoard(SBEntity): def write_script_string(self): """ - Writes the script into a string and returns it. + Writes the script as a string and returns it. - @return: the script as a string. + :return: the script as a string. + :rtype: `str` """ s = StringIO() self.write_script(s) @@ -823,8 +1088,10 @@ class StoryBoard(SBEntity): """ Reads a script from a buffer, at the current position. - @param buffer: the buffer to read from. - @param clean: if True, discard the current script before reading. + :param buffer: the buffer to read from. + :type buffer: string-oriented `file-like` object + :param clean: if True, discard the current script before reading. + :type clean: `bool` """ if clean: self._script = _Script(self) @@ -834,8 +1101,10 @@ class StoryBoard(SBEntity): """ Reads a script from a file. - @param filename: the file to read from. - @param clean: if True, discard the current script before reading. + :param filename: the file to read from. + :type filename: `str` + :param clean: if True, discard the current script before reading. + :type clean: `bool` """ if clean: self._script = _Script(self) @@ -844,10 +1113,12 @@ class StoryBoard(SBEntity): def parse_script_string(self, string, clean=True): """ - Reads a script from a string. + Reads a string as a script. - @param string: the string to read from. - @param clean: if True, discard the current script before reading. + :param string: the string to read from. + :type string: `str` + :param clean: if True, discard the current script before reading. + :type clean: `bool` """ if clean: self._script = _Script(self) @@ -856,13 +1127,17 @@ class StoryBoard(SBEntity): def capture_traffic(self, start, end, node, dif): """ - Captures the traffic of an interface on a certain node. - - :param start: The time to start capturing. - :param end: The time to stop capturing. - :param node: The node to capture on. - :param dif: The Shim Ethernet DIF of the node, Rumba - automatically resolves the correct interface. + Captures the traffic of an interface on a node. + + :param start: the time to start capturing. + :type start: `float` + :param end: the time to stop capturing. + :type end: `float` + :param node: the node to capture on. + :type node: `rumba.model.Node` + :param dif: the node's Shim Ethernet DIF whose interface + will be used for the capture. + :type dif: `rumba.model.ShimEthDIF` """ for ipcp in dif.ipcps: if ipcp.node is not node: @@ -894,6 +1169,7 @@ class StoryBoard(SBEntity): class Event(object): + """Class representing an event in a `.StoryBoard`""" cur_id = -1 @@ -904,11 +1180,15 @@ class Event(object): def __init__(self, action, ev_id=None, ev_time=None, trigger=None): """ - @param ev_id: (str) id of the event - @param action: (any SBEntity method) action to undertake - when event is activated - @param ev_time: (float) seconds to wait before running the event - @param trigger: (Event) Event which must complete before this runs + :param ev_id: ID of the event + :type ev_id: `str` + :param action: action to undertake when event is activated + :type action: nullary `callable` + :param ev_time: seconds to wait before running the event + :type ev_time: `float` + :param trigger: Event which must complete before + this event runs + :type trigger: `.Event` """ self.id = ev_id if ev_id is not None else self.generate_id() if ev_time is None and trigger is None: @@ -957,6 +1237,11 @@ class Event(object): @property def failed(self): + """ + + :return: True if this event's execution failed + :rtype: `bool` + """ return self.exception is not None def pre_exec(self): # hook to be overridden @@ -971,9 +1256,7 @@ class Event(object): raise ValueError('Event %s has already ran' % self.id) def run(self): - """ - Run this event's action - """ + """Run this event's action""" self._start() try: self.action() @@ -983,9 +1266,12 @@ class Event(object): def check(self, cur_time): """ - Check if this event can be run. - @param cur_time: (float) current time - @return: True if the preconditions are satisfied, False otherwise. + Check if this event can be run, i.e. it's prerequisites are satisfied. + + :param cur_time: current elapsed time from storyboard's start. + :type cur_time: `float` + :return: True if the preconditions are satisfied, False otherwise. + :rtype: `bool` """ return \ (self.time is None or cur_time > self.time) \ diff --git a/rumba/utils.py b/rumba/utils.py index f6d6c1c..fd12333 100644 --- a/rumba/utils.py +++ b/rumba/utils.py @@ -88,19 +88,27 @@ class ExperimentManager(object): syslogs_strategy=NO_SYSLOGS, syslogs=None): """ - Initializes the ExperimentManager. - :param experiment: The experiment name. + :type experiment: `rumba.model.Experiment` :param swap_out_strategy: What action to perform on swap-out. - :param syslog_strategy: What system and prototype logs to retrieve + :param syslogs_strategy: What system and prototype logs to retrieve before swap-out. :param syslogs: The location of the syslogs in case of custom syslogs. + :type syslogs: `str` + + .. note:: Options for swap_out_strategy are + + - NO_SWAPOUT == 0, + - AUTO_SWAPOUT == 1, + - PAUSE_SWAPOUT == 2, + - PROMPT_SWAPOUT == 3. - .. note:: Options for swap_out_strategy are NO_SWAPOUT, AUTO_SWAPOUT, - PAUSE_SWAPOUT, PROMPT_SWAPOUT. + .. note:: Options for syslog_strategy are - .. note:: Options for syslog_strategy are NO_SYSLOGS, DEFAULT_SYSLOGS, - DMESG_SYSLOGS, CUSTOM_SYSLOGS. + - NO_SYSLOGS == 0, + - DEFAULT_SYSLOGS == 1, + - DMESG_SYSLOGS == 2, + - CUSTOM_SYSLOGS == 3. """ assert isinstance(experiment, model.Experiment), \ 'An experiment instance is required.' |