diff options
author | Marco Capitani <m.capitani@nextworks.it> | 2017-06-13 10:09:54 +0200 |
---|---|---|
committer | Marco Capitani <m.capitani@nextworks.it> | 2017-06-13 10:09:54 +0200 |
commit | 457977f337a47caddf8788e1d4e1d1736f2a6ccb (patch) | |
tree | 12fa2a2be0f57be3ea2acd3623b01f55c8de1092 /rumba/model.py | |
parent | 3081d070cda223afd548645143142e1104b07d83 (diff) | |
parent | 53602860e17d650f9ab850cf9a206de6a8712c15 (diff) | |
download | rumba-457977f337a47caddf8788e1d4e1d1736f2a6ccb.tar.gz rumba-457977f337a47caddf8788e1d4e1d1736f2a6ccb.zip |
Merge branch 'master' into policies
Diffstat (limited to 'rumba/model.py')
-rw-r--r-- | rumba/model.py | 234 |
1 files changed, 179 insertions, 55 deletions
diff --git a/rumba/model.py b/rumba/model.py index 442933c..e0f1dcc 100644 --- a/rumba/model.py +++ b/rumba/model.py @@ -20,6 +20,9 @@ # MA 02110-1301 USA import abc +import random + +import time import rumba.log as log @@ -35,11 +38,18 @@ logger = log.get_logger(__name__) # @exp_name [string] experiment name # class Testbed: - def __init__(self, exp_name, username, password, proj_name): + def __init__(self, + exp_name, + username, + password, + proj_name, + http_proxy=None): self.username = username self.password = password self.proj_name = proj_name self.exp_name = exp_name + self.http_proxy = http_proxy + self.flags = {'no_vlan_offload': False} @abc.abstractmethod def swap_in(self, experiment): @@ -152,13 +162,12 @@ class SSHConfig: # # @difs: DIFs the node will have an IPCP in # @dif_registrations: Which DIF is registered in which DIF -# @registrations: Registrations of names in DIFs -# @bindings: Binding of names on the processing system # @policies: dict of dif -> policy dict to apply for that dif in this node # +# class Node: def __init__(self, name, difs=None, dif_registrations=None, - registrations=None, bindings=None, policies=None): + client=False, policies=None): self.name = name if difs is None: difs = list() @@ -168,12 +177,6 @@ class Node: if dif_registrations is None: dif_registrations = dict() self.dif_registrations = dif_registrations - if registrations is None: - registrations = dict() - self.registrations = registrations - if bindings is None: - bindings = dict() - self.bindings = bindings self.ssh_config = SSHConfig(name) self.ipcps = [] if policies is None: @@ -183,6 +186,7 @@ class Node: if hasattr(dif, 'policy'): self.policies[dif] = \ Policy(dif, self, policies.get(dif.name, {})) + self.client = client self._validate() @@ -197,18 +201,14 @@ class Node: "to be part of DIF %s" % (self.name, dif.name)) def _validate(self): - # Check that DIFs referenced in self.dif_registrations and - # in self.registrations are part of self.difs + # Check that DIFs referenced in self.dif_registrations + # are part of self.difs for upper in self.dif_registrations: self._undeclared_dif(upper) for lower in self.dif_registrations[upper]: self._undeclared_dif(lower) - for appl in self.registrations: - for dif in self.registrations[appl]: - self._undeclared_dif(dif) - - def __repr__(self): # TODO add policies in repr? + def __repr__(self): # TODO add policies in repr s = "Node " + self.name + ":\n" s += " DIFs: [ " @@ -226,19 +226,6 @@ class Node: s += ", ".join(rl) s += " ]\n" - s += " Name registrations: [ " - for name in self.registrations: - difs = self.registrations[name] - s += "%s => [ " % name - s += ", ".join([dif.name for dif in difs]) - s += " ]" - s += " ]\n" - - s += " Bindings: [ " - s += ", ".join(["'%s' => '%s'" % (ap, self.bindings[ap]) - for ap in self.bindings]) - s += " ]\n" - return s def __hash__(self): @@ -275,22 +262,6 @@ class Node: self.dif_registrations[upper].remove(lower) self._validate() - def add_registration(self, name, dif): - self.dif_registrations[name].append(dif) - self._validate() - - def del_registration(self, name, dif): - self.dif_registrations[name].remove(dif) - self._validate() - - def add_binding(self, name, ap): - self.bindings[name] = ap - self._validate() - - def del_binding(self, name): - del self.bindings[name] - self._validate() - def add_policy(self, dif, component_name, policy_name, **parameters): self.policies[dif].add_policy(component_name, policy_name, **parameters) @@ -311,7 +282,7 @@ class IPCP: self.dif = dif self.registrations = [] - # Is this node the first in the DIF, so that it does not need + # Is this IPCP the first in its DIF, so that it does not need # to enroll to anyone ? self.dif_bootstrapper = False @@ -424,6 +395,10 @@ class Experiment: difsdeps_inc = dict() for node in self.nodes: + for dif in node.difs: + if dif not in difsdeps_adj: + difsdeps_adj[dif] = set() + for upper in node.dif_registrations: for lower in node.dif_registrations[upper]: if upper not in difsdeps_inc: @@ -445,6 +420,14 @@ class Experiment: difsdeps_inc_cnt[dif] = len(difsdeps_inc[dif]) del difsdeps_inc + # Init difsdeps_inc_cnt for those DIFs that do not + # act as lower IPCPs nor upper IPCPs for registration + # operations + for node in self.nodes: + for dif in node.difs: + if dif not in difsdeps_inc_cnt: + difsdeps_inc_cnt[dif] = 0 + # Run Kahn's algorithm to compute topological # ordering on the DIFs graph. frontier = set() @@ -576,6 +559,7 @@ class Experiment: if dif not in node.difs: continue + # Create an instance of the required IPCP class ipcp = dif.get_ipcp_class()( name='%s.%s' % (dif.name, node.name), node=node, dif=dif) @@ -584,23 +568,34 @@ class Experiment: for lower in node.dif_registrations[dif]: ipcp.registrations.append(lower) + node.ipcps.append(ipcp) + dif.ipcps.append(ipcp) + + def compute_bootstrappers(self): + for node in self.nodes: + for ipcp in node.ipcps: ipcp.dif_bootstrapper = True for el in self.enrollments: for e in el: - if e['dif'] != dif: + if e['dif'] != ipcp.dif: # Skip this DIF break - if e['enrollee'] == node: + if e['enrollee'] == ipcp: ipcp.dif_bootstrapper = False # Exit the loops break if not ipcp.dif_bootstrapper: break - node.ipcps.append(ipcp) - dif.ipcps.append(ipcp) - - logger.info("IPCP for node %s: %s", node.name, node.ipcps) + def dump_ssh_info(self): + f = open('ssh_info', 'w') + for node in self.nodes: + f.write("%s;%s;%s;%s;%s\n" % (node.name, + self.testbed.username, + node.ssh_config.hostname, + node.ssh_config.port, + node.ssh_config.proxycommand)) + f.close() # Examine the nodes and DIFs, compute the registration and enrollment # order, the list of IPCPs to create, registrations, ... @@ -608,19 +603,148 @@ class Experiment: self.compute_dif_ordering() self.compute_ipcps() self.compute_enrollments() + self.compute_bootstrappers() + for node in self.nodes: + logger.info("IPCPs for node %s: %s", node.name, node.ipcps) @abc.abstractmethod def install_prototype(self): - raise Exception('run_prototype() method not implemented') + raise Exception('install_prototype() method not implemented') @abc.abstractmethod def bootstrap_prototype(self): - raise Exception('run_prototype() method not implemented') + raise Exception('bootstrap_prototype() method not implemented') + + @abc.abstractmethod + def prototype_name(self): + raise Exception('prototype_name() method not implemented') def swap_in(self): # Realize the experiment testbed (testbed-specific) self.testbed.swap_in(self) + self.dump_ssh_info() def swap_out(self): # Undo the testbed (testbed-specific) self.testbed.swap_out(self) + + +# Base class for client programs +# +# @ap: Application Process binary +# @options: Options to pass to the binary +# +class Client(object): + def __init__(self, ap, options=None): + self.ap = ap + self.options = options + + def start_process(self, node, duration, start_time): + return ClientProcess(self.ap, node, duration, start_time, self.options) + + +# Base class for client processes +# +# @ap: Application Process binary +# @node: The node on which this process should run +# @duration: The time (in seconds) this process should run +# @start_time: The time at which this process is started. +# @options: Options to pass to the binary +# +class ClientProcess(Client): + def __init__(self, ap, node, duration, start_time, options=None): + super(ClientProcess, self).__init__(ap, options=options) + self.node = node + self.duration = duration + self.start_time = start_time + self.run() + self.running = True + + def run(self): + pass # TODO to be implemented + + def stop(self): + pass # TODO to be implemented + + def check(self, now): + if not self.running: + return + if now - self.start_time >= self.duration: + self.stop() + + +# Base class for server programs +# +# @ap: Application Process binary +# @arrival_rate: Average requests/s to be received by this server +# @mean_duration: Average duration of a client connection (in seconds) +# @options: Options to pass to the binary +# @max_clients: Maximum number of clients to serve +# @clients: Client binaries that will use this server +# @nodes: Specific nodes to start this server on +# +class Server: + def __init__(self, ap, arrival_rate, mean_duration, + options=None, max_clients=None, + clients=None, nodes=None): + self.ap = ap + self.options = options + self.max_clients = max_clients + if clients is None: + clients = list() + self.clients = clients + self.nodes = nodes + self.arrival_rate = arrival_rate # mean requests/s + self.mean_duration = mean_duration # in seconds + + def add_client(self, client): + self.clients.append(client) + + def del_client(self, client): + self.clients.remove(client) + + def add_node(self, node): + self.nodes.append(node) + + def del_node(self, node): + self.nodes.remove(node) + + def get_new_clients(self, interval): + """ + Returns a list of clients of size appropriate to the server's rate. + + The list's size should be a sample from Poisson(arrival_rate) over + interval seconds. + Hence, the average size should be interval * arrival_rate. + """ + pass + + def make_client_process(self): + """Returns a client of this server""" + if len(self.clients) == 0: + raise Exception("Server %s has empty client list," % (self,)) + pass # TODO should return a ClientProcess + + +# Base class for ARCFIRE storyboards +# +# @experiment: Experiment to use as input +# @duration: Duration of the whole storyboard +# @servers: App servers available in the network +# +class StoryBoard: + def __init__(self, experiment, duration, servers=None): + self.experiment = experiment + self.duration = duration + if servers is None: + servers = list() + self.servers = servers + + def add_server(self, server): + self.servers.append(server) + + def del_server(self, server): + self.servers.remove(server) + + def start(self): + pass |