diff options
author | Marco Capitani <m.capitani@nextworks.it> | 2017-06-21 17:08:16 +0200 |
---|---|---|
committer | Marco Capitani <m.capitani@nextworks.it> | 2017-06-21 17:08:16 +0200 |
commit | 26ed0d7231ce681e6f2041760ba69406ffb6ee86 (patch) | |
tree | 91ff9910ae6755d34ce2cb7067ccb2a5f6dd6125 | |
parent | 7b599d17b054055d5166a15f71a3e8246af986b7 (diff) | |
download | rumba-26ed0d7231ce681e6f2041760ba69406ffb6ee86.tar.gz rumba-26ed0d7231ce681e6f2041760ba69406ffb6ee86.zip |
Storyboard implemented, to be tested
-rw-r--r-- | rumba/log.py | 17 | ||||
-rw-r--r-- | rumba/model.py | 142 |
2 files changed, 131 insertions, 28 deletions
diff --git a/rumba/log.py b/rumba/log.py index d95c034..987f03a 100644 --- a/rumba/log.py +++ b/rumba/log.py @@ -24,6 +24,13 @@ import sys import multiprocessing +DEBUG = logging.DEBUG +INFO = logging.INFO +WARNING = logging.WARNING +ERROR = logging.ERROR +CRITICAL = logging.CRITICAL + + loggers_set = set() @@ -95,11 +102,11 @@ def set_logging_level(level, name=None): """ Set the current logging level to <level> for logger named <name>. If name is not specified, sets the logging level for all rumba loggers. - Accepted levels are: - DEBUG == 10, - INFO == 20, - WARNING == 30, - ERROR == 40, + Accepted levels are: + DEBUG == 10, + INFO == 20, + WARNING == 30, + ERROR == 40, CRITICAL == 50, NOTSET == 0 (resets the logger: its level is set to the default or its parents' level) diff --git a/rumba/model.py b/rumba/model.py index 72b7baf..5215065 100644 --- a/rumba/model.py +++ b/rumba/model.py @@ -20,13 +20,19 @@ # MA 02110-1301 USA import abc -import subprocess +import random +import time import rumba.log as log - +from rumba import ssh_support logger = log.get_logger(__name__) +try: + from numpy.random import poisson +except ImportError: + from rumba.recpoisson import poisson + # Represents generic testbed info # @@ -567,12 +573,13 @@ class Experiment: # @options: Options to pass to the binary # class Client(object): - def __init__(self, ap, options=None): + def __init__(self, ap, testbed, options=None): self.ap = ap self.options = options + self.testbed = testbed - def start_process(self, node, duration, start_time): - return ClientProcess(self.ap, node, duration, start_time, self.options) + def start_process(self, duration): + return ClientProcess(self.ap, duration, self.testbed, self.options) # Base class for client processes @@ -584,25 +591,54 @@ class Client(object): # @options: Options to pass to the binary # class ClientProcess(Client): - def __init__(self, ap, node, duration, start_time, options=None): - super(ClientProcess, self).__init__(ap, options=options) - self.node = node + def __init__(self, ap, duration, testbed, options=None): + super(ClientProcess, self).__init__(ap, testbed, options=options) self.duration = duration - self.start_time = start_time - self.run() - self.running = True + self.start_time = None + self.running = False + self.node = None + self.pid = None - def run(self): - subprocess.Popen([self.ap] + self.options.split()) + def run(self, node): + self.node = node + self.start_time = time.time() + + logger.debug( + 'Starting client app %s on node %s with duration %s.', + self.ap, self.node.name, self.duration + ) + + script = r'nohup "$@" > /dev/null & echo "$!"' + opt_str = self.options if self.options is not None else "" + cmds = ["echo '%s' > startup.sh && chmod a+x startup.sh" + % (script,), + "./startup.sh %s %s" % (self.ap, opt_str)] + self.running = True + self.pid = ssh_support.execute_commands(self.testbed, + self.node.ssh_config, + cmds) def stop(self): - pass # TODO to be implemented - - def check(self, now): + logger.debug( + 'Killing client %s on node %s.', + self.ap, self.node.name + ) + ssh_support.execute_command( + self.testbed, + self.node.ssh_config, + "kill %s" % self.pid + ) + + def check(self): + """Check if the process should keep running, stop it if not, + and return true if and only if it is still running.""" + now = time.time() if not self.running: - return + return False if now - self.start_time >= self.duration: self.stop() + return False + return True # Base class for server programs @@ -610,14 +646,15 @@ class ClientProcess(Client): # @ap: Application Process binary # @arrival_rate: Average requests/s to be received by this server # @mean_duration: Average duration of a client connection (in seconds) +# @testbed: the testbed for the experiment # @options: Options to pass to the binary # @max_clients: Maximum number of clients to serve # @clients: Client binaries that will use this server # @nodes: Specific nodes to start this server on # class Server: - def __init__(self, ap, arrival_rate, mean_duration, - options=None, max_clients=None, + def __init__(self, ap, arrival_rate, mean_duration, testbed, + options=None, max_clients=float('inf'), clients=None, nodes=None): self.ap = ap self.options = options @@ -628,6 +665,8 @@ class Server: self.nodes = nodes self.arrival_rate = arrival_rate # mean requests/s self.mean_duration = mean_duration # in seconds + self.testbed = testbed + self.pids = {} def add_client(self, client): self.clients.append(client) @@ -649,13 +688,46 @@ class Server: interval seconds. Hence, the average size should be interval * arrival_rate. """ - pass + # WARNING! using numpy. To be discussed. + number = poisson(self.arrival_rate * interval) + number = int(min(number, self.max_clients)) + l = [self.make_client_process() for _ in range(number)] + return l def make_client_process(self): """Returns a client of this server""" if len(self.clients) == 0: - raise Exception("Server %s has empty client list," % (self,)) - pass # TODO should return a ClientProcess + raise Exception("Server %s has empty client list." % (self,)) + duration = random.expovariate(1.0 / self.mean_duration) + return random.choice(self.clients)\ + .start_process(duration=duration) + + def run(self): + for node in self.nodes: + opt_str = self.options if self.options is not None else "" + script = r'nohup "$@" > /dev/null & echo "$!"' + cmds = ["echo '%s' > startup.sh && chmod a+x startup.sh" + % (script,), + "./startup.sh %s %s" % (self.ap, opt_str)] + logger.debug( + 'Starting server %s on node %s.', + self.ap, node.name + ) + self.pids[node] = (ssh_support.execute_commands(self.testbed, + node.ssh_config, + cmds)) + + def stop(self): + for node, pid in self.pids.items(): + logger.debug( + 'Killing server %s on node %s.', + self.ap, node.name + ) + ssh_support.execute_command( + self.testbed, + node.ssh_config, + "kill %s" % pid + ) # Base class for ARCFIRE storyboards @@ -665,12 +737,18 @@ class Server: # @servers: App servers available in the network # class StoryBoard: + + DEFAULT_INTERVAL = 2.5 # in seconds (may be a float) + def __init__(self, experiment, duration, servers=None): self.experiment = experiment self.duration = duration if servers is None: servers = list() self.servers = servers + self.client_nodes = [c for c in experiment.nodes if c.client] + self.active_clients = [] + self.start_time = None def add_server(self, server): self.servers.append(server) @@ -679,4 +757,22 @@ class StoryBoard: self.servers.remove(server) def start(self): - pass + self.start_time = time.time() + try: + for server in self.servers: + server.run() + while time.time() - self.start_time < self.duration: + for server in self.servers: + clients = server.get_new_clients(self.DEFAULT_INTERVAL) + for new_client in clients: + client_node = random.choice(self.client_nodes) + new_client.run(client_node) + self.active_clients.append(new_client) + self.active_clients = \ + [x for x in self.active_clients if x.check()] + time.sleep(self.DEFAULT_INTERVAL) + finally: + for client in self.active_clients: + client.stop() + for server in self.servers: + server.stop() |