From 26ed0d7231ce681e6f2041760ba69406ffb6ee86 Mon Sep 17 00:00:00 2001 From: Marco Capitani Date: Wed, 21 Jun 2017 17:08:16 +0200 Subject: Storyboard implemented, to be tested --- rumba/model.py | 142 +++++++++++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 119 insertions(+), 23 deletions(-) (limited to 'rumba/model.py') diff --git a/rumba/model.py b/rumba/model.py index 72b7baf..5215065 100644 --- a/rumba/model.py +++ b/rumba/model.py @@ -20,13 +20,19 @@ # MA 02110-1301 USA import abc -import subprocess +import random +import time import rumba.log as log - +from rumba import ssh_support logger = log.get_logger(__name__) +try: + from numpy.random import poisson +except ImportError: + from rumba.recpoisson import poisson + # Represents generic testbed info # @@ -567,12 +573,13 @@ class Experiment: # @options: Options to pass to the binary # class Client(object): - def __init__(self, ap, options=None): + def __init__(self, ap, testbed, options=None): self.ap = ap self.options = options + self.testbed = testbed - def start_process(self, node, duration, start_time): - return ClientProcess(self.ap, node, duration, start_time, self.options) + def start_process(self, duration): + return ClientProcess(self.ap, duration, self.testbed, self.options) # Base class for client processes @@ -584,25 +591,54 @@ class Client(object): # @options: Options to pass to the binary # class ClientProcess(Client): - def __init__(self, ap, node, duration, start_time, options=None): - super(ClientProcess, self).__init__(ap, options=options) - self.node = node + def __init__(self, ap, duration, testbed, options=None): + super(ClientProcess, self).__init__(ap, testbed, options=options) self.duration = duration - self.start_time = start_time - self.run() - self.running = True + self.start_time = None + self.running = False + self.node = None + self.pid = None - def run(self): - subprocess.Popen([self.ap] + self.options.split()) + def run(self, node): + self.node = node + self.start_time = time.time() + + logger.debug( + 'Starting client app %s on node %s with duration %s.', + self.ap, self.node.name, self.duration + ) + + script = r'nohup "$@" > /dev/null & echo "$!"' + opt_str = self.options if self.options is not None else "" + cmds = ["echo '%s' > startup.sh && chmod a+x startup.sh" + % (script,), + "./startup.sh %s %s" % (self.ap, opt_str)] + self.running = True + self.pid = ssh_support.execute_commands(self.testbed, + self.node.ssh_config, + cmds) def stop(self): - pass # TODO to be implemented - - def check(self, now): + logger.debug( + 'Killing client %s on node %s.', + self.ap, self.node.name + ) + ssh_support.execute_command( + self.testbed, + self.node.ssh_config, + "kill %s" % self.pid + ) + + def check(self): + """Check if the process should keep running, stop it if not, + and return true if and only if it is still running.""" + now = time.time() if not self.running: - return + return False if now - self.start_time >= self.duration: self.stop() + return False + return True # Base class for server programs @@ -610,14 +646,15 @@ class ClientProcess(Client): # @ap: Application Process binary # @arrival_rate: Average requests/s to be received by this server # @mean_duration: Average duration of a client connection (in seconds) +# @testbed: the testbed for the experiment # @options: Options to pass to the binary # @max_clients: Maximum number of clients to serve # @clients: Client binaries that will use this server # @nodes: Specific nodes to start this server on # class Server: - def __init__(self, ap, arrival_rate, mean_duration, - options=None, max_clients=None, + def __init__(self, ap, arrival_rate, mean_duration, testbed, + options=None, max_clients=float('inf'), clients=None, nodes=None): self.ap = ap self.options = options @@ -628,6 +665,8 @@ class Server: self.nodes = nodes self.arrival_rate = arrival_rate # mean requests/s self.mean_duration = mean_duration # in seconds + self.testbed = testbed + self.pids = {} def add_client(self, client): self.clients.append(client) @@ -649,13 +688,46 @@ class Server: interval seconds. Hence, the average size should be interval * arrival_rate. """ - pass + # WARNING! using numpy. To be discussed. + number = poisson(self.arrival_rate * interval) + number = int(min(number, self.max_clients)) + l = [self.make_client_process() for _ in range(number)] + return l def make_client_process(self): """Returns a client of this server""" if len(self.clients) == 0: - raise Exception("Server %s has empty client list," % (self,)) - pass # TODO should return a ClientProcess + raise Exception("Server %s has empty client list." % (self,)) + duration = random.expovariate(1.0 / self.mean_duration) + return random.choice(self.clients)\ + .start_process(duration=duration) + + def run(self): + for node in self.nodes: + opt_str = self.options if self.options is not None else "" + script = r'nohup "$@" > /dev/null & echo "$!"' + cmds = ["echo '%s' > startup.sh && chmod a+x startup.sh" + % (script,), + "./startup.sh %s %s" % (self.ap, opt_str)] + logger.debug( + 'Starting server %s on node %s.', + self.ap, node.name + ) + self.pids[node] = (ssh_support.execute_commands(self.testbed, + node.ssh_config, + cmds)) + + def stop(self): + for node, pid in self.pids.items(): + logger.debug( + 'Killing server %s on node %s.', + self.ap, node.name + ) + ssh_support.execute_command( + self.testbed, + node.ssh_config, + "kill %s" % pid + ) # Base class for ARCFIRE storyboards @@ -665,12 +737,18 @@ class Server: # @servers: App servers available in the network # class StoryBoard: + + DEFAULT_INTERVAL = 2.5 # in seconds (may be a float) + def __init__(self, experiment, duration, servers=None): self.experiment = experiment self.duration = duration if servers is None: servers = list() self.servers = servers + self.client_nodes = [c for c in experiment.nodes if c.client] + self.active_clients = [] + self.start_time = None def add_server(self, server): self.servers.append(server) @@ -679,4 +757,22 @@ class StoryBoard: self.servers.remove(server) def start(self): - pass + self.start_time = time.time() + try: + for server in self.servers: + server.run() + while time.time() - self.start_time < self.duration: + for server in self.servers: + clients = server.get_new_clients(self.DEFAULT_INTERVAL) + for new_client in clients: + client_node = random.choice(self.client_nodes) + new_client.run(client_node) + self.active_clients.append(new_client) + self.active_clients = \ + [x for x in self.active_clients if x.check()] + time.sleep(self.DEFAULT_INTERVAL) + finally: + for client in self.active_clients: + client.stop() + for server in self.servers: + server.stop() -- cgit v1.2.3