From 28d34549ecf3a6fe5dde672569b7e8f5e05f1a5e Mon Sep 17 00:00:00 2001 From: Marco Capitani Date: Wed, 6 Jun 2018 10:30:34 +0200 Subject: sb + our: add server_decorator functionality and our decorator --- rumba/elements/experimentation.py | 10 ++++- rumba/prototypes/ouroboros.py | 91 ++++++++++++++++++++++++--------------- rumba/storyboard.py | 40 ++++++++++------- 3 files changed, 90 insertions(+), 51 deletions(-) diff --git a/rumba/elements/experimentation.py b/rumba/elements/experimentation.py index 8f3f40a..0f734f8 100644 --- a/rumba/elements/experimentation.py +++ b/rumba/elements/experimentation.py @@ -117,7 +117,8 @@ class Experiment(object): log_dir=None, prototype_logs=None, enrollment_strategy='minimal', - dt_strategy='full-mesh'): + dt_strategy='full-mesh', + server_decorator=None): """ :param testbed: The testbed of the experiment. :param nodes: The list of nodes in the experiment. @@ -127,10 +128,17 @@ class Experiment(object): :param prototype_logs: Where the prototype logs its output. :param enrollment_strategy: Can be 'full-mesh', 'minimal' or 'manual'. :param dt_strategy: For data flows, 'full-mesh', 'minimal' or 'manual'. + :param server_decorator: a decorator function which will be applied to + storyboard.server instances when using + this prototype """ if nodes is None: nodes = list() self.nodes = nodes + if server_decorator is None: + def server_decorator(server): + return server + self.server_decorator = server_decorator self.git_repo = git_repo self.git_branch = git_branch self.testbed = testbed diff --git a/rumba/prototypes/ouroboros.py b/rumba/prototypes/ouroboros.py index 51f3629..0c16482 100644 --- a/rumba/prototypes/ouroboros.py +++ b/rumba/prototypes/ouroboros.py @@ -34,18 +34,62 @@ import rumba.multiprocess as m_processing import rumba.log as log import rumba.testbeds.local as local import rumba.testbeds.dockertb as docker - +import rumba.storyboard as sb logger = log.get_logger(__name__) +class OurServer(sb.Server): + + def __init__(self, server): + super(OurServer, self).__init__( + server.ap, + server.arrival_rate, + server.actual_parameter + server.min_duration, + server.options, + server.max_clients, + server.clients, + server.nodes, + server.min_duration, + server.id, + server.as_root, + server.difs + ) + + def _make_run_cmd(self): + o_cmd = super(OurServer, self)._make_run_cmd() + + # Run and store PID + n_cmd = 'pid=$(%s); ' % (o_cmd,) + + # Add bind command + n_cmd += 'irm b process $pid name %s > /dev/null 2>&1; ' % (self.id,) + + # Build register command + r_cmd = 'irm r n %s ' % (self.id,) + if len(self.difs) == 0: + r_cmd += 'layer \'*\'' + else: + r_cmd += ' '.join('layer %s' % (layer,) for layer in self.difs) + r_cmd += ' > /dev/null 2>&1; ' + + # Add register command + n_cmd += r_cmd + n_cmd += 'echo $pid' # We need to return the pid for the sb + + return n_cmd + + class Experiment(mod.Experiment): """ Represents an Ouroboros experiment. """ - def __init__(self, testbed, nodes=None, + def __init__(self, + testbed, + nodes=None, git_repo='git://ouroboros.ilabt.imec.be/ouroboros', - git_branch='master', enrollment_strategy='minimal', + git_branch='master', + enrollment_strategy='minimal', dt_strategy='full-mesh'): """ Initializes the experiment class. @@ -57,9 +101,16 @@ class Experiment(mod.Experiment): :param enrollment_strategy: Can be 'full-mesh', 'minimal' or 'manual'. :param dt_strategy: For data flows, 'full-mesh', 'minimal' or 'manual'. """ - mod.Experiment.__init__(self, testbed, nodes, git_repo, git_branch, - enrollment_strategy=enrollment_strategy, - dt_strategy=dt_strategy) + mod.Experiment.__init__( + self, + testbed, + nodes, + git_repo, + git_branch, + enrollment_strategy=enrollment_strategy, + dt_strategy=dt_strategy, + server_decorator=OurServer + ) self.r_ipcps = dict() self.set_startup_command("irmd") @@ -248,34 +299,6 @@ class Experiment(mod.Experiment): logger.info("Killing IRMd...") subprocess.check_call('sudo killall -15 irmd'.split()) - def bind_program(self, node, program, name): - """ - Bind a program to a certain name on a certain node. - - :param node: The node - :param program: The binary name - :param name: Name to bind to - """ - node.execute_command('irm b prog %s name %s' % (program, name)) - - def register_name(self, node, name, layers): - """ - Bind a program to a certain name on a certain node. - - :param node: The node - :param name: Name to register - :param layers: Layers to register in. If it is None, - then it registers in all layers. - """ - cmd = 'irm r n %s ' % name - if layers is None: - cmd += 'layer \'*\'' - else: - for layer in layers: - cmd += 'layer %s ' % layer - - node.execute_command(cmd) - def destroy_dif(self, dif): for ipcp in dif.ipcps: ipcp.node.execute_command('irm i d n ' + ipcp.name) diff --git a/rumba/storyboard.py b/rumba/storyboard.py index 2f1c6de..ecd639d 100644 --- a/rumba/storyboard.py +++ b/rumba/storyboard.py @@ -41,7 +41,6 @@ import uuid import rumba.model as model import rumba.ssh_support as ssh_support import rumba.log as log -import rumba.prototypes.ouroboros as our try: from io import StringIO @@ -326,7 +325,7 @@ class Server(_SBEntity): def __init__(self, ap, arrival_rate, mean_duration, options=None, max_clients=float('inf'), clients=None, nodes=None, min_duration=2, - s_id=None, as_root=False): + s_id=None, as_root=False, difs=None): """ :param ap: the application binary or command which should be run @@ -356,6 +355,10 @@ class Server(_SBEntity): :param as_root: if true, the server app will be started with root permissions :type as_root: `bool` + :param difs: the difs this server intends to register to + (note: the effect of this parameter is prototype + dependent, and other strategies might be required) + :type difs: `rumba.model.DIF` or `list` thereof """ self.ap = ap e_id = s_id if s_id is not None else self.ap.replace(' ', '_') @@ -374,6 +377,13 @@ class Server(_SBEntity): self.pids = {} self.min_duration = min_duration self.as_root = as_root + if difs is None: + difs = [] + elif hasattr(difs, '__iter__'): + difs = list(difs) + else: + difs = [difs] + self.difs = difs def add_client(self, client): """ @@ -476,17 +486,18 @@ class Server(_SBEntity): proc_id=p ) + def _make_run_cmd(self): + run_cmd = self.ap + ( + (" " + self.options) if self.options is not None else "" + ) + return "./startup.sh %s %s" % ('server_' + self.id, run_cmd) + def run(self): """Starts this server""" for node in self.nodes: - if isinstance(self.experiment, our.Experiment): - self.experiment.bind_program(node, self.ap, self.id) - self.experiment.register_name(node, self.id, None) - run_cmd = self.ap + ( - (" " + self.options) if self.options is not None else "" - ) - cmd = "./startup.sh %s %s" % ('server_' + self.id, run_cmd) + cmd = self._make_run_cmd() + logger.debug( 'Starting server %s on node %s.', self.id, node.name @@ -580,6 +591,10 @@ class StoryBoard(_SBEntity): self.shims[dif.name] = dif def _validate_and_add_server(self, s, n=None): + if self.experiment is None: + raise ValueError("Cannot add a server before " + "setting the experiment.") + s = self.experiment.server_decorator(s) if len(s.clients) == 0: logger.warning("'%s' server has no registered clients.", s.ap) else: @@ -596,7 +611,6 @@ class StoryBoard(_SBEntity): 'not in experiment.' % (node.name,)) self.server_apps[s.id] = s self._build_nodes_lists() - s.experiment = self.experiment def set_experiment(self, experiment): """ @@ -618,9 +632,6 @@ class StoryBoard(_SBEntity): :param server: the server application :type server: `.Server` """ - if self.experiment is None: - raise ValueError("Cannot add a server before " - "setting the experiment.") if not isinstance(server, Server): raise TypeError('Argument must be of type Server') self._validate_and_add_server(server) @@ -635,9 +646,6 @@ class StoryBoard(_SBEntity): :param node: the node upon which the server should run :type node: `rumba.model.Node` """ - if self.experiment is None: - raise ValueError("Cannot add a server before " - "setting the experiment.") if not isinstance(server, Server): raise TypeError('First argument must be of type Server') if not isinstance(node, model.Node): -- cgit v1.2.3