aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--rumba/elements/experimentation.py10
-rw-r--r--rumba/prototypes/ouroboros.py91
-rw-r--r--rumba/storyboard.py40
3 files changed, 90 insertions, 51 deletions
diff --git a/rumba/elements/experimentation.py b/rumba/elements/experimentation.py
index 8f3f40a..0f734f8 100644
--- a/rumba/elements/experimentation.py
+++ b/rumba/elements/experimentation.py
@@ -117,7 +117,8 @@ class Experiment(object):
log_dir=None,
prototype_logs=None,
enrollment_strategy='minimal',
- dt_strategy='full-mesh'):
+ dt_strategy='full-mesh',
+ server_decorator=None):
"""
:param testbed: The testbed of the experiment.
:param nodes: The list of nodes in the experiment.
@@ -127,10 +128,17 @@ class Experiment(object):
:param prototype_logs: Where the prototype logs its output.
:param enrollment_strategy: Can be 'full-mesh', 'minimal' or 'manual'.
:param dt_strategy: For data flows, 'full-mesh', 'minimal' or 'manual'.
+ :param server_decorator: a decorator function which will be applied to
+ storyboard.server instances when using
+ this prototype
"""
if nodes is None:
nodes = list()
self.nodes = nodes
+ if server_decorator is None:
+ def server_decorator(server):
+ return server
+ self.server_decorator = server_decorator
self.git_repo = git_repo
self.git_branch = git_branch
self.testbed = testbed
diff --git a/rumba/prototypes/ouroboros.py b/rumba/prototypes/ouroboros.py
index 51f3629..0c16482 100644
--- a/rumba/prototypes/ouroboros.py
+++ b/rumba/prototypes/ouroboros.py
@@ -34,18 +34,62 @@ import rumba.multiprocess as m_processing
import rumba.log as log
import rumba.testbeds.local as local
import rumba.testbeds.dockertb as docker
-
+import rumba.storyboard as sb
logger = log.get_logger(__name__)
+class OurServer(sb.Server):
+
+ def __init__(self, server):
+ super(OurServer, self).__init__(
+ server.ap,
+ server.arrival_rate,
+ server.actual_parameter + server.min_duration,
+ server.options,
+ server.max_clients,
+ server.clients,
+ server.nodes,
+ server.min_duration,
+ server.id,
+ server.as_root,
+ server.difs
+ )
+
+ def _make_run_cmd(self):
+ o_cmd = super(OurServer, self)._make_run_cmd()
+
+ # Run and store PID
+ n_cmd = 'pid=$(%s); ' % (o_cmd,)
+
+ # Add bind command
+ n_cmd += 'irm b process $pid name %s > /dev/null 2>&1; ' % (self.id,)
+
+ # Build register command
+ r_cmd = 'irm r n %s ' % (self.id,)
+ if len(self.difs) == 0:
+ r_cmd += 'layer \'*\''
+ else:
+ r_cmd += ' '.join('layer %s' % (layer,) for layer in self.difs)
+ r_cmd += ' > /dev/null 2>&1; '
+
+ # Add register command
+ n_cmd += r_cmd
+ n_cmd += 'echo $pid' # We need to return the pid for the sb
+
+ return n_cmd
+
+
class Experiment(mod.Experiment):
"""
Represents an Ouroboros experiment.
"""
- def __init__(self, testbed, nodes=None,
+ def __init__(self,
+ testbed,
+ nodes=None,
git_repo='git://ouroboros.ilabt.imec.be/ouroboros',
- git_branch='master', enrollment_strategy='minimal',
+ git_branch='master',
+ enrollment_strategy='minimal',
dt_strategy='full-mesh'):
"""
Initializes the experiment class.
@@ -57,9 +101,16 @@ class Experiment(mod.Experiment):
:param enrollment_strategy: Can be 'full-mesh', 'minimal' or 'manual'.
:param dt_strategy: For data flows, 'full-mesh', 'minimal' or 'manual'.
"""
- mod.Experiment.__init__(self, testbed, nodes, git_repo, git_branch,
- enrollment_strategy=enrollment_strategy,
- dt_strategy=dt_strategy)
+ mod.Experiment.__init__(
+ self,
+ testbed,
+ nodes,
+ git_repo,
+ git_branch,
+ enrollment_strategy=enrollment_strategy,
+ dt_strategy=dt_strategy,
+ server_decorator=OurServer
+ )
self.r_ipcps = dict()
self.set_startup_command("irmd")
@@ -248,34 +299,6 @@ class Experiment(mod.Experiment):
logger.info("Killing IRMd...")
subprocess.check_call('sudo killall -15 irmd'.split())
- def bind_program(self, node, program, name):
- """
- Bind a program to a certain name on a certain node.
-
- :param node: The node
- :param program: The binary name
- :param name: Name to bind to
- """
- node.execute_command('irm b prog %s name %s' % (program, name))
-
- def register_name(self, node, name, layers):
- """
- Bind a program to a certain name on a certain node.
-
- :param node: The node
- :param name: Name to register
- :param layers: Layers to register in. If it is None,
- then it registers in all layers.
- """
- cmd = 'irm r n %s ' % name
- if layers is None:
- cmd += 'layer \'*\''
- else:
- for layer in layers:
- cmd += 'layer %s ' % layer
-
- node.execute_command(cmd)
-
def destroy_dif(self, dif):
for ipcp in dif.ipcps:
ipcp.node.execute_command('irm i d n ' + ipcp.name)
diff --git a/rumba/storyboard.py b/rumba/storyboard.py
index 2f1c6de..ecd639d 100644
--- a/rumba/storyboard.py
+++ b/rumba/storyboard.py
@@ -41,7 +41,6 @@ import uuid
import rumba.model as model
import rumba.ssh_support as ssh_support
import rumba.log as log
-import rumba.prototypes.ouroboros as our
try:
from io import StringIO
@@ -326,7 +325,7 @@ class Server(_SBEntity):
def __init__(self, ap, arrival_rate, mean_duration,
options=None, max_clients=float('inf'),
clients=None, nodes=None, min_duration=2,
- s_id=None, as_root=False):
+ s_id=None, as_root=False, difs=None):
"""
:param ap: the application binary or command which should be run
@@ -356,6 +355,10 @@ class Server(_SBEntity):
:param as_root: if true, the server app will be started
with root permissions
:type as_root: `bool`
+ :param difs: the difs this server intends to register to
+ (note: the effect of this parameter is prototype
+ dependent, and other strategies might be required)
+ :type difs: `rumba.model.DIF` or `list` thereof
"""
self.ap = ap
e_id = s_id if s_id is not None else self.ap.replace(' ', '_')
@@ -374,6 +377,13 @@ class Server(_SBEntity):
self.pids = {}
self.min_duration = min_duration
self.as_root = as_root
+ if difs is None:
+ difs = []
+ elif hasattr(difs, '__iter__'):
+ difs = list(difs)
+ else:
+ difs = [difs]
+ self.difs = difs
def add_client(self, client):
"""
@@ -476,17 +486,18 @@ class Server(_SBEntity):
proc_id=p
)
+ def _make_run_cmd(self):
+ run_cmd = self.ap + (
+ (" " + self.options) if self.options is not None else ""
+ )
+ return "./startup.sh %s %s" % ('server_' + self.id, run_cmd)
+
def run(self):
"""Starts this server"""
for node in self.nodes:
- if isinstance(self.experiment, our.Experiment):
- self.experiment.bind_program(node, self.ap, self.id)
- self.experiment.register_name(node, self.id, None)
- run_cmd = self.ap + (
- (" " + self.options) if self.options is not None else ""
- )
- cmd = "./startup.sh %s %s" % ('server_' + self.id, run_cmd)
+ cmd = self._make_run_cmd()
+
logger.debug(
'Starting server %s on node %s.',
self.id, node.name
@@ -580,6 +591,10 @@ class StoryBoard(_SBEntity):
self.shims[dif.name] = dif
def _validate_and_add_server(self, s, n=None):
+ if self.experiment is None:
+ raise ValueError("Cannot add a server before "
+ "setting the experiment.")
+ s = self.experiment.server_decorator(s)
if len(s.clients) == 0:
logger.warning("'%s' server has no registered clients.", s.ap)
else:
@@ -596,7 +611,6 @@ class StoryBoard(_SBEntity):
'not in experiment.' % (node.name,))
self.server_apps[s.id] = s
self._build_nodes_lists()
- s.experiment = self.experiment
def set_experiment(self, experiment):
"""
@@ -618,9 +632,6 @@ class StoryBoard(_SBEntity):
:param server: the server application
:type server: `.Server`
"""
- if self.experiment is None:
- raise ValueError("Cannot add a server before "
- "setting the experiment.")
if not isinstance(server, Server):
raise TypeError('Argument must be of type Server')
self._validate_and_add_server(server)
@@ -635,9 +646,6 @@ class StoryBoard(_SBEntity):
:param node: the node upon which the server should run
:type node: `rumba.model.Node`
"""
- if self.experiment is None:
- raise ValueError("Cannot add a server before "
- "setting the experiment.")
if not isinstance(server, Server):
raise TypeError('First argument must be of type Server')
if not isinstance(node, model.Node):