From 134eae6b986709a6524afa1e1f18527cbe900eea Mon Sep 17 00:00:00 2001 From: Marco Capitani Date: Mon, 23 Oct 2017 14:14:17 +0200 Subject: storyboard: decouple from experiment + Node no longer has "client" attribute + Client has a "nodes" attribute instead + servers, server nodes and the experiment can be added to a storyboard after instantiation to allow reuse of a SB + moved storyboard machinery to a separate module rumba.storyboard --- examples/example.py | 8 +- rumba/model.py | 273 +--------------------------------------------- rumba/storyboard.py | 306 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 311 insertions(+), 276 deletions(-) create mode 100644 rumba/storyboard.py diff --git a/examples/example.py b/examples/example.py index dd18a07..8d17ff2 100755 --- a/examples/example.py +++ b/examples/example.py @@ -4,6 +4,7 @@ from rumba.model import * from rumba.utils import ExperimentManager +from rumba.storyboard import * # import testbed plugins import rumba.testbeds.emulab as emulab @@ -34,8 +35,7 @@ a = Node("a", b = Node("b", difs = [e1, n1], - dif_registrations = {n1 : [e1]}, - client = True) + dif_registrations = {n1 : [e1]}) tb = jfed.Testbed(exp_name = "example1", username = "user1", @@ -48,8 +48,8 @@ print(exp) with ExperimentManager(exp): exp.swap_in() exp.bootstrap_prototype() - c1 = Client("rinaperf", options ="-t perf -i 1000 -s 1000 -c 0") + c1 = Client("rinaperf", options ="-t perf -s 1000 -c 0", nodes=[b]) s1 = Server("rinaperf", arrival_rate=2, mean_duration=5, options = "-l", nodes = [a], clients = [c1]) - sb = StoryBoard(exp, duration=20, servers = [s1]) + sb = StoryBoard(duration=3600, experiment=exp, servers=[s1]) sb.start() diff --git a/rumba/model.py b/rumba/model.py index da63e76..7a68459 100644 --- a/rumba/model.py +++ b/rumba/model.py @@ -26,28 +26,13 @@ import abc import os -import random import stat -import time import rumba.log as log from rumba import ssh_support -from rumba.ssh_support import SSHException 
logger = log.get_logger(__name__) -try: - from numpy.random import poisson - from numpy.random import exponential - logger.debug("Using numpy for faster and better random variables.") -except ImportError: - from rumba.recpoisson import poisson - - def exponential(mean_duration): - return random.expovariate(1.0 / mean_duration) - - logger.debug("Falling back to simple implementations.") - # PROBLEM! These logs will almost never be printed... But we might not care tmp_dir = '/tmp/rumba' try: @@ -222,7 +207,7 @@ class SSHConfig: # class Node: def __init__(self, name, difs=None, dif_registrations=None, - client=False, policies=None, machine_type=None): + policies=None, machine_type=None): self.name = name if difs is None: difs = list() @@ -241,7 +226,6 @@ class Node: for dif in self.difs: if hasattr(dif, 'policy'): # check if the dif supports policies self.policies[dif] = policies.get(dif, Policy(dif, self)) - self.client = client self._validate() @@ -839,258 +823,3 @@ class Experiment: node.ssh_config.proxy_client.close() # Undo the testbed (testbed-specific) self.testbed.swap_out(self) - - -# Base class for client programs -# -# @ap: Application Process binary -# @options: Options to pass to the binary -# -class Client(object): - def __init__(self, ap, options=None): - self.ap = ap - self.options = options - - def start_process(self, duration): - return ClientProcess(self.ap, duration, self.options) - - -# Base class for client processes -# -# @ap: Application Process binary -# @duration: The time (in seconds) this process should run -# @start_time: The time at which this process is started. 
-# @options: Options to pass to the binary -# -class ClientProcess(Client): - def __init__(self, ap, duration, options=None): - super(ClientProcess, self).__init__(ap, options=options) - self.duration = duration - self.start_time = None - self.running = False - self.node = None - self.pid = None - - def run(self, node): - self.node = node - self.start_time = time.time() - - logger.debug( - 'Starting client app %s on node %s with duration %s.', - self.ap, self.node.name, self.duration - ) - - opt_str = self.options if self.options is not None else "" - cmd = "./startup.sh %s %s" % (self.ap, opt_str) - self.running = True - try: - self.pid = self.node.execute_command(cmd) - except SSHException: - logger.warning('Could not start client %s on node %s.', - self.ap, node.name) - logger.debug('Client app %s on node %s got pid %s.', - self.ap, self.node.name, self.pid) - - def stop(self): - logger.debug( - 'Killing client %s on node %s.', - self.ap, self.node.name - ) - try: - self.node.execute_command("kill %s" % self.pid) - except SSHException: - logger.warning('Could not kill client %s on node %s.', - self.ap, self.node.name) - - def kill_check(self): - """Check if the process should keep running, stop it if not, - and return true if and only if it is still running.""" - now = time.time() - if not self.running: - return False - if now - self.start_time >= self.duration: - self.stop() - self.running = False - return False - return True - - -# Base class for server programs -# -# @ap: Application Process binary -# @arrival_rate: Average requests/s to be received by this server -# @mean_duration: Average duration of a client connection (in seconds) -# @options: Options to pass to the binary -# @max_clients: Maximum number of clients to serve -# @clients: Client binaries that will use this server -# @nodes: Specific nodes to start this server on -# -class Server: - def __init__(self, ap, arrival_rate, mean_duration, - options=None, max_clients=float('inf'), - clients=None, 
nodes=None): - self.ap = ap - self.options = options - self.max_clients = max_clients - if clients is None: - clients = list() - self.clients = clients - self.nodes = nodes - self.arrival_rate = arrival_rate # mean requests/s - self.mean_duration = mean_duration # in seconds - self.pids = {} - - def add_client(self, client): - self.clients.append(client) - - def del_client(self, client): - self.clients.remove(client) - - def add_node(self, node): - self.nodes.append(node) - - def del_node(self, node): - self.nodes.remove(node) - - def get_new_clients(self, interval): - """ - Returns a list of clients of size appropriate to the server's rate. - - The list's size should be a sample from Poisson(arrival_rate) over - interval seconds. - Hence, the average size should be interval * arrival_rate. - """ - number = poisson(self.arrival_rate * interval) - number = int(min(number, self.max_clients)) - return [self.make_client_process() for _ in range(number)] - - def make_client_process(self): - """Returns a client of this server""" - if len(self.clients) == 0: - raise Exception("Server %s has empty client list." 
% (self,)) - duration = exponential(self.mean_duration) - return random.choice(self.clients)\ - .start_process(duration=duration) - - def run(self): - for node in self.nodes: - opt_str = self.options if self.options is not None else "" - logfile = "%s.log" % self.ap - script = r'nohup "$@" > %s & echo "$!"' % logfile - cmds = ["echo '%s' > startup.sh && chmod a+x startup.sh" - % (script,), - "./startup.sh %s %s" % (self.ap, opt_str)] - logger.debug( - 'Starting server %s on node %s with logfile %s.', - self.ap, node.name, logfile - ) - try: - self.pids[node] = (node.execute_commands(cmds)) - except SSHException: - logger.warning('Could not start server %s on node %s.', - self.ap, node.name) - - def stop(self): - for node, pid in self.pids.items(): - logger.debug( - 'Killing server %s on node %s.', - self.ap, node.name - ) - try: - node.execute_command("kill %s" % pid) - except SSHException: - logger.warning('Could not kill server %s on node %s.', - self.ap, node.name) - - -# Base class for ARCFIRE storyboards -# -# @experiment: Experiment to use as input -# @duration: Duration of the whole storyboard -# @servers: App servers available in the network -# -class StoryBoard: - - DEFAULT_INTERVAL = 2.5 # in seconds (may be a float) - - def __init__(self, experiment, duration, servers=None): - self.experiment = experiment - self.duration = duration - if servers is None: - servers = list() - self.servers = servers - self.client_nodes = [c for c in experiment.nodes if c.client] - self.active_clients = [] - self.start_time = None - self.commands_list = {} - - def add_server(self, server): - self.servers.append(server) - - def del_server(self, server): - self.servers.remove(server) - - def run_command(self, t, node, command): - """ - Schedule the given command to be run at t seconds from the start. 
- The commands are run in no particular order, so take care - - :param t: (float) seconds to wait before running the command - :param node: (Node) the node on which the command should be run - :param command: (str or list[str]) the command(s) to be run, - """ - if isinstance(command, str): - self.commands_list.setdefault(t, []).append((node, command)) - else: # Hope it's an Iterable[str]. Otherwise, errors will happen. - for cmd in command: - self.commands_list.setdefault(t, []).append((node, cmd)) - - def periodic_check(self): - # Spawn new clients - for server in self.servers: - clients = server.get_new_clients(self.DEFAULT_INTERVAL) - for new_client in clients: - client_node = random.choice(self.client_nodes) - new_client.run(client_node) - self.active_clients.append(new_client) - surviving = [] - - # Kill expired clients - for x in self.active_clients: - if x.kill_check(): # - surviving.append(x) - self.active_clients = surviving - - # Do run_command instructions - unexpired_commands = {} - for t in self.commands_list: - if time.time() - self.start_time > t: - for node, command in self.commands_list[t]: - node.execute_command(command) - else: - unexpired_commands[t] = self.commands_list[t] - self.commands_list = unexpired_commands - - def start(self): - logger.info('Starting storyboard execution') - self.start_time = time.time() - script = r'nohup "$@" > /dev/null & echo "$!"' - for node in self.client_nodes: - logger.debug("Writing utility startup script on client nodes.") - node.execute_command( - "echo '%s' > startup.sh && chmod a+x startup.sh" % (script,) - ) - try: - for server in self.servers: - server.run() - while time.time() - self.start_time < self.duration: - self.periodic_check() - time.sleep(self.DEFAULT_INTERVAL) - self.periodic_check() # Do things that were scheduled - # in the last "INTERVAL" seconds - # of the StoryBoard - finally: - for client in self.active_clients: - client.stop() - for server in self.servers: - server.stop() diff --git 
a/rumba/storyboard.py b/rumba/storyboard.py
new file mode 100644
index 0000000..8c73422
--- /dev/null
+++ b/rumba/storyboard.py
@@ -0,0 +1,360 @@
+#
+# A library to manage ARCFIRE experiments
+#
+#    Copyright (C) 2017 Nextworks S.r.l.
+#    Copyright (C) 2017 imec
+#
+#    Sander Vrijders
+#    Vincenzo Maffione
+#    Marco Capitani
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., http://www.fsf.org/about/contact/.
+#
+
+import random
+import time
+
+import rumba.model as model
+import rumba.ssh_support as ssh_support
+import rumba.log as log
+
+logger = log.get_logger(__name__)
+
+try:
+    from numpy.random import poisson
+    from numpy.random import exponential
+    logger.debug("Using numpy for faster and better random variables.")
+except ImportError:
+    from rumba.recpoisson import poisson
+
+    def exponential(mean_duration):
+        return random.expovariate(1.0 / mean_duration)
+
+    logger.debug("Falling back to simple implementations.")
+    # PROBLEM! These logs will almost never be printed... But we might not care
+
+
+# Base class for client programs
+#
+# @ap: Application Process binary
+# @nodes: Node, or list of Nodes, on which this client may be started
+# @options: Options to pass to the binary
+#
+class Client(object):
+    def __init__(self, ap, nodes=None, options=None):
+        self.ap = ap
+        self.options = options
+        # Accept a single Node as shorthand for a one-element list.
+        if isinstance(nodes, model.Node):
+            nodes = [nodes]
+        if nodes is None:
+            nodes = []
+        self.nodes = nodes
+
+    def add_node(self, node):
+        """Add a node on which this client may be started."""
+        if not isinstance(node, model.Node):
+            raise Exception("A Node is required.")
+        self.nodes.append(node)
+
+    def process(self, duration):
+        """Return a ClientProcess that should run for duration seconds.
+
+        The process is bound to a node picked at random among
+        self.nodes, or to no node at all if that list is empty.
+        """
+        node = random.choice(self.nodes) if self.nodes else None
+        return ClientProcess(self.ap, duration, node, self.options)
+
+
+# Base class for client processes
+#
+# @ap: Application Process binary
+# @duration: The time (in seconds) this process should run
+# @start_time: The time at which this process is started.
+# @node: The node on which this process should run
+# @options: Options to pass to the binary
+#
+class ClientProcess(Client):
+    def __init__(self, ap, duration, node=None, options=None):
+        super(ClientProcess, self).__init__(ap, node, options=options)
+        self.duration = duration
+        self.start_time = None
+        self.running = False
+        self.node = node
+        self.pid = None
+
+    def run(self, node=None):
+        """Start the client process.
+
+        :param node: (Node) if given, overrides the node chosen at
+                     construction time.
+        """
+        # Keep the node chosen at construction time unless the caller
+        # explicitly passes one: StoryBoard.start() calls run() with no
+        # argument and relies on the node picked by Client.process().
+        if node is not None:
+            self.node = node
+        if self.node is None:
+            raise Exception('No node specified for client %s' % (self.ap,))
+        self.start_time = time.time()
+
+        logger.debug(
+            'Starting client app %s on node %s with duration %s.',
+            self.ap, self.node.name, self.duration
+        )
+
+        opt_str = self.options if self.options is not None else ""
+        cmd = "./startup.sh %s %s" % (self.ap, opt_str)
+        try:
+            self.pid = self.node.execute_command(cmd)
+        except ssh_support.SSHException:
+            logger.warning('Could not start client %s on node %s.',
+                           self.ap, self.node.name)
+            # Nothing was started, so there is nothing to track or kill.
+            self.running = False
+            return
+        self.running = True
+        logger.debug('Client app %s on node %s got pid %s.',
+                     self.ap, self.node.name, self.pid)
+
+    def stop(self):
+        """Kill the remote process, if one was started."""
+        if self.pid is None:
+            return
+        logger.debug(
+            'Killing client %s on node %s.',
+            self.ap, self.node.name
+        )
+        try:
+            self.node.execute_command("kill %s" % self.pid)
+        except ssh_support.SSHException:
+            logger.warning('Could not kill client %s on node %s.',
+                           self.ap, self.node.name)
+
+    def check(self):
+        """Check if the process should keep running, stop it if not,
+        and return true if and only if it is still running."""
+        now = time.time()
+        if not self.running:
+            return False
+        if now - self.start_time >= self.duration:
+            self.stop()
+            self.running = False
+            return False
+        return True
+
+
+# Base class for server programs
+#
+# @ap: Application Process binary
+# @arrival_rate: Average requests/s to be received by this server
+# @mean_duration: Average duration of a client connection (in seconds)
+# @options: Options to pass to the binary
+# @max_clients: Maximum number of clients to serve
+# @clients: Client binaries that will use this server
+# @nodes: Specific nodes to start this server on
+#
+class Server:
+    def __init__(self, ap, arrival_rate, mean_duration,
+                 options=None, max_clients=float('inf'),
+                 clients=None, nodes=None):
+        self.ap = ap
+        self.options = options if options is not None else ""
+        self.max_clients = max_clients
+        if clients is None:
+            clients = list()
+        self.clients = clients
+        # Default to an empty list so that add_node() and iteration work
+        # on servers created without nodes.
+        if nodes is None:
+            nodes = list()
+        self.nodes = nodes
+        self.arrival_rate = arrival_rate  # mean requests/s
+        self.mean_duration = mean_duration  # in seconds
+        self.pids = {}
+
+    def add_client(self, client):
+        self.clients.append(client)
+
+    def del_client(self, client):
+        self.clients.remove(client)
+
+    def add_node(self, node):
+        self.nodes.append(node)
+
+    def del_node(self, node):
+        self.nodes.remove(node)
+
+    def get_new_clients(self, interval):
+        """
+        Returns a list of clients of size appropriate to the server's rate.
+
+        The list's size should be a sample from Poisson(arrival_rate) over
+        interval seconds.
+        Hence, the average size should be interval * arrival_rate.
+        """
+        number = poisson(self.arrival_rate * interval)
+        number = int(min(number, self.max_clients))
+        return [self.make_client_process() for _ in range(number)]
+
+    def make_client_process(self):
+        """Returns a client of this server"""
+        if not self.clients:
+            raise Exception("Server %s has empty client list." % (self,))
+        duration = exponential(self.mean_duration)
+        return random.choice(self.clients).process(duration=duration)
+
+    def run(self):
+        """Start this server on each of its nodes, recording the pids."""
+        for node in self.nodes:
+            opt_str = self.options
+            logfile = "%s_server.log" % self.ap
+            script = r'nohup "$@" > %s & echo "$!"' % (logfile,)
+            cmds = ["echo '%s' > startup.sh && chmod a+x startup.sh"
+                    % (script,),
+                    "./startup.sh %s %s" % (self.ap, opt_str)]
+            logger.debug(
+                'Starting server %s on node %s with logfile %s.',
+                self.ap, node.name, logfile
+            )
+            try:
+                self.pids[node] = (node.execute_commands(cmds))
+            except ssh_support.SSHException:
+                logger.warning('Could not start server %s on node %s.',
+                               self.ap, node.name)
+
+    def stop(self):
+        """Kill every server process started by run()."""
+        for node, pid in self.pids.items():
+            logger.debug(
+                'Killing server %s on node %s.',
+                self.ap, node.name
+            )
+            try:
+                node.execute_command("kill %s" % pid)
+            except ssh_support.SSHException:
+                logger.warning('Could not kill server %s on node %s.',
+                               self.ap, node.name)
+
+
+# Base class for ARCFIRE storyboards
+#
+# @experiment: Experiment to use as input
+# @duration: Duration of the whole storyboard
+# @servers: App servers available in the network.
+#           Type == Server or Type == List[Tuple[Server, Node]]
+#
+class StoryBoard:
+
+    DEFAULT_INTERVAL = 2.5  # in seconds (may be a float)
+
+    def __init__(self, duration, experiment=None, servers=None):
+        self.experiment = experiment
+        self.duration = duration
+        self.servers = list()
+        if servers is None:
+            servers = list()
+        for s in servers:
+            self._validate_and_add_server(s)
+        self.active_clients = []
+        self.start_time = None
+
+    def _validate_and_add_server(self, s):
+        """Check s against the experiment and register it.
+
+        :param s: (Server or (Server, Node)) the server to add, optionally
+                  paired with an additional node to run it on.
+        """
+        if self.experiment is None:
+            raise ValueError("Cannot add a server before "
+                             "setting the experiment.")
+        if isinstance(s, Server):
+            server, new_node = s, None
+        elif hasattr(s, '__len__') and len(s) == 2:
+            server, new_node = s
+            if not isinstance(server, Server) \
+                    or not isinstance(new_node, model.Node):
+                raise TypeError('First element must be of "Server" type, '
+                                'second must be of "Node" type.')
+        else:
+            raise TypeError('Input servers should be either an object of '
+                            '"Server" type or a Server-Node couple.')
+        # Validate every node before changing anything, so that a
+        # rejected server is neither modified nor left in self.servers.
+        nodes = list(server.nodes or [])
+        if new_node is not None:
+            nodes.append(new_node)
+        for node in nodes:
+            if node not in self.experiment.nodes:
+                raise ValueError('Cannot run server on node %s, '
+                                 'not in experiment.' % (node.name,))
+        if new_node is not None:
+            server.add_node(new_node)
+        self.servers.append(server)
+
+    def set_experiment(self, experiment):
+        if not isinstance(experiment, model.Experiment):
+            raise TypeError('Experiment instance required.')
+        self.experiment = experiment
+
+    def add_server(self, server):
+        self._validate_and_add_server(server)
+
+    def del_server(self, server):
+        self.servers.remove(server)
+
+    def start(self):
+        """Run the storyboard for self.duration seconds.
+
+        Servers are started first; then, every DEFAULT_INTERVAL seconds,
+        new clients are spawned and expired ones are stopped. Clients
+        and servers still running are stopped on exit.
+        """
+        self.start_time = time.time()
+        # The client's output is discarded; only its pid is echoed back.
+        script = r'nohup "$@" > /dev/null & echo "$!"'
+        logger.debug("Writing utility startup script on client nodes.")
+        prepared = []  # nodes already holding the startup script
+        for server in self.servers:
+            for client in server.clients:
+                for node in client.nodes:
+                    if node in prepared:
+                        continue
+                    prepared.append(node)
+                    node.execute_command(
+                        "echo '%s' > startup.sh && chmod a+x startup.sh"
+                        % (script,)
+                    )
+        try:
+            for server in self.servers:
+                server.run()
+            while time.time() - self.start_time < self.duration:
+                for server in self.servers:
+                    clients = server.get_new_clients(self.DEFAULT_INTERVAL)
+                    for new_client in clients:
+                        new_client.run()
+                        self.active_clients.append(new_client)
+                surviving = []
+                for x in self.active_clients:
+                    if x.check():
+                        surviving.append(x)
+                self.active_clients = surviving
+                time.sleep(self.DEFAULT_INTERVAL)
+        finally:
+            for client in self.active_clients:
+                client.stop()
+            for server in self.servers:
+                server.stop()
-- 
cgit v1.2.3