aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMarco Capitani <m.capitani@nextworks.it>2017-06-21 17:08:16 +0200
committerMarco Capitani <m.capitani@nextworks.it>2017-06-21 17:08:16 +0200
commit26ed0d7231ce681e6f2041760ba69406ffb6ee86 (patch)
tree91ff9910ae6755d34ce2cb7067ccb2a5f6dd6125
parent7b599d17b054055d5166a15f71a3e8246af986b7 (diff)
downloadrumba-26ed0d7231ce681e6f2041760ba69406ffb6ee86.tar.gz
rumba-26ed0d7231ce681e6f2041760ba69406ffb6ee86.zip
Storyboard implemented, to be tested
-rw-r--r--rumba/log.py17
-rw-r--r--rumba/model.py142
2 files changed, 131 insertions, 28 deletions
diff --git a/rumba/log.py b/rumba/log.py
index d95c034..987f03a 100644
--- a/rumba/log.py
+++ b/rumba/log.py
@@ -24,6 +24,13 @@ import sys
import multiprocessing
+DEBUG = logging.DEBUG
+INFO = logging.INFO
+WARNING = logging.WARNING
+ERROR = logging.ERROR
+CRITICAL = logging.CRITICAL
+
+
loggers_set = set()
@@ -95,11 +102,11 @@ def set_logging_level(level, name=None):
"""
Set the current logging level to <level> for logger named <name>.
If name is not specified, sets the logging level for all rumba loggers.
- Accepted levels are:
- DEBUG == 10,
- INFO == 20,
- WARNING == 30,
- ERROR == 40,
+ Accepted levels are:
+ DEBUG == 10,
+ INFO == 20,
+ WARNING == 30,
+ ERROR == 40,
CRITICAL == 50,
NOTSET == 0
(resets the logger: its level is set to the default or its parents' level)
diff --git a/rumba/model.py b/rumba/model.py
index 72b7baf..5215065 100644
--- a/rumba/model.py
+++ b/rumba/model.py
@@ -20,13 +20,19 @@
# MA 02110-1301 USA
import abc
-import subprocess
+import random
+import time
import rumba.log as log
-
+from rumba import ssh_support
logger = log.get_logger(__name__)
+try:
+ from numpy.random import poisson
+except ImportError:
+ from rumba.recpoisson import poisson
+
# Represents generic testbed info
#
@@ -567,12 +573,13 @@ class Experiment:
# @options: Options to pass to the binary
#
class Client(object):
- def __init__(self, ap, options=None):
+ def __init__(self, ap, testbed, options=None):
self.ap = ap
self.options = options
+ self.testbed = testbed
- def start_process(self, node, duration, start_time):
- return ClientProcess(self.ap, node, duration, start_time, self.options)
+ def start_process(self, duration):
+ return ClientProcess(self.ap, duration, self.testbed, self.options)
# Base class for client processes
@@ -584,25 +591,54 @@ class Client(object):
# @options: Options to pass to the binary
#
class ClientProcess(Client):
- def __init__(self, ap, node, duration, start_time, options=None):
- super(ClientProcess, self).__init__(ap, options=options)
- self.node = node
+ def __init__(self, ap, duration, testbed, options=None):
+ super(ClientProcess, self).__init__(ap, testbed, options=options)
self.duration = duration
- self.start_time = start_time
- self.run()
- self.running = True
+ self.start_time = None
+ self.running = False
+ self.node = None
+ self.pid = None
- def run(self):
- subprocess.Popen([self.ap] + self.options.split())
+ def run(self, node):
+ self.node = node
+ self.start_time = time.time()
+
+ logger.debug(
+ 'Starting client app %s on node %s with duration %s.',
+ self.ap, self.node.name, self.duration
+ )
+
+ script = r'nohup "$@" > /dev/null & echo "$!"'
+ opt_str = self.options if self.options is not None else ""
+ cmds = ["echo '%s' > startup.sh && chmod a+x startup.sh"
+ % (script,),
+ "./startup.sh %s %s" % (self.ap, opt_str)]
+ self.running = True
+ self.pid = ssh_support.execute_commands(self.testbed,
+ self.node.ssh_config,
+ cmds)
def stop(self):
- pass # TODO to be implemented
-
- def check(self, now):
+ logger.debug(
+ 'Killing client %s on node %s.',
+ self.ap, self.node.name
+ )
+ ssh_support.execute_command(
+ self.testbed,
+ self.node.ssh_config,
+ "kill %s" % self.pid
+ )
+
+ def check(self):
+ """Check if the process should keep running, stop it if not,
+ and return true if and only if it is still running."""
+ now = time.time()
if not self.running:
- return
+ return False
if now - self.start_time >= self.duration:
self.stop()
+ return False
+ return True
# Base class for server programs
@@ -610,14 +646,15 @@ class ClientProcess(Client):
# @ap: Application Process binary
# @arrival_rate: Average requests/s to be received by this server
# @mean_duration: Average duration of a client connection (in seconds)
+# @testbed: the testbed for the experiment
# @options: Options to pass to the binary
# @max_clients: Maximum number of clients to serve
# @clients: Client binaries that will use this server
# @nodes: Specific nodes to start this server on
#
class Server:
- def __init__(self, ap, arrival_rate, mean_duration,
- options=None, max_clients=None,
+ def __init__(self, ap, arrival_rate, mean_duration, testbed,
+ options=None, max_clients=float('inf'),
clients=None, nodes=None):
self.ap = ap
self.options = options
@@ -628,6 +665,8 @@ class Server:
self.nodes = nodes
self.arrival_rate = arrival_rate # mean requests/s
self.mean_duration = mean_duration # in seconds
+ self.testbed = testbed
+ self.pids = {}
def add_client(self, client):
self.clients.append(client)
@@ -649,13 +688,46 @@ class Server:
interval seconds.
Hence, the average size should be interval * arrival_rate.
"""
- pass
+ # WARNING! using numpy. To be discussed.
+ number = poisson(self.arrival_rate * interval)
+ number = int(min(number, self.max_clients))
+ l = [self.make_client_process() for _ in range(number)]
+ return l
def make_client_process(self):
"""Returns a client of this server"""
if len(self.clients) == 0:
- raise Exception("Server %s has empty client list," % (self,))
- pass # TODO should return a ClientProcess
+ raise Exception("Server %s has empty client list." % (self,))
+ duration = random.expovariate(1.0 / self.mean_duration)
+ return random.choice(self.clients)\
+ .start_process(duration=duration)
+
+ def run(self):
+ for node in self.nodes:
+ opt_str = self.options if self.options is not None else ""
+ script = r'nohup "$@" > /dev/null & echo "$!"'
+ cmds = ["echo '%s' > startup.sh && chmod a+x startup.sh"
+ % (script,),
+ "./startup.sh %s %s" % (self.ap, opt_str)]
+ logger.debug(
+ 'Starting server %s on node %s.',
+ self.ap, node.name
+ )
+ self.pids[node] = (ssh_support.execute_commands(self.testbed,
+ node.ssh_config,
+ cmds))
+
+ def stop(self):
+ for node, pid in self.pids.items():
+ logger.debug(
+ 'Killing server %s on node %s.',
+ self.ap, node.name
+ )
+ ssh_support.execute_command(
+ self.testbed,
+ node.ssh_config,
+ "kill %s" % pid
+ )
# Base class for ARCFIRE storyboards
@@ -665,12 +737,18 @@ class Server:
# @servers: App servers available in the network
#
class StoryBoard:
+
+ DEFAULT_INTERVAL = 2.5 # in seconds (may be a float)
+
def __init__(self, experiment, duration, servers=None):
self.experiment = experiment
self.duration = duration
if servers is None:
servers = list()
self.servers = servers
+ self.client_nodes = [c for c in experiment.nodes if c.client]
+ self.active_clients = []
+ self.start_time = None
def add_server(self, server):
self.servers.append(server)
@@ -679,4 +757,22 @@ class StoryBoard:
self.servers.remove(server)
def start(self):
- pass
+ self.start_time = time.time()
+ try:
+ for server in self.servers:
+ server.run()
+ while time.time() - self.start_time < self.duration:
+ for server in self.servers:
+ clients = server.get_new_clients(self.DEFAULT_INTERVAL)
+ for new_client in clients:
+ client_node = random.choice(self.client_nodes)
+ new_client.run(client_node)
+ self.active_clients.append(new_client)
+ self.active_clients = \
+ [x for x in self.active_clients if x.check()]
+ time.sleep(self.DEFAULT_INTERVAL)
+ finally:
+ for client in self.active_clients:
+ client.stop()
+ for server in self.servers:
+ server.stop()