aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSander Vrijders <sander.vrijders@intec.ugent.be>2017-07-28 10:07:31 +0000
committerSander Vrijders <sander.vrijders@intec.ugent.be>2017-07-28 10:07:31 +0000
commitf3f51c550ef9751bada8ffcd4d4e846c786a26e0 (patch)
treeb6cddc451fc182e0bd3384805f4ae705375f043b
parent7b43f02b415968371f1b5719232bd7741cb7f12f (diff)
parent7dc3b5086d4c1d19c9ef315369f3fd7c4dd2889e (diff)
downloadrumba-f3f51c550ef9751bada8ffcd4d4e846c786a26e0.tar.gz
rumba-f3f51c550ef9751bada8ffcd4d4e846c786a26e0.zip
Merge branch 'storyboard-impl' into 'master'
Storyboard impl See merge request !61
-rwxr-xr-xexamples/example.py2
-rw-r--r--rumba/log.py7
-rw-r--r--rumba/model.py218
-rw-r--r--rumba/prototypes/irati.py2
-rw-r--r--rumba/recpoisson.py65
-rw-r--r--rumba/ssh_support.py26
-rw-r--r--rumba/testbeds/emulab.py4
-rw-r--r--rumba/testbeds/jfed.py14
-rw-r--r--rumba/testbeds/qemu.py2
-rwxr-xr-xsetup.py12
10 files changed, 298 insertions, 54 deletions
diff --git a/examples/example.py b/examples/example.py
index 567852f..11e8331 100755
--- a/examples/example.py
+++ b/examples/example.py
@@ -49,7 +49,7 @@ try:
exp.bootstrap_prototype()
c1 = Client("rinaperf", options ="-t perf -s 1000 -c 10000")
s1 = Server("rinaperf", arrival_rate=2, mean_duration=5, options = "-l", nodes = [a], clients = [c1])
- sb = StoryBoard(exp, 3600, servers = [s1])
+ sb = StoryBoard(exp, duration=3600, servers = [s1])
sb.start()
finally:
exp.swap_out()
diff --git a/rumba/log.py b/rumba/log.py
index bed0170..c509532 100644
--- a/rumba/log.py
+++ b/rumba/log.py
@@ -24,6 +24,13 @@ import sys
import multiprocessing
+DEBUG = logging.DEBUG
+INFO = logging.INFO
+WARNING = logging.WARNING
+ERROR = logging.ERROR
+CRITICAL = logging.CRITICAL
+
+
loggers_set = set()
diff --git a/rumba/model.py b/rumba/model.py
index f4f98d1..1adc3b0 100644
--- a/rumba/model.py
+++ b/rumba/model.py
@@ -20,13 +20,29 @@
# MA 02110-1301 USA
import abc
-
-import rumba.log as log
import os
+import random
import stat
+import time
+
+import rumba.log as log
+from rumba import ssh_support
logger = log.get_logger(__name__)
+try:
+ from numpy.random import poisson
+ from numpy.random import exponential
+ logger.debug("Using numpy for faster and better random variables.")
+except ImportError:
+ from rumba.recpoisson import poisson
+
+ def exponential(mean_duration):
+ return random.expovariate(1.0 / mean_duration)
+
+ logger.debug("Falling back to simple implementations.")
+ # PROBLEM! These logs will almost never be printed... But we might not care
+
tmp_dir = '/tmp/rumba'
try:
os.mkdir(tmp_dir)
@@ -42,6 +58,7 @@ except OSError:
# Already there, nothing to do
pass
+
# Represents generic testbed info
#
# @username [string] user name
@@ -164,10 +181,18 @@ class NormalDIF(DIF):
# SSH Configuration
#
class SSHConfig:
- def __init__(self, hostname, port=22, proxycommand=None):
+ def __init__(self, hostname, port=22, proxy_command=None):
+ self.username = None
+ self.password = None
self.hostname = hostname
self.port = port
- self.proxycommand = proxycommand
+ self.proxy_command = proxy_command
+
+ def set_username(self, username):
+ self.username = username
+
+ def set_password(self, password):
+ self.password = password
# A node in the experiment
@@ -282,6 +307,74 @@ class Node:
def get_policy(self, dif):
return self.policies[dif]
+ def execute_commands(self, commands, time_out=3, use_proxy=False):
+ # Ssh_config is used twice since it doubles as testbed info
+ # (it holds fields username and password)
+ if use_proxy:
+ return ssh_support.execute_proxy_commands(
+ self.ssh_config,
+ self.ssh_config,
+ commands,
+ time_out
+ )
+ # else:
+ return ssh_support.execute_commands(
+ self.ssh_config,
+ self.ssh_config,
+ commands,
+ time_out
+ )
+
+ def execute_command(self, command, time_out=3, use_proxy=False):
+ # Ssh_config is used twice since it doubles as testbed info
+ # (it holds fields username and password)
+ if use_proxy:
+ return ssh_support.execute_proxy_command(
+ self.ssh_config,
+ self.ssh_config,
+ command,
+ time_out
+ )
+ # else:
+ return ssh_support.execute_command(
+ self.ssh_config,
+ self.ssh_config,
+ command,
+ time_out
+ )
+
+ def write_text_to_file(self, text, file_name):
+ ssh_support.write_text_to_file(
+ self.ssh_config,
+ self.ssh_config,
+ text,
+ file_name
+ )
+
+ def copy_file(self, path, destination):
+ ssh_support.copy_file_to_testbed(
+ self.ssh_config,
+ self.ssh_config,
+ path,
+ destination
+ )
+
+ def copy_files(self, paths, destination):
+ ssh_support.copy_files_to_testbed(
+ self.ssh_config,
+ self.ssh_config,
+ paths,
+ destination
+ )
+
+ def setup_vlan(self, vlan_id, int_name):
+ ssh_support.setup_vlan(
+ self.ssh_config,
+ self.ssh_config,
+ vlan_id,
+ int_name
+ )
+
# Base class representing an IPC Process to be created in the experiment
#
@@ -647,7 +740,7 @@ class Experiment:
self.testbed.username,
node.ssh_config.hostname,
node.ssh_config.port,
- node.ssh_config.proxycommand))
+ node.ssh_config.proxy_command))
f.close()
# Examine the nodes and DIFs, compute the registration and enrollment
@@ -692,38 +785,57 @@ class Client(object):
self.ap = ap
self.options = options
- def start_process(self, node, duration, start_time):
- return ClientProcess(self.ap, node, duration, start_time, self.options)
+ def start_process(self, duration):
+ return ClientProcess(self.ap, duration, self.options)
# Base class for client processes
#
# @ap: Application Process binary
-# @node: The node on which this process should run
# @duration: The time (in seconds) this process should run
# @start_time: The time at which this process is started.
# @options: Options to pass to the binary
#
class ClientProcess(Client):
- def __init__(self, ap, node, duration, start_time, options=None):
+ def __init__(self, ap, duration, options=None):
super(ClientProcess, self).__init__(ap, options=options)
- self.node = node
self.duration = duration
- self.start_time = start_time
- self.run()
- self.running = True
+ self.start_time = None
+ self.running = False
+ self.node = None
+ self.pid = None
- def run(self):
- pass # TODO to be implemented
+ def run(self, node):
+ self.node = node
+ self.start_time = time.time()
- def stop(self):
- pass # TODO to be implemented
+ logger.debug(
+ 'Starting client app %s on node %s with duration %s.',
+ self.ap, self.node.name, self.duration
+ )
+
+ opt_str = self.options if self.options is not None else ""
+ cmd = "./startup.sh %s %s" % (self.ap, opt_str)
+ self.running = True
+ self.pid = self.node.execute_command(cmd)
- def check(self, now):
+ def stop(self):
+ logger.debug(
+ 'Killing client %s on node %s.',
+ self.ap, self.node.name
+ )
+ self.node.execute_command("kill %s" % self.pid)
+
+ def check(self):
+ """Check if the process should keep running, stop it if not,
+ and return true if and only if it is still running."""
+ now = time.time()
if not self.running:
- return
+ return False
if now - self.start_time >= self.duration:
self.stop()
+ return False
+ return True
# Base class for server programs
@@ -738,7 +850,7 @@ class ClientProcess(Client):
#
class Server:
def __init__(self, ap, arrival_rate, mean_duration,
- options=None, max_clients=None,
+ options=None, max_clients=float('inf'),
clients=None, nodes=None):
self.ap = ap
self.options = options
@@ -749,6 +861,7 @@ class Server:
self.nodes = nodes
self.arrival_rate = arrival_rate # mean requests/s
self.mean_duration = mean_duration # in seconds
+ self.pids = {}
def add_client(self, client):
self.clients.append(client)
@@ -770,13 +883,40 @@ class Server:
interval seconds.
Hence, the average size should be interval * arrival_rate.
"""
- pass
+ number = poisson(self.arrival_rate * interval)
+ number = int(min(number, self.max_clients))
+ l = [self.make_client_process() for _ in range(number)]
+ return l
def make_client_process(self):
"""Returns a client of this server"""
if len(self.clients) == 0:
- raise Exception("Server %s has empty client list," % (self,))
- pass # TODO should return a ClientProcess
+ raise Exception("Server %s has empty client list." % (self,))
+ duration = exponential(self.mean_duration)
+ return random.choice(self.clients)\
+ .start_process(duration=duration)
+
+ def run(self):
+ for node in self.nodes:
+ opt_str = self.options if self.options is not None else ""
+ logfile = "%s.log" % self.ap
+ script = r'nohup "$@" > %s & echo "$!"' % logfile
+ cmds = ["echo '%s' > startup.sh && chmod a+x startup.sh"
+ % (script,),
+ "./startup.sh %s %s" % (self.ap, opt_str)]
+ logger.debug(
+ 'Starting server %s on node %s with logfile %s.',
+ self.ap, node.name, logfile
+ )
+ self.pids[node] = (node.execute_commands(cmds))
+
+ def stop(self):
+ for node, pid in self.pids.items():
+ logger.debug(
+ 'Killing server %s on node %s.',
+ self.ap, node.name
+ )
+ node.execute_command("kill %s" % pid)
# Base class for ARCFIRE storyboards
@@ -786,12 +926,18 @@ class Server:
# @servers: App servers available in the network
#
class StoryBoard:
+
+ DEFAULT_INTERVAL = 2.5 # in seconds (may be a float)
+
def __init__(self, experiment, duration, servers=None):
self.experiment = experiment
self.duration = duration
if servers is None:
servers = list()
self.servers = servers
+ self.client_nodes = [c for c in experiment.nodes if c.client]
+ self.active_clients = []
+ self.start_time = None
def add_server(self, server):
self.servers.append(server)
@@ -800,4 +946,28 @@ class StoryBoard:
self.servers.remove(server)
def start(self):
- pass
+ self.start_time = time.time()
+ script = r'nohup "$@" > /dev/null & echo "$!"'
+ for node in self.client_nodes:
+ logger.debug("Writing utility startup script on client nodes.")
+ node.execute_command(
+ "echo '%s' > startup.sh && chmod a+x startup.sh" % (script,)
+ )
+ try:
+ for server in self.servers:
+ server.run()
+ while time.time() - self.start_time < self.duration:
+ for server in self.servers:
+ clients = server.get_new_clients(self.DEFAULT_INTERVAL)
+ for new_client in clients:
+ client_node = random.choice(self.client_nodes)
+ new_client.run(client_node)
+ self.active_clients.append(new_client)
+ self.active_clients = \
+ [x for x in self.active_clients if x.check()]
+ time.sleep(self.DEFAULT_INTERVAL)
+ finally:
+ for client in self.active_clients:
+ client.stop()
+ for server in self.servers:
+ server.stop()
diff --git a/rumba/prototypes/irati.py b/rumba/prototypes/irati.py
index 57901fc..14d4c27 100644
--- a/rumba/prototypes/irati.py
+++ b/rumba/prototypes/irati.py
@@ -146,7 +146,7 @@ class Experiment(mod.Experiment):
'ipcmcomps': ipcm_components}
logger.info('Copying configuration files to node %s', node.name)
- ssh.copy_paths_to_testbed(self.testbed,
+ ssh.copy_files_to_testbed(self.testbed,
node.ssh_config,
gen_files,
'')
diff --git a/rumba/recpoisson.py b/rumba/recpoisson.py
new file mode 100644
index 0000000..3c1e6fe
--- /dev/null
+++ b/rumba/recpoisson.py
@@ -0,0 +1,65 @@
+import math
+import random
+
+import sys
+
+if sys.version_info < (3, 2):
+ from repoze.lru import lru_cache
+ # from functools32 import lru_cache
+else:
+ from functools import lru_cache
+
+
+@lru_cache(1000)
+def _get_poisson_var(parameter):
+ return Poisson(parameter)
+
+
+class Poisson(object):
+
+ def __init__(self, parameter):
+ self.parameter = parameter
+
+ def c_p(k):
+ """Compute the Poisson CDF for k iteratively."""
+ if k == 0:
+ return self._p(0)
+ else:
+ return self._compute_poisson_cdf(k - 1) + self._p(k)
+ self._compute_poisson_cdf = lru_cache(int(2.5*self.parameter) + 1)(c_p)
+
+ @staticmethod
+ def _get_random():
+ return random.random()
+
+ def _p(self, k):
+ # l^k * e^-l / k!
+ # Computed as exp(klog(l) - l - log(k!))
+ l = self.parameter
+ l_to_the_k = k * math.log(l)
+ k_fact = sum([math.log(i + 1) for i in range(k)])
+ return math.exp(l_to_the_k - l - k_fact)
+
+ def sample(self):
+ # The idea is:
+ # take a sample from U(0,1) and call it f.
+ # Let x be s.t. x = min_N F(x) > f
+ # where F is the cumulative distribution function of Poisson(parameter)
+ # return x
+ f = self._get_random()
+
+ # We compute x iteratively by computing
+ # \sum_k(P=k)
+ # where P ~ Poisson(parameter) and stopping as soon as
+ # it is greater than f.
+ # We use the cache to store results.
+ current_cdf = -1
+ current_x = -1
+ while current_cdf < f:
+ current_x += 1
+ current_cdf = self._compute_poisson_cdf(current_x)
+ return current_x
+
+
+def poisson(parameter):
+ return _get_poisson_var(parameter).sample()
diff --git a/rumba/ssh_support.py b/rumba/ssh_support.py
index b0970e1..53d81a1 100644
--- a/rumba/ssh_support.py
+++ b/rumba/ssh_support.py
@@ -102,8 +102,8 @@ def execute_commands(testbed, ssh_config, commands, time_out=3):
"""
ssh_client = get_ssh_client()
- if ssh_config.proxycommand is not None:
- proxy = paramiko.ProxyCommand(ssh_config.proxycommand)
+ if ssh_config.proxy_command is not None:
+ proxy = paramiko.ProxyCommand(ssh_config.proxy_command)
else:
proxy = None
@@ -150,7 +150,7 @@ def execute_command(testbed, ssh_config, command, time_out=3):
return o
-def copy_file_to_testbed(testbed, ssh_config, text, file_name):
+def write_text_to_file(testbed, ssh_config, text, file_name):
"""
Write a string to a given remote file.
Overwrite the complete file if it already exists!
@@ -162,8 +162,8 @@ def copy_file_to_testbed(testbed, ssh_config, text, file_name):
"""
ssh_client = get_ssh_client()
- if ssh_config.proxycommand is not None:
- proxy = paramiko.ProxyCommand(ssh_config.proxycommand)
+ if ssh_config.proxy_command is not None:
+ proxy = paramiko.ProxyCommand(ssh_config.proxy_command)
else:
proxy = None
@@ -193,10 +193,9 @@ def copy_file_to_testbed(testbed, ssh_config, text, file_name):
logger.error(str(e))
-def copy_paths_to_testbed(testbed, ssh_config, paths, destination):
+def copy_files_to_testbed(testbed, ssh_config, paths, destination):
"""
- Write a string to a given remote file.
- Overwrite the complete file if it already exists!
+ Copies local files to a remote node.
@param testbed: testbed info
@param ssh_config: ssh config of the node
@@ -205,8 +204,8 @@ def copy_paths_to_testbed(testbed, ssh_config, paths, destination):
"""
ssh_client = get_ssh_client()
- if ssh_config.proxycommand is not None:
- proxy = paramiko.ProxyCommand(ssh_config.proxycommand)
+ if ssh_config.proxy_command is not None:
+ proxy = paramiko.ProxyCommand(ssh_config.proxy_command)
else:
proxy = None
@@ -237,17 +236,16 @@ def copy_paths_to_testbed(testbed, ssh_config, paths, destination):
logger.error(str(e))
-def copy_path_to_testbed(testbed, ssh_config, path, destination):
+def copy_file_to_testbed(testbed, ssh_config, path, destination):
"""
- Write a string to a given remote file.
- Overwrite the complete file if it already exists!
+ Copies a local file to a remote node.
@param testbed: testbed info
@param ssh_config: ssh config of the node
@param path: source path (local)
@param destination: destination folder name (remote)
"""
- copy_paths_to_testbed(testbed, ssh_config, [path], destination)
+ copy_files_to_testbed(testbed, ssh_config, [path], destination)
def setup_vlan(testbed, node, vlan_id, int_name):
diff --git a/rumba/testbeds/emulab.py b/rumba/testbeds/emulab.py
index e7458bc..53eebe5 100644
--- a/rumba/testbeds/emulab.py
+++ b/rumba/testbeds/emulab.py
@@ -131,7 +131,7 @@ class Testbed(mod.Testbed):
ns = self.generate_ns_script(experiment)
dest_file_name = '/users/' + self.username + \
'/temp_ns_file.%s.ns' % os.getpid()
- ssh.copy_file_to_testbed(self, self.ops_ssh_config, ns, dest_file_name)
+ ssh.write_text_to_file(self, self.ops_ssh_config, ns, dest_file_name)
cmd = '/usr/testbed/bin/sslxmlrpc_client.py startexp ' + \
'batch=false wait=true proj="' + proj_name + \
@@ -210,6 +210,8 @@ class Testbed(mod.Testbed):
for node in experiment.nodes:
node.ssh_config.hostname = self.full_name(node.name)
+ node.ssh_config.set_username(self.username)
+ node.ssh_config.set_password(self.password)
cmd = 'cat /var/emulab/boot/topomap'
topomap = ssh.execute_command(self, experiment.nodes[0].ssh_config, cmd)
diff --git a/rumba/testbeds/jfed.py b/rumba/testbeds/jfed.py
index 9c72ca7..8867dc6 100644
--- a/rumba/testbeds/jfed.py
+++ b/rumba/testbeds/jfed.py
@@ -155,11 +155,13 @@ class Testbed(mod.Testbed):
node.ssh_config.hostname = \
node.name + "." + self.exp_name + "." + \
auth_name_r + "." + self.auth_name
- node.ssh_config.proxycommand = "ssh -i '" + self.cert_file + \
- "' -o StrictHostKeyChecking=no " + \
- self.username + \
- "@bastion.test.iminds.be nc " + \
- node.ssh_config.hostname + " 22"
+ node.ssh_config.proxy_command = "ssh -i '" + self.cert_file + \
+ "' -o StrictHostKeyChecking=no " + \
+ self.username + \
+ "@bastion.test.iminds.be nc " + \
+ node.ssh_config.hostname + " 22"
+ node.ssh_config.username = self.username
+ node.ssh_config.password = self.password
subprocess.call(["java", "-jar", self.jfed_jar, "create", "-S",
self.proj_name, "--rspec",
@@ -174,8 +176,6 @@ class Testbed(mod.Testbed):
rspec = xml.parse(self.manifest)
xml_nodes = rspec.getElementsByTagName("node")
- dir_path = os.path.dirname(os.path.abspath(__file__))
-
# Complete details of the nodes after swapin
logger.info("Sleeping for two seconds to avoid contacting jfed nodes "
"too soon.")
diff --git a/rumba/testbeds/qemu.py b/rumba/testbeds/qemu.py
index f0b73a8..3d30ce2 100644
--- a/rumba/testbeds/qemu.py
+++ b/rumba/testbeds/qemu.py
@@ -254,6 +254,8 @@ class Testbed(mod.Testbed):
vm['id'] = vmid
node.ssh_config.hostname = "localhost"
node.ssh_config.port = fwdp
+ node.ssh_config.username = self.username
+ node.ssh_config.password = self.password
log_file = os.path.join(mod.tmp_dir, name + '.log')
vars_dict = {'fwdp': fwdp, 'id': vmid, 'mac': mac,
diff --git a/setup.py b/setup.py
index f00e934..2648f89 100755
--- a/setup.py
+++ b/setup.py
@@ -1,10 +1,9 @@
#!/usr/bin/env python
-from setuptools import setup
-from codecs import open
-from os import path
+import setuptools
-setup(
+
+setuptools.setup(
name="Rumba",
version="0.4",
url="https://gitlab.com/arcfire/rumba",
@@ -14,6 +13,7 @@ setup(
license="LGPL",
description="Rumba measurement framework for RINA",
packages=["rumba", "rumba.testbeds", "rumba.prototypes"],
- install_requires=["paramiko", "wheel", "wget"],
- scripts = ['tools/rumba-access']
+ install_requires=["paramiko", "wget", 'repoze.lru; python_version<"3.2"'],
+ extras_require={"NumpyAcceleration": ["numpy"]},
+ scripts=['tools/rumba-access']
)