aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMarco Capitani <m.capitani@nextworks.it>2017-10-23 14:14:17 +0200
committerSander Vrijders <sander.vrijders@ugent.be>2017-11-17 15:17:30 +0000
commit134eae6b986709a6524afa1e1f18527cbe900eea (patch)
tree642b300eb0f6e785ec52334995a517f95f7f784d
parentc884a5b8c1e2a2f4d610cae7b9aa547b95424210 (diff)
downloadrumba-134eae6b986709a6524afa1e1f18527cbe900eea.tar.gz
rumba-134eae6b986709a6524afa1e1f18527cbe900eea.zip
storyboard: decouple from experiment
+ Node no longer has "client" attribute + Client has a "nodes" attribute instead + servers, server nodes and the experiment can be added to a storyboard after instantiation to allow reuse of a SB + moved storyboard machinery to a separate module rumba.storyboard
-rwxr-xr-xexamples/example.py8
-rw-r--r--rumba/model.py273
-rw-r--r--rumba/storyboard.py306
3 files changed, 311 insertions, 276 deletions
diff --git a/examples/example.py b/examples/example.py
index dd18a07..8d17ff2 100755
--- a/examples/example.py
+++ b/examples/example.py
@@ -4,6 +4,7 @@
from rumba.model import *
from rumba.utils import ExperimentManager
+from rumba.storyboard import *
# import testbed plugins
import rumba.testbeds.emulab as emulab
@@ -34,8 +35,7 @@ a = Node("a",
b = Node("b",
difs = [e1, n1],
- dif_registrations = {n1 : [e1]},
- client = True)
+ dif_registrations = {n1 : [e1]})
tb = jfed.Testbed(exp_name = "example1",
username = "user1",
@@ -48,8 +48,8 @@ print(exp)
with ExperimentManager(exp):
exp.swap_in()
exp.bootstrap_prototype()
- c1 = Client("rinaperf", options ="-t perf -i 1000 -s 1000 -c 0")
+ c1 = Client("rinaperf", options ="-t perf -s 1000 -c 0", nodes=[b])
s1 = Server("rinaperf", arrival_rate=2, mean_duration=5,
options = "-l", nodes = [a], clients = [c1])
- sb = StoryBoard(exp, duration=20, servers = [s1])
+ sb = StoryBoard(duration=3600, experiment=exp, servers=[s1])
sb.start()
diff --git a/rumba/model.py b/rumba/model.py
index da63e76..7a68459 100644
--- a/rumba/model.py
+++ b/rumba/model.py
@@ -26,28 +26,13 @@
import abc
import os
-import random
import stat
-import time
import rumba.log as log
from rumba import ssh_support
-from rumba.ssh_support import SSHException
logger = log.get_logger(__name__)
-try:
- from numpy.random import poisson
- from numpy.random import exponential
- logger.debug("Using numpy for faster and better random variables.")
-except ImportError:
- from rumba.recpoisson import poisson
-
- def exponential(mean_duration):
- return random.expovariate(1.0 / mean_duration)
-
- logger.debug("Falling back to simple implementations.")
- # PROBLEM! These logs will almost never be printed... But we might not care
tmp_dir = '/tmp/rumba'
try:
@@ -222,7 +207,7 @@ class SSHConfig:
#
class Node:
def __init__(self, name, difs=None, dif_registrations=None,
- client=False, policies=None, machine_type=None):
+ policies=None, machine_type=None):
self.name = name
if difs is None:
difs = list()
@@ -241,7 +226,6 @@ class Node:
for dif in self.difs:
if hasattr(dif, 'policy'): # check if the dif supports policies
self.policies[dif] = policies.get(dif, Policy(dif, self))
- self.client = client
self._validate()
@@ -839,258 +823,3 @@ class Experiment:
node.ssh_config.proxy_client.close()
# Undo the testbed (testbed-specific)
self.testbed.swap_out(self)
-
-
-# Base class for client programs
-#
-# @ap: Application Process binary
-# @options: Options to pass to the binary
-#
-class Client(object):
- def __init__(self, ap, options=None):
- self.ap = ap
- self.options = options
-
- def start_process(self, duration):
- return ClientProcess(self.ap, duration, self.options)
-
-
-# Base class for client processes
-#
-# @ap: Application Process binary
-# @duration: The time (in seconds) this process should run
-# @start_time: The time at which this process is started.
-# @options: Options to pass to the binary
-#
-class ClientProcess(Client):
- def __init__(self, ap, duration, options=None):
- super(ClientProcess, self).__init__(ap, options=options)
- self.duration = duration
- self.start_time = None
- self.running = False
- self.node = None
- self.pid = None
-
- def run(self, node):
- self.node = node
- self.start_time = time.time()
-
- logger.debug(
- 'Starting client app %s on node %s with duration %s.',
- self.ap, self.node.name, self.duration
- )
-
- opt_str = self.options if self.options is not None else ""
- cmd = "./startup.sh %s %s" % (self.ap, opt_str)
- self.running = True
- try:
- self.pid = self.node.execute_command(cmd)
- except SSHException:
- logger.warning('Could not start client %s on node %s.',
- self.ap, node.name)
- logger.debug('Client app %s on node %s got pid %s.',
- self.ap, self.node.name, self.pid)
-
- def stop(self):
- logger.debug(
- 'Killing client %s on node %s.',
- self.ap, self.node.name
- )
- try:
- self.node.execute_command("kill %s" % self.pid)
- except SSHException:
- logger.warning('Could not kill client %s on node %s.',
- self.ap, self.node.name)
-
- def kill_check(self):
- """Check if the process should keep running, stop it if not,
- and return true if and only if it is still running."""
- now = time.time()
- if not self.running:
- return False
- if now - self.start_time >= self.duration:
- self.stop()
- self.running = False
- return False
- return True
-
-
-# Base class for server programs
-#
-# @ap: Application Process binary
-# @arrival_rate: Average requests/s to be received by this server
-# @mean_duration: Average duration of a client connection (in seconds)
-# @options: Options to pass to the binary
-# @max_clients: Maximum number of clients to serve
-# @clients: Client binaries that will use this server
-# @nodes: Specific nodes to start this server on
-#
-class Server:
- def __init__(self, ap, arrival_rate, mean_duration,
- options=None, max_clients=float('inf'),
- clients=None, nodes=None):
- self.ap = ap
- self.options = options
- self.max_clients = max_clients
- if clients is None:
- clients = list()
- self.clients = clients
- self.nodes = nodes
- self.arrival_rate = arrival_rate # mean requests/s
- self.mean_duration = mean_duration # in seconds
- self.pids = {}
-
- def add_client(self, client):
- self.clients.append(client)
-
- def del_client(self, client):
- self.clients.remove(client)
-
- def add_node(self, node):
- self.nodes.append(node)
-
- def del_node(self, node):
- self.nodes.remove(node)
-
- def get_new_clients(self, interval):
- """
- Returns a list of clients of size appropriate to the server's rate.
-
- The list's size should be a sample from Poisson(arrival_rate) over
- interval seconds.
- Hence, the average size should be interval * arrival_rate.
- """
- number = poisson(self.arrival_rate * interval)
- number = int(min(number, self.max_clients))
- return [self.make_client_process() for _ in range(number)]
-
- def make_client_process(self):
- """Returns a client of this server"""
- if len(self.clients) == 0:
- raise Exception("Server %s has empty client list." % (self,))
- duration = exponential(self.mean_duration)
- return random.choice(self.clients)\
- .start_process(duration=duration)
-
- def run(self):
- for node in self.nodes:
- opt_str = self.options if self.options is not None else ""
- logfile = "%s.log" % self.ap
- script = r'nohup "$@" > %s & echo "$!"' % logfile
- cmds = ["echo '%s' > startup.sh && chmod a+x startup.sh"
- % (script,),
- "./startup.sh %s %s" % (self.ap, opt_str)]
- logger.debug(
- 'Starting server %s on node %s with logfile %s.',
- self.ap, node.name, logfile
- )
- try:
- self.pids[node] = (node.execute_commands(cmds))
- except SSHException:
- logger.warning('Could not start server %s on node %s.',
- self.ap, node.name)
-
- def stop(self):
- for node, pid in self.pids.items():
- logger.debug(
- 'Killing server %s on node %s.',
- self.ap, node.name
- )
- try:
- node.execute_command("kill %s" % pid)
- except SSHException:
- logger.warning('Could not kill server %s on node %s.',
- self.ap, node.name)
-
-
-# Base class for ARCFIRE storyboards
-#
-# @experiment: Experiment to use as input
-# @duration: Duration of the whole storyboard
-# @servers: App servers available in the network
-#
-class StoryBoard:
-
- DEFAULT_INTERVAL = 2.5 # in seconds (may be a float)
-
- def __init__(self, experiment, duration, servers=None):
- self.experiment = experiment
- self.duration = duration
- if servers is None:
- servers = list()
- self.servers = servers
- self.client_nodes = [c for c in experiment.nodes if c.client]
- self.active_clients = []
- self.start_time = None
- self.commands_list = {}
-
- def add_server(self, server):
- self.servers.append(server)
-
- def del_server(self, server):
- self.servers.remove(server)
-
- def run_command(self, t, node, command):
- """
- Schedule the given command to be run at t seconds from the start.
- The commands are run in no particular order, so take care
-
- :param t: (float) seconds to wait before running the command
- :param node: (Node) the node on which the command should be run
- :param command: (str or list[str]) the command(s) to be run,
- """
- if isinstance(command, str):
- self.commands_list.setdefault(t, []).append((node, command))
- else: # Hope it's an Iterable[str]. Otherwise, errors will happen.
- for cmd in command:
- self.commands_list.setdefault(t, []).append((node, cmd))
-
- def periodic_check(self):
- # Spawn new clients
- for server in self.servers:
- clients = server.get_new_clients(self.DEFAULT_INTERVAL)
- for new_client in clients:
- client_node = random.choice(self.client_nodes)
- new_client.run(client_node)
- self.active_clients.append(new_client)
- surviving = []
-
- # Kill expired clients
- for x in self.active_clients:
- if x.kill_check(): #
- surviving.append(x)
- self.active_clients = surviving
-
- # Do run_command instructions
- unexpired_commands = {}
- for t in self.commands_list:
- if time.time() - self.start_time > t:
- for node, command in self.commands_list[t]:
- node.execute_command(command)
- else:
- unexpired_commands[t] = self.commands_list[t]
- self.commands_list = unexpired_commands
-
- def start(self):
- logger.info('Starting storyboard execution')
- self.start_time = time.time()
- script = r'nohup "$@" > /dev/null & echo "$!"'
- for node in self.client_nodes:
- logger.debug("Writing utility startup script on client nodes.")
- node.execute_command(
- "echo '%s' > startup.sh && chmod a+x startup.sh" % (script,)
- )
- try:
- for server in self.servers:
- server.run()
- while time.time() - self.start_time < self.duration:
- self.periodic_check()
- time.sleep(self.DEFAULT_INTERVAL)
- self.periodic_check() # Do things that were scheduled
- # in the last "INTERVAL" seconds
- # of the StoryBoard
- finally:
- for client in self.active_clients:
- client.stop()
- for server in self.servers:
- server.stop()
diff --git a/rumba/storyboard.py b/rumba/storyboard.py
new file mode 100644
index 0000000..8c73422
--- /dev/null
+++ b/rumba/storyboard.py
@@ -0,0 +1,306 @@
+#
+# A library to manage ARCFIRE experiments
+#
+# Copyright (C) 2017 Nextworks S.r.l.
+# Copyright (C) 2017 imec
+#
+# Sander Vrijders <sander.vrijders@ugent.be>
+# Vincenzo Maffione <v.maffione@nextworks.it>
+# Marco Capitani <m.capitani@nextworks.it>
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., http://www.fsf.org/about/contact/.
+#
+
+# Base class for client programs
+#
+# @ap: Application Process binary
+# @options: Options to pass to the binary
+#
+import random
+import time
+
+import rumba.model as model
+import rumba.ssh_support as ssh_support
+import rumba.log as log
+
+logger = log.get_logger(__name__)
+
+try:
+ from numpy.random import poisson
+ from numpy.random import exponential
+ logger.debug("Using numpy for faster and better random variables.")
+except ImportError:
+ from rumba.recpoisson import poisson
+
+ def exponential(mean_duration):
+ return random.expovariate(1.0 / mean_duration)
+
+ logger.debug("Falling back to simple implementations.")
+ # PROBLEM! These logs will almost never be printed... But we might not care
+
+
+class Client(object):
+ def __init__(self, ap, nodes=None, options=None):
+ self.ap = ap
+ self.options = options
+ if isinstance(nodes, model.Node):
+ nodes = [nodes]
+ if nodes is None:
+ nodes = []
+ self.nodes = nodes
+
+ def add_node(self, node):
+ if not isinstance(node, model.Node):
+ raise Exception("A Node is required.")
+ self.nodes.append(node)
+
+ def process(self, duration):
+ node = random.choice(self.nodes) if len(self.nodes) > 0 else None
+ return ClientProcess(self.ap, duration, node, self.options)
+
+
+# Base class for client processes
+#
+# @ap: Application Process binary
+# @duration: The time (in seconds) this process should run
+# @start_time: The time at which this process is started.
+# @options: Options to pass to the binary
+#
+class ClientProcess(Client):
+ def __init__(self, ap, duration, node=None, options=None):
+ super(ClientProcess, self).__init__(ap, node, options=options)
+ self.duration = duration
+ self.start_time = None
+ self.running = False
+ self.node = node
+ self.pid = None
+
+ def run(self, node=None):
+ self.node = node
+ if self.node is None:
+ raise Exception('No node specified for client %s' % (self.ap,))
+ self.start_time = time.time()
+
+ logger.debug(
+ 'Starting client app %s on node %s with duration %s.',
+ self.ap, self.node.name, self.duration
+ )
+
+ opt_str = self.options if self.options is not None else ""
+ cmd = "./startup.sh %s %s" % (self.ap, opt_str)
+ self.running = True
+ try:
+ self.pid = self.node.execute_command(cmd)
+ except ssh_support.SSHException:
+ logger.warning('Could not start client %s on node %s.',
+ self.ap, self.node.name)
+ logger.debug('Client app %s on node %s got pid %s.',
+ self.ap, self.node.name, self.pid)
+
+ def stop(self):
+ logger.debug(
+ 'Killing client %s on node %s.',
+ self.ap, self.node.name
+ )
+ try:
+ self.node.execute_command("kill %s" % self.pid)
+ except ssh_support.SSHException:
+ logger.warn('Could not kill client %s on node %s.',
+ self.ap, self.node.name)
+
+ def check(self):
+ """Check if the process should keep running, stop it if not,
+ and return true if and only if it is still running."""
+ now = time.time()
+ if not self.running:
+ return False
+ if now - self.start_time >= self.duration:
+ self.stop()
+ self.running = False
+ return False
+ return True
+
+
+# Base class for server programs
+#
+# @ap: Application Process binary
+# @arrival_rate: Average requests/s to be received by this server
+# @mean_duration: Average duration of a client connection (in seconds)
+# @options: Options to pass to the binary
+# @max_clients: Maximum number of clients to serve
+# @clients: Client binaries that will use this server
+# @nodes: Specific nodes to start this server on
+#
+class Server:
+ def __init__(self, ap, arrival_rate, mean_duration,
+ options=None, max_clients=float('inf'),
+ clients=None, nodes=None):
+ self.ap = ap
+ self.options = options if options is not None else ""
+ self.max_clients = max_clients
+ if clients is None:
+ clients = list()
+ self.clients = clients
+ self.nodes = nodes
+ self.arrival_rate = arrival_rate # mean requests/s
+ self.mean_duration = mean_duration # in seconds
+ self.pids = {}
+
+ def add_client(self, client):
+ self.clients.append(client)
+
+ def del_client(self, client):
+ self.clients.remove(client)
+
+ def add_node(self, node):
+ self.nodes.append(node)
+
+ def del_node(self, node):
+ self.nodes.remove(node)
+
+ def get_new_clients(self, interval):
+ """
+ Returns a list of clients of size appropriate to the server's rate.
+
+ The list's size should be a sample from Poisson(arrival_rate) over
+ interval seconds.
+ Hence, the average size should be interval * arrival_rate.
+ """
+ number = poisson(self.arrival_rate * interval)
+ number = int(min(number, self.max_clients))
+ return [self.make_client_process() for _ in range(number)]
+
+ def make_client_process(self):
+ """Returns a client of this server"""
+ if len(self.clients) == 0:
+ raise Exception("Server %s has empty client list." % (self,))
+ duration = exponential(self.mean_duration)
+ return random.choice(self.clients).process(duration=duration)
+
+ def run(self):
+ for node in self.nodes:
+ opt_str = self.options
+ logfile = "%s_server.log" % self.ap
+ script = r'nohup "$@" > %s & echo "$!"' % (logfile,)
+ cmds = ["echo '%s' > startup.sh && chmod a+x startup.sh"
+ % (script,),
+ "./startup.sh %s %s" % (self.ap, opt_str)]
+ logger.debug(
+ 'Starting server %s on node %s with logfile %s.',
+ self.ap, node.name, logfile
+ )
+ try:
+ self.pids[node] = (node.execute_commands(cmds))
+ except ssh_support.SSHException:
+ logger.warn('Could not start server %s on node %s.',
+ self.ap, node.name)
+
+ def stop(self):
+ for node, pid in self.pids.items():
+ logger.debug(
+ 'Killing server %s on node %s.',
+ self.ap, node.name
+ )
+ try:
+ node.execute_command("kill %s" % pid)
+ except ssh_support.SSHException:
+ logger.warn('Could not kill server %s on node %s.',
+ self.ap, node.name)
+
+
+# Base class for ARCFIRE storyboards
+#
+# @experiment: Experiment to use as input
+# @duration: Duration of the whole storyboard
+# @servers: App servers available in the network.
+# Type == Server or Type == List[Tuple[Server, Node]]
+#
+class StoryBoard:
+
+ DEFAULT_INTERVAL = 2.5 # in seconds (may be a float)
+
+ def __init__(self, duration, experiment=None, servers=None):
+ self.experiment = experiment
+ self.duration = duration
+ self.servers = list()
+ if servers is None:
+ servers = list()
+ for s in servers:
+ self._validate_and_add_server(s)
+ self.active_clients = []
+ self.start_time = None
+
+ def _validate_and_add_server(self, s):
+ if self.experiment is None:
+ raise ValueError("Cannot add a server before "
+ "setting the experiment.")
+ if hasattr(s, '__len__') and len(s) == 2:
+ server, node = s
+ if not isinstance(server, Server) or not isinstance(node, model.Node):
+ raise TypeError('First element must be of "Server" type, '
+ 'second must be of "Node" type.')
+ server.add_node(node)
+ self.servers.append(server)
+ elif type(s) == Server:
+ self.servers.append(s)
+ else:
+ raise TypeError('Input servers should be either an object of '
+ '"Server" type or a Server-Node couple.')
+ for node in self.servers[-1].nodes:
+ if node not in self.experiment.nodes:
+ raise ValueError('Cannot run server on node %s, '
+ 'not in experiment.' % (node.name,))
+
+ def set_experiment(self, experiment):
+ if not isinstance(experiment, model.Experiment):
+ raise TypeError('Experiment instance required.')
+ self.experiment = experiment
+
+ def add_server(self, server):
+ self._validate_and_add_server(server)
+
+ def del_server(self, server):
+ self.servers.remove(server)
+
+ def start(self):
+ self.start_time = time.time()
+ script = r'nohup "$@" > /tmp/ & echo "$!"'
+ logger.debug("Writing utility startup script on client nodes.")
+ for server in self.servers:
+ for client in server.clients:
+ for node in client.nodes:
+ node.execute_command(
+ "echo '%s' > startup.sh && chmod a+x startup.sh" % (script,)
+ )
+ try:
+ for server in self.servers:
+ server.run()
+ while time.time() - self.start_time < self.duration:
+ for server in self.servers:
+ clients = server.get_new_clients(self.DEFAULT_INTERVAL)
+ for new_client in clients:
+ new_client.run()
+ self.active_clients.append(new_client)
+ surviving = []
+ for x in self.active_clients:
+ if x.check():
+ surviving.append(x)
+ self.active_clients = surviving
+ time.sleep(self.DEFAULT_INTERVAL)
+ finally:
+ for client in self.active_clients:
+ client.stop()
+ for server in self.servers:
+ server.stop()