aboutsummaryrefslogtreecommitdiff
path: root/rumba/model.py
diff options
context:
space:
mode:
authorMarco Capitani <m.capitani@nextworks.it>2017-06-13 10:09:54 +0200
committerMarco Capitani <m.capitani@nextworks.it>2017-06-13 10:09:54 +0200
commit457977f337a47caddf8788e1d4e1d1736f2a6ccb (patch)
tree12fa2a2be0f57be3ea2acd3623b01f55c8de1092 /rumba/model.py
parent3081d070cda223afd548645143142e1104b07d83 (diff)
parent53602860e17d650f9ab850cf9a206de6a8712c15 (diff)
downloadrumba-457977f337a47caddf8788e1d4e1d1736f2a6ccb.tar.gz
rumba-457977f337a47caddf8788e1d4e1d1736f2a6ccb.zip
Merge branch 'master' into policies
Diffstat (limited to 'rumba/model.py')
-rw-r--r--rumba/model.py234
1 files changed, 179 insertions, 55 deletions
diff --git a/rumba/model.py b/rumba/model.py
index 442933c..e0f1dcc 100644
--- a/rumba/model.py
+++ b/rumba/model.py
@@ -20,6 +20,9 @@
# MA 02110-1301 USA
import abc
+import random
+
+import time
import rumba.log as log
@@ -35,11 +38,18 @@ logger = log.get_logger(__name__)
# @exp_name [string] experiment name
#
class Testbed:
- def __init__(self, exp_name, username, password, proj_name):
+ def __init__(self,
+ exp_name,
+ username,
+ password,
+ proj_name,
+ http_proxy=None):
self.username = username
self.password = password
self.proj_name = proj_name
self.exp_name = exp_name
+ self.http_proxy = http_proxy
+ self.flags = {'no_vlan_offload': False}
@abc.abstractmethod
def swap_in(self, experiment):
@@ -152,13 +162,12 @@ class SSHConfig:
#
# @difs: DIFs the node will have an IPCP in
# @dif_registrations: Which DIF is registered in which DIF
-# @registrations: Registrations of names in DIFs
-# @bindings: Binding of names on the processing system
# @policies: dict of dif -> policy dict to apply for that dif in this node
#
+#
class Node:
def __init__(self, name, difs=None, dif_registrations=None,
- registrations=None, bindings=None, policies=None):
+ client=False, policies=None):
self.name = name
if difs is None:
difs = list()
@@ -168,12 +177,6 @@ class Node:
if dif_registrations is None:
dif_registrations = dict()
self.dif_registrations = dif_registrations
- if registrations is None:
- registrations = dict()
- self.registrations = registrations
- if bindings is None:
- bindings = dict()
- self.bindings = bindings
self.ssh_config = SSHConfig(name)
self.ipcps = []
if policies is None:
@@ -183,6 +186,7 @@ class Node:
if hasattr(dif, 'policy'):
self.policies[dif] = \
Policy(dif, self, policies.get(dif.name, {}))
+ self.client = client
self._validate()
@@ -197,18 +201,14 @@ class Node:
"to be part of DIF %s" % (self.name, dif.name))
def _validate(self):
- # Check that DIFs referenced in self.dif_registrations and
- # in self.registrations are part of self.difs
+ # Check that DIFs referenced in self.dif_registrations
+ # are part of self.difs
for upper in self.dif_registrations:
self._undeclared_dif(upper)
for lower in self.dif_registrations[upper]:
self._undeclared_dif(lower)
- for appl in self.registrations:
- for dif in self.registrations[appl]:
- self._undeclared_dif(dif)
-
- def __repr__(self): # TODO add policies in repr?
+ def __repr__(self): # TODO add policies in repr
s = "Node " + self.name + ":\n"
s += " DIFs: [ "
@@ -226,19 +226,6 @@ class Node:
s += ", ".join(rl)
s += " ]\n"
- s += " Name registrations: [ "
- for name in self.registrations:
- difs = self.registrations[name]
- s += "%s => [ " % name
- s += ", ".join([dif.name for dif in difs])
- s += " ]"
- s += " ]\n"
-
- s += " Bindings: [ "
- s += ", ".join(["'%s' => '%s'" % (ap, self.bindings[ap])
- for ap in self.bindings])
- s += " ]\n"
-
return s
def __hash__(self):
@@ -275,22 +262,6 @@ class Node:
self.dif_registrations[upper].remove(lower)
self._validate()
- def add_registration(self, name, dif):
- self.dif_registrations[name].append(dif)
- self._validate()
-
- def del_registration(self, name, dif):
- self.dif_registrations[name].remove(dif)
- self._validate()
-
- def add_binding(self, name, ap):
- self.bindings[name] = ap
- self._validate()
-
- def del_binding(self, name):
- del self.bindings[name]
- self._validate()
-
def add_policy(self, dif, component_name, policy_name, **parameters):
self.policies[dif].add_policy(component_name, policy_name, **parameters)
@@ -311,7 +282,7 @@ class IPCP:
self.dif = dif
self.registrations = []
- # Is this node the first in the DIF, so that it does not need
+ # Is this IPCP the first in its DIF, so that it does not need
# to enroll to anyone ?
self.dif_bootstrapper = False
@@ -424,6 +395,10 @@ class Experiment:
difsdeps_inc = dict()
for node in self.nodes:
+ for dif in node.difs:
+ if dif not in difsdeps_adj:
+ difsdeps_adj[dif] = set()
+
for upper in node.dif_registrations:
for lower in node.dif_registrations[upper]:
if upper not in difsdeps_inc:
@@ -445,6 +420,14 @@ class Experiment:
difsdeps_inc_cnt[dif] = len(difsdeps_inc[dif])
del difsdeps_inc
+ # Init difsdeps_inc_cnt for those DIFs that do not
+ # act as lower IPCPs nor upper IPCPs for registration
+ # operations
+ for node in self.nodes:
+ for dif in node.difs:
+ if dif not in difsdeps_inc_cnt:
+ difsdeps_inc_cnt[dif] = 0
+
# Run Kahn's algorithm to compute topological
# ordering on the DIFs graph.
frontier = set()
@@ -576,6 +559,7 @@ class Experiment:
if dif not in node.difs:
continue
+ # Create an instance of the required IPCP class
ipcp = dif.get_ipcp_class()(
name='%s.%s' % (dif.name, node.name),
node=node, dif=dif)
@@ -584,23 +568,34 @@ class Experiment:
for lower in node.dif_registrations[dif]:
ipcp.registrations.append(lower)
+ node.ipcps.append(ipcp)
+ dif.ipcps.append(ipcp)
+
+ def compute_bootstrappers(self):
+ for node in self.nodes:
+ for ipcp in node.ipcps:
ipcp.dif_bootstrapper = True
for el in self.enrollments:
for e in el:
- if e['dif'] != dif:
+ if e['dif'] != ipcp.dif:
# Skip this DIF
break
- if e['enrollee'] == node:
+ if e['enrollee'] == ipcp:
ipcp.dif_bootstrapper = False
# Exit the loops
break
if not ipcp.dif_bootstrapper:
break
- node.ipcps.append(ipcp)
- dif.ipcps.append(ipcp)
-
- logger.info("IPCP for node %s: %s", node.name, node.ipcps)
+ def dump_ssh_info(self):
+ f = open('ssh_info', 'w')
+ for node in self.nodes:
+ f.write("%s;%s;%s;%s;%s\n" % (node.name,
+ self.testbed.username,
+ node.ssh_config.hostname,
+ node.ssh_config.port,
+ node.ssh_config.proxycommand))
+ f.close()
# Examine the nodes and DIFs, compute the registration and enrollment
# order, the list of IPCPs to create, registrations, ...
@@ -608,19 +603,148 @@ class Experiment:
self.compute_dif_ordering()
self.compute_ipcps()
self.compute_enrollments()
+ self.compute_bootstrappers()
+ for node in self.nodes:
+ logger.info("IPCPs for node %s: %s", node.name, node.ipcps)
@abc.abstractmethod
def install_prototype(self):
- raise Exception('run_prototype() method not implemented')
+ raise Exception('install_prototype() method not implemented')
@abc.abstractmethod
def bootstrap_prototype(self):
- raise Exception('run_prototype() method not implemented')
+ raise Exception('bootstrap_prototype() method not implemented')
+
+ @abc.abstractmethod
+ def prototype_name(self):
+ raise Exception('prototype_name() method not implemented')
def swap_in(self):
# Realize the experiment testbed (testbed-specific)
self.testbed.swap_in(self)
+ self.dump_ssh_info()
def swap_out(self):
# Undo the testbed (testbed-specific)
self.testbed.swap_out(self)
+
+
+# Base class for client programs
+#
+# @ap: Application Process binary
+# @options: Options to pass to the binary
+#
+class Client(object):
+ def __init__(self, ap, options=None):
+ self.ap = ap
+ self.options = options
+
+ def start_process(self, node, duration, start_time):
+ return ClientProcess(self.ap, node, duration, start_time, self.options)
+
+
+# Base class for client processes
+#
+# @ap: Application Process binary
+# @node: The node on which this process should run
+# @duration: The time (in seconds) this process should run
+# @start_time: The time at which this process is started.
+# @options: Options to pass to the binary
+#
+class ClientProcess(Client):
+ def __init__(self, ap, node, duration, start_time, options=None):
+ super(ClientProcess, self).__init__(ap, options=options)
+ self.node = node
+ self.duration = duration
+ self.start_time = start_time
+ self.run()
+ self.running = True
+
+ def run(self):
+ pass # TODO to be implemented
+
+ def stop(self):
+ pass # TODO to be implemented
+
+ def check(self, now):
+ if not self.running:
+ return
+ if now - self.start_time >= self.duration:
+ self.stop()
+
+
+# Base class for server programs
+#
+# @ap: Application Process binary
+# @arrival_rate: Average requests/s to be received by this server
+# @mean_duration: Average duration of a client connection (in seconds)
+# @options: Options to pass to the binary
+# @max_clients: Maximum number of clients to serve
+# @clients: Client binaries that will use this server
+# @nodes: Specific nodes to start this server on
+#
+class Server:
+ def __init__(self, ap, arrival_rate, mean_duration,
+ options=None, max_clients=None,
+ clients=None, nodes=None):
+ self.ap = ap
+ self.options = options
+ self.max_clients = max_clients
+ if clients is None:
+ clients = list()
+ self.clients = clients
+ self.nodes = nodes
+ self.arrival_rate = arrival_rate # mean requests/s
+ self.mean_duration = mean_duration # in seconds
+
+ def add_client(self, client):
+ self.clients.append(client)
+
+ def del_client(self, client):
+ self.clients.remove(client)
+
+ def add_node(self, node):
+ self.nodes.append(node)
+
+ def del_node(self, node):
+ self.nodes.remove(node)
+
+ def get_new_clients(self, interval):
+ """
+ Returns a list of clients of size appropriate to the server's rate.
+
+ The list's size should be a sample from Poisson(arrival_rate) over
+ interval seconds.
+ Hence, the average size should be interval * arrival_rate.
+ """
+ pass
+
+ def make_client_process(self):
+ """Returns a client of this server"""
+ if len(self.clients) == 0:
+ raise Exception("Server %s has empty client list," % (self,))
+ pass # TODO should return a ClientProcess
+
+
+# Base class for ARCFIRE storyboards
+#
+# @experiment: Experiment to use as input
+# @duration: Duration of the whole storyboard
+# @servers: App servers available in the network
+#
+class StoryBoard:
+ def __init__(self, experiment, duration, servers=None):
+ self.experiment = experiment
+ self.duration = duration
+ if servers is None:
+ servers = list()
+ self.servers = servers
+
+ def add_server(self, server):
+ self.servers.append(server)
+
+ def del_server(self, server):
+ self.servers.remove(server)
+
+ def start(self):
+ pass