diff options
| -rw-r--r-- | rumba/elements/experimentation.py | 10 | ||||
| -rw-r--r-- | rumba/prototypes/ouroboros.py | 91 | ||||
| -rw-r--r-- | rumba/storyboard.py | 40 | 
3 files changed, 90 insertions, 51 deletions
diff --git a/rumba/elements/experimentation.py b/rumba/elements/experimentation.py index 8f3f40a..0f734f8 100644 --- a/rumba/elements/experimentation.py +++ b/rumba/elements/experimentation.py @@ -117,7 +117,8 @@ class Experiment(object):                   log_dir=None,                   prototype_logs=None,                   enrollment_strategy='minimal', -                 dt_strategy='full-mesh'): +                 dt_strategy='full-mesh', +                 server_decorator=None):          """          :param testbed: The testbed of the experiment.          :param nodes: The list of nodes in the experiment. @@ -127,10 +128,17 @@ class Experiment(object):          :param prototype_logs: Where the prototype logs its output.          :param enrollment_strategy: Can be 'full-mesh', 'minimal' or 'manual'.          :param dt_strategy: For data flows, 'full-mesh', 'minimal' or 'manual'. +        :param server_decorator: a decorator function which will be applied to +                                 storyboard.server instances when using +                                 this prototype          """          if nodes is None:              nodes = list()          self.nodes = nodes +        if server_decorator is None: +            def server_decorator(server): +                return server +        self.server_decorator = server_decorator          self.git_repo = git_repo          self.git_branch = git_branch          self.testbed = testbed diff --git a/rumba/prototypes/ouroboros.py b/rumba/prototypes/ouroboros.py index 51f3629..0c16482 100644 --- a/rumba/prototypes/ouroboros.py +++ b/rumba/prototypes/ouroboros.py @@ -34,18 +34,62 @@ import rumba.multiprocess as m_processing  import rumba.log as log  import rumba.testbeds.local as local  import rumba.testbeds.dockertb as docker - +import rumba.storyboard as sb  logger = log.get_logger(__name__) +class OurServer(sb.Server): + +    def __init__(self, server): +        super(OurServer, self).__init__( +            server.ap, +            server.arrival_rate, +            server.actual_parameter + server.min_duration, +            server.options, +            server.max_clients, +            server.clients, +            server.nodes, +            server.min_duration, +            server.id, +            server.as_root, +            server.difs +        ) + +    def _make_run_cmd(self): +        o_cmd = super(OurServer, self)._make_run_cmd() + +        # Run and store PID +        n_cmd = 'pid=$(%s); ' % (o_cmd,) + +        # Add bind command +        n_cmd += 'irm b process $pid name %s > /dev/null 2>&1; ' % (self.id,) + +        # Build register command +        r_cmd = 'irm r n %s ' % (self.id,) +        if len(self.difs) == 0: +            r_cmd += 'layer \'*\'' +        else: +            r_cmd += ' '.join('layer %s' % (layer,) for layer in self.difs) +        r_cmd += ' > /dev/null 2>&1; ' + +        # Add register command +        n_cmd += r_cmd +        n_cmd += 'echo $pid'  # We need to return the pid for the sb + +        return n_cmd + +  class Experiment(mod.Experiment):      """      Represents an Ouroboros experiment.      """ -    def __init__(self, testbed, nodes=None, +    def __init__(self, +                 testbed, +                 nodes=None,                   git_repo='git://ouroboros.ilabt.imec.be/ouroboros', -                 git_branch='master', enrollment_strategy='minimal', +                 git_branch='master', +                 enrollment_strategy='minimal',                   dt_strategy='full-mesh'):          """          Initializes the experiment class. @@ -57,9 +101,16 @@ class Experiment(mod.Experiment):          :param enrollment_strategy: Can be 'full-mesh', 'minimal' or 'manual'.          :param dt_strategy: For data flows, 'full-mesh', 'minimal' or 'manual'.          """ -        mod.Experiment.__init__(self, testbed, nodes, git_repo, git_branch, -                                enrollment_strategy=enrollment_strategy, -                                dt_strategy=dt_strategy) +        mod.Experiment.__init__( +            self, +            testbed, +            nodes, +            git_repo, +            git_branch, +            enrollment_strategy=enrollment_strategy, +            dt_strategy=dt_strategy, +            server_decorator=OurServer +        )          self.r_ipcps = dict()          self.set_startup_command("irmd") @@ -248,34 +299,6 @@ class Experiment(mod.Experiment):              logger.info("Killing IRMd...")              subprocess.check_call('sudo killall -15 irmd'.split()) -    def bind_program(self, node, program, name): -        """ -        Bind a program to a certain name on a certain node. - -        :param node: The node -        :param program: The binary name -        :param name: Name to bind to -        """ -        node.execute_command('irm b prog %s name %s' % (program, name)) - -    def register_name(self, node, name, layers): -        """ -        Bind a program to a certain name on a certain node. - -        :param node: The node -        :param name: Name to register -        :param layers: Layers to register in. If it is None, -                       then it registers in all layers. -        """ -        cmd = 'irm r n %s ' % name -        if layers is None: -            cmd += 'layer \'*\'' -        else: -            for layer in layers: -                cmd += 'layer %s ' % layer - -        node.execute_command(cmd) -      def destroy_dif(self, dif):          for ipcp in dif.ipcps:              ipcp.node.execute_command('irm i d n ' + ipcp.name) diff --git a/rumba/storyboard.py b/rumba/storyboard.py index 2f1c6de..ecd639d 100644 --- a/rumba/storyboard.py +++ b/rumba/storyboard.py @@ -41,7 +41,6 @@ import uuid  import rumba.model as model  import rumba.ssh_support as ssh_support  import rumba.log as log -import rumba.prototypes.ouroboros as our  try:      from io import StringIO @@ -326,7 +325,7 @@ class Server(_SBEntity):      def __init__(self, ap, arrival_rate, mean_duration,                   options=None, max_clients=float('inf'),                   clients=None, nodes=None, min_duration=2, -                 s_id=None, as_root=False): +                 s_id=None, as_root=False, difs=None):          """          :param ap: the application binary or command which should be run @@ -356,6 +355,10 @@ class Server(_SBEntity):          :param as_root: if true, the server app will be started                          with root permissions          :type as_root: `bool` +        :param difs: the difs this server intends to register to +                     (note: the effect of this parameter is prototype +                     dependent, and other strategies might be required) +        :type difs: `rumba.model.DIF` or `list` thereof          """          self.ap = ap          e_id = s_id if s_id is not None else self.ap.replace(' ', '_') @@ -374,6 +377,13 @@ class Server(_SBEntity):          self.pids = {}          self.min_duration = min_duration          self.as_root = as_root +        if difs is None: +            difs = [] +        elif hasattr(difs, '__iter__'): +            difs = list(difs) +        else: +            difs = [difs] +        self.difs = difs      def add_client(self, client):          """ @@ -476,17 +486,18 @@ class Server(_SBEntity):              proc_id=p          ) +    def _make_run_cmd(self): +        run_cmd = self.ap + ( +            (" " + self.options) if self.options is not None else "" +        ) +        return "./startup.sh %s %s" % ('server_' + self.id, run_cmd) +      def run(self):          """Starts this server"""          for node in self.nodes: -            if isinstance(self.experiment, our.Experiment): -                self.experiment.bind_program(node, self.ap, self.id) -                self.experiment.register_name(node, self.id, None) -            run_cmd = self.ap + ( -                (" " + self.options) if self.options is not None else "" -            ) -            cmd = "./startup.sh %s %s" % ('server_' + self.id, run_cmd) +            cmd = self._make_run_cmd() +              logger.debug(                  'Starting server %s on node %s.',                  self.id, node.name @@ -580,6 +591,10 @@ class StoryBoard(_SBEntity):                      self.shims[dif.name] = dif      def _validate_and_add_server(self, s, n=None): +        if self.experiment is None: +            raise ValueError("Cannot add a server before " +                             "setting the experiment.") +        s = self.experiment.server_decorator(s)          if len(s.clients) == 0:              logger.warning("'%s' server has no registered clients.", s.ap)          else: @@ -596,7 +611,6 @@ class StoryBoard(_SBEntity):                                   'not in experiment.' % (node.name,))          self.server_apps[s.id] = s          self._build_nodes_lists() -        s.experiment = self.experiment      def set_experiment(self, experiment):          """ @@ -618,9 +632,6 @@ class StoryBoard(_SBEntity):          :param server: the server application          :type server: `.Server`          """ -        if self.experiment is None: -            raise ValueError("Cannot add a server before " -                             "setting the experiment.")          if not isinstance(server, Server):              raise TypeError('Argument must be of type Server')          self._validate_and_add_server(server) @@ -635,9 +646,6 @@ class StoryBoard(_SBEntity):          :param node: the node upon which the server should run          :type node: `rumba.model.Node`          """ -        if self.experiment is None: -            raise ValueError("Cannot add a server before " -                             "setting the experiment.")          if not isinstance(server, Server):              raise TypeError('First argument must be of type Server')          if not isinstance(node, model.Node):  | 
