diff options
| -rw-r--r-- | rumba/log.py | 17 | ||||
| -rw-r--r-- | rumba/model.py | 142 | 
2 files changed, 131 insertions, 28 deletions
diff --git a/rumba/log.py b/rumba/log.py index d95c034..987f03a 100644 --- a/rumba/log.py +++ b/rumba/log.py @@ -24,6 +24,13 @@ import sys  import multiprocessing +DEBUG = logging.DEBUG +INFO = logging.INFO +WARNING = logging.WARNING +ERROR = logging.ERROR +CRITICAL = logging.CRITICAL + +  loggers_set = set() @@ -95,11 +102,11 @@ def set_logging_level(level, name=None):      """      Set the current logging level to <level> for logger named <name>.      If name is not specified, sets the logging level for all rumba loggers. -    Accepted levels are:  -        DEBUG == 10,  -        INFO == 20,  -        WARNING == 30,  -        ERROR == 40,  +    Accepted levels are: +        DEBUG == 10, +        INFO == 20, +        WARNING == 30, +        ERROR == 40,          CRITICAL == 50,          NOTSET == 0      (resets the logger: its level is set to the default or its parents' level) diff --git a/rumba/model.py b/rumba/model.py index 72b7baf..5215065 100644 --- a/rumba/model.py +++ b/rumba/model.py @@ -20,13 +20,19 @@  # MA  02110-1301  USA  import abc -import subprocess +import random +import time  import rumba.log as log - +from rumba import ssh_support  logger = log.get_logger(__name__) +try: +    from numpy.random import poisson +except ImportError: +    from rumba.recpoisson import poisson +  # Represents generic testbed info  # @@ -567,12 +573,13 @@ class Experiment:  # @options: Options to pass to the binary  #  class Client(object): -    def __init__(self, ap, options=None): +    def __init__(self, ap, testbed, options=None):          self.ap = ap          self.options = options +        self.testbed = testbed -    def start_process(self, node, duration, start_time): -        return ClientProcess(self.ap, node, duration, start_time, self.options) +    def start_process(self, duration): +        return ClientProcess(self.ap, duration, self.testbed, self.options)  # Base class for client processes @@ -584,25 +591,54 @@ class Client(object):  # @options: Options to pass to the binary  #  class ClientProcess(Client): -    def __init__(self, ap, node, duration, start_time, options=None): -        super(ClientProcess, self).__init__(ap, options=options) -        self.node = node +    def __init__(self, ap, duration, testbed, options=None): +        super(ClientProcess, self).__init__(ap, testbed, options=options)          self.duration = duration -        self.start_time = start_time -        self.run() -        self.running = True +        self.start_time = None +        self.running = False +        self.node = None +        self.pid = None -    def run(self): -        subprocess.Popen([self.ap] + self.options.split()) +    def run(self, node): +        self.node = node +        self.start_time = time.time() + +        logger.debug( +            'Starting client app %s on node %s with duration %s.', +            self.ap, self.node.name, self.duration +        ) + +        script = r'nohup "$@" > /dev/null & echo "$!"' +        opt_str = self.options if self.options is not None else "" +        cmds = ["echo '%s' > startup.sh && chmod a+x startup.sh" +                % (script,), +                "./startup.sh %s %s" % (self.ap, opt_str)] +        self.running = True +        self.pid = ssh_support.execute_commands(self.testbed, +                                                self.node.ssh_config, +                                                cmds)      def stop(self): -        pass  # TODO to be implemented - -    def check(self, now): +        logger.debug( +            'Killing client %s on node %s.', +            self.ap, self.node.name +        ) +        ssh_support.execute_command( +            self.testbed, +            self.node.ssh_config, +            "kill %s" % self.pid +        ) + +    def check(self): +        """Check if the process should keep running, stop it if not, +        and return true if and only if it is still running.""" +        now = time.time()          if not self.running: -            return +            return False          if now - self.start_time >= self.duration:              self.stop() +            return False +        return True  # Base class for server programs @@ -610,14 +646,15 @@ class ClientProcess(Client):  # @ap: Application Process binary  # @arrival_rate: Average requests/s to be received by this server  # @mean_duration: Average duration of a client connection (in seconds) +# @testbed: the testbed for the experiment  # @options: Options to pass to the binary  # @max_clients: Maximum number of clients to serve  # @clients: Client binaries that will use this server  # @nodes: Specific nodes to start this server on  #  class Server: -    def __init__(self, ap, arrival_rate, mean_duration, -                 options=None, max_clients=None, +    def __init__(self, ap, arrival_rate, mean_duration, testbed, +                 options=None, max_clients=float('inf'),                   clients=None, nodes=None):          self.ap = ap          self.options = options @@ -628,6 +665,8 @@ class Server:          self.nodes = nodes          self.arrival_rate = arrival_rate  # mean requests/s          self.mean_duration = mean_duration  # in seconds +        self.testbed = testbed +        self.pids = {}      def add_client(self, client):          self.clients.append(client) @@ -649,13 +688,46 @@ class Server:          interval seconds.          Hence, the average size should be interval * arrival_rate.          """ -        pass +        # WARNING! using numpy. To be discussed. +        number = poisson(self.arrival_rate * interval) +        number = int(min(number, self.max_clients)) +        l = [self.make_client_process() for _ in range(number)] +        return l      def make_client_process(self):          """Returns a client of this server"""          if len(self.clients) == 0: -            raise Exception("Server %s has empty client list," % (self,)) -        pass  # TODO should return a ClientProcess +            raise Exception("Server %s has empty client list." % (self,)) +        duration = random.expovariate(1.0 / self.mean_duration) +        return random.choice(self.clients)\ +            .start_process(duration=duration) + +    def run(self): +        for node in self.nodes: +            opt_str = self.options if self.options is not None else "" +            script = r'nohup "$@" > /dev/null & echo "$!"' +            cmds = ["echo '%s' > startup.sh && chmod a+x startup.sh" +                    % (script,), +                    "./startup.sh %s %s" % (self.ap, opt_str)] +            logger.debug( +                'Starting server %s on node %s.', +                self.ap, node.name +            ) +            self.pids[node] = (ssh_support.execute_commands(self.testbed, +                                                            node.ssh_config, +                                                            cmds)) + +    def stop(self): +        for node, pid in self.pids.items(): +            logger.debug( +                'Killing server %s on node %s.', +                self.ap, node.name +            ) +            ssh_support.execute_command( +                self.testbed, +                node.ssh_config, +                "kill %s" % pid +            )  # Base class for ARCFIRE storyboards @@ -665,12 +737,18 @@ class Server:  # @servers: App servers available in the network  #  class StoryBoard: + +    DEFAULT_INTERVAL = 2.5  # in seconds (may be a float) +      def __init__(self, experiment, duration, servers=None):          self.experiment = experiment          self.duration = duration          if servers is None:              servers = list()          self.servers = servers +        self.client_nodes = [c for c in experiment.nodes if c.client] +        self.active_clients = [] +        self.start_time = None      def add_server(self, server):          self.servers.append(server) @@ -679,4 +757,22 @@ class StoryBoard:          self.servers.remove(server)      def start(self): -        pass +        self.start_time = time.time() +        try: +            for server in self.servers: +                server.run() +            while time.time() - self.start_time < self.duration: +                for server in self.servers: +                    clients = server.get_new_clients(self.DEFAULT_INTERVAL) +                    for new_client in clients: +                        client_node = random.choice(self.client_nodes) +                        new_client.run(client_node) +                        self.active_clients.append(new_client) +                self.active_clients = \ +                    [x for x in self.active_clients if x.check()] +                time.sleep(self.DEFAULT_INTERVAL) +        finally: +            for client in self.active_clients: +                client.stop() +            for server in self.servers: +                server.stop()  | 
