diff options
author | Sander Vrijders <sander.vrijders@intec.ugent.be> | 2017-07-28 10:07:31 +0000 |
---|---|---|
committer | Sander Vrijders <sander.vrijders@intec.ugent.be> | 2017-07-28 10:07:31 +0000 |
commit | f3f51c550ef9751bada8ffcd4d4e846c786a26e0 (patch) | |
tree | b6cddc451fc182e0bd3384805f4ae705375f043b | |
parent | 7b43f02b415968371f1b5719232bd7741cb7f12f (diff) | |
parent | 7dc3b5086d4c1d19c9ef315369f3fd7c4dd2889e (diff) | |
download | rumba-f3f51c550ef9751bada8ffcd4d4e846c786a26e0.tar.gz rumba-f3f51c550ef9751bada8ffcd4d4e846c786a26e0.zip |
Merge branch 'storyboard-impl' into 'master'
Storyboard impl
See merge request !61
-rwxr-xr-x | examples/example.py | 2 | ||||
-rw-r--r-- | rumba/log.py | 7 | ||||
-rw-r--r-- | rumba/model.py | 218 | ||||
-rw-r--r-- | rumba/prototypes/irati.py | 2 | ||||
-rw-r--r-- | rumba/recpoisson.py | 65 | ||||
-rw-r--r-- | rumba/ssh_support.py | 26 | ||||
-rw-r--r-- | rumba/testbeds/emulab.py | 4 | ||||
-rw-r--r-- | rumba/testbeds/jfed.py | 14 | ||||
-rw-r--r-- | rumba/testbeds/qemu.py | 2 | ||||
-rwxr-xr-x | setup.py | 12 |
10 files changed, 298 insertions, 54 deletions
diff --git a/examples/example.py b/examples/example.py index 567852f..11e8331 100755 --- a/examples/example.py +++ b/examples/example.py @@ -49,7 +49,7 @@ try: exp.bootstrap_prototype() c1 = Client("rinaperf", options ="-t perf -s 1000 -c 10000") s1 = Server("rinaperf", arrival_rate=2, mean_duration=5, options = "-l", nodes = [a], clients = [c1]) - sb = StoryBoard(exp, 3600, servers = [s1]) + sb = StoryBoard(exp, duration=3600, servers = [s1]) sb.start() finally: exp.swap_out() diff --git a/rumba/log.py b/rumba/log.py index bed0170..c509532 100644 --- a/rumba/log.py +++ b/rumba/log.py @@ -24,6 +24,13 @@ import sys import multiprocessing +DEBUG = logging.DEBUG +INFO = logging.INFO +WARNING = logging.WARNING +ERROR = logging.ERROR +CRITICAL = logging.CRITICAL + + loggers_set = set() diff --git a/rumba/model.py b/rumba/model.py index f4f98d1..1adc3b0 100644 --- a/rumba/model.py +++ b/rumba/model.py @@ -20,13 +20,29 @@ # MA 02110-1301 USA import abc - -import rumba.log as log import os +import random import stat +import time + +import rumba.log as log +from rumba import ssh_support logger = log.get_logger(__name__) +try: + from numpy.random import poisson + from numpy.random import exponential + logger.debug("Using numpy for faster and better random variables.") +except ImportError: + from rumba.recpoisson import poisson + + def exponential(mean_duration): + return random.expovariate(1.0 / mean_duration) + + logger.debug("Falling back to simple implementations.") + # PROBLEM! These logs will almost never be printed... But we might not care + tmp_dir = '/tmp/rumba' try: os.mkdir(tmp_dir) @@ -42,6 +58,7 @@ except OSError: # Already there, nothing to do pass + # Represents generic testbed info # # @username [string] user name @@ -164,10 +181,18 @@ class NormalDIF(DIF): # SSH Configuration # class SSHConfig: - def __init__(self, hostname, port=22, proxycommand=None): + def __init__(self, hostname, port=22, proxy_command=None): + self.username = None + self.password = None self.hostname = hostname self.port = port - self.proxycommand = proxycommand + self.proxy_command = proxy_command + + def set_username(self, username): + self.username = username + + def set_password(self, password): + self.password = password # A node in the experiment @@ -282,6 +307,74 @@ class Node: def get_policy(self, dif): return self.policies[dif] + def execute_commands(self, commands, time_out=3, use_proxy=False): + # Ssh_config is used twice since it doubles as testbed info + # (it holds fields username and password) + if use_proxy: + return ssh_support.execute_proxy_commands( + self.ssh_config, + self.ssh_config, + commands, + time_out + ) + # else: + return ssh_support.execute_commands( + self.ssh_config, + self.ssh_config, + commands, + time_out + ) + + def execute_command(self, command, time_out=3, use_proxy=False): + # Ssh_config is used twice since it doubles as testbed info + # (it holds fields username and password) + if use_proxy: + return ssh_support.execute_proxy_command( + self.ssh_config, + self.ssh_config, + command, + time_out + ) + # else: + return ssh_support.execute_command( + self.ssh_config, + self.ssh_config, + command, + time_out + ) + + def write_text_to_file(self, text, file_name): + ssh_support.write_text_to_file( + self.ssh_config, + self.ssh_config, + text, + file_name + ) + + def copy_file(self, path, destination): + ssh_support.copy_file_to_testbed( + self.ssh_config, + self.ssh_config, + path, + destination + ) + + def copy_files(self, paths, destination): + ssh_support.copy_files_to_testbed( + self.ssh_config, + self.ssh_config, + paths, + destination + ) + + def setup_vlan(self, vlan_id, int_name): + ssh_support.setup_vlan( + self.ssh_config, + self.ssh_config, + vlan_id, + int_name + ) + # Base class representing an IPC Process to be created in the experiment # @@ -647,7 +740,7 @@ class Experiment: self.testbed.username, node.ssh_config.hostname, node.ssh_config.port, - node.ssh_config.proxycommand)) + node.ssh_config.proxy_command)) f.close() # Examine the nodes and DIFs, compute the registration and enrollment @@ -692,38 +785,57 @@ class Client(object): self.ap = ap self.options = options - def start_process(self, node, duration, start_time): - return ClientProcess(self.ap, node, duration, start_time, self.options) + def start_process(self, duration): + return ClientProcess(self.ap, duration, self.options) # Base class for client processes # # @ap: Application Process binary -# @node: The node on which this process should run # @duration: The time (in seconds) this process should run # @start_time: The time at which this process is started. # @options: Options to pass to the binary # class ClientProcess(Client): - def __init__(self, ap, node, duration, start_time, options=None): + def __init__(self, ap, duration, options=None): super(ClientProcess, self).__init__(ap, options=options) - self.node = node self.duration = duration - self.start_time = start_time - self.run() - self.running = True + self.start_time = None + self.running = False + self.node = None + self.pid = None - def run(self): - pass # TODO to be implemented + def run(self, node): + self.node = node + self.start_time = time.time() - def stop(self): - pass # TODO to be implemented + logger.debug( + 'Starting client app %s on node %s with duration %s.', + self.ap, self.node.name, self.duration + ) + + opt_str = self.options if self.options is not None else "" + cmd = "./startup.sh %s %s" % (self.ap, opt_str) + self.running = True + self.pid = self.node.execute_command(cmd) - def check(self, now): + def stop(self): + logger.debug( + 'Killing client %s on node %s.', + self.ap, self.node.name + ) + self.node.execute_command("kill %s" % self.pid) + + def check(self): + """Check if the process should keep running, stop it if not, + and return true if and only if it is still running.""" + now = time.time() if not self.running: - return + return False if now - self.start_time >= self.duration: self.stop() + return False + return True # Base class for server programs @@ -738,7 +850,7 @@ class ClientProcess(Client): # class Server: def __init__(self, ap, arrival_rate, mean_duration, - options=None, max_clients=None, + options=None, max_clients=float('inf'), clients=None, nodes=None): self.ap = ap self.options = options @@ -749,6 +861,7 @@ class Server: self.nodes = nodes self.arrival_rate = arrival_rate # mean requests/s self.mean_duration = mean_duration # in seconds + self.pids = {} def add_client(self, client): self.clients.append(client) @@ -770,13 +883,40 @@ class Server: interval seconds. Hence, the average size should be interval * arrival_rate. """ - pass + number = poisson(self.arrival_rate * interval) + number = int(min(number, self.max_clients)) + l = [self.make_client_process() for _ in range(number)] + return l def make_client_process(self): """Returns a client of this server""" if len(self.clients) == 0: - raise Exception("Server %s has empty client list," % (self,)) - pass # TODO should return a ClientProcess + raise Exception("Server %s has empty client list." % (self,)) + duration = exponential(self.mean_duration) + return random.choice(self.clients)\ + .start_process(duration=duration) + + def run(self): + for node in self.nodes: + opt_str = self.options if self.options is not None else "" + logfile = "%s.log" % self.ap + script = r'nohup "$@" > %s & echo "$!"' % logfile + cmds = ["echo '%s' > startup.sh && chmod a+x startup.sh" + % (script,), + "./startup.sh %s %s" % (self.ap, opt_str)] + logger.debug( + 'Starting server %s on node %s with logfile %s.', + self.ap, node.name, logfile + ) + self.pids[node] = (node.execute_commands(cmds)) + + def stop(self): + for node, pid in self.pids.items(): + logger.debug( + 'Killing server %s on node %s.', + self.ap, node.name + ) + node.execute_command("kill %s" % pid) # Base class for ARCFIRE storyboards @@ -786,12 +926,18 @@ class Server: # @servers: App servers available in the network # class StoryBoard: + + DEFAULT_INTERVAL = 2.5 # in seconds (may be a float) + def __init__(self, experiment, duration, servers=None): self.experiment = experiment self.duration = duration if servers is None: servers = list() self.servers = servers + self.client_nodes = [c for c in experiment.nodes if c.client] + self.active_clients = [] + self.start_time = None def add_server(self, server): self.servers.append(server) @@ -800,4 +946,28 @@ class StoryBoard: self.servers.remove(server) def start(self): - pass + self.start_time = time.time() + script = r'nohup "$@" > /dev/null & echo "$!"' + for node in self.client_nodes: + logger.debug("Writing utility startup script on client nodes.") + node.execute_command( + "echo '%s' > startup.sh && chmod a+x startup.sh" % (script,) + ) + try: + for server in self.servers: + server.run() + while time.time() - self.start_time < self.duration: + for server in self.servers: + clients = server.get_new_clients(self.DEFAULT_INTERVAL) + for new_client in clients: + client_node = random.choice(self.client_nodes) + new_client.run(client_node) + self.active_clients.append(new_client) + self.active_clients = \ + [x for x in self.active_clients if x.check()] + time.sleep(self.DEFAULT_INTERVAL) + finally: + for client in self.active_clients: + client.stop() + for server in self.servers: + server.stop() diff --git a/rumba/prototypes/irati.py b/rumba/prototypes/irati.py index 57901fc..14d4c27 100644 --- a/rumba/prototypes/irati.py +++ b/rumba/prototypes/irati.py @@ -146,7 +146,7 @@ class Experiment(mod.Experiment): 'ipcmcomps': ipcm_components} logger.info('Copying configuration files to node %s', node.name) - ssh.copy_paths_to_testbed(self.testbed, + ssh.copy_files_to_testbed(self.testbed, node.ssh_config, gen_files, '') diff --git a/rumba/recpoisson.py b/rumba/recpoisson.py new file mode 100644 index 0000000..3c1e6fe --- /dev/null +++ b/rumba/recpoisson.py @@ -0,0 +1,65 @@ +import math +import random + +import sys + +if sys.version_info < (3, 2): + from repoze.lru import lru_cache + # from functools32 import lru_cache +else: + from functools import lru_cache + + +@lru_cache(1000) +def _get_poisson_var(parameter): + return Poisson(parameter) + + +class Poisson(object): + + def __init__(self, parameter): + self.parameter = parameter + + def c_p(k): + """Compute the Poisson CDF for k iteratively.""" + if k == 0: + return self._p(0) + else: + return self._compute_poisson_cdf(k - 1) + self._p(k) + self._compute_poisson_cdf = lru_cache(int(2.5*self.parameter) + 1)(c_p) + + @staticmethod + def _get_random(): + return random.random() + + def _p(self, k): + # l^k * e^-l / k! + # Computed as exp(klog(l) - l - log(k!)) + l = self.parameter + l_to_the_k = k * math.log(l) + k_fact = sum([math.log(i + 1) for i in range(k)]) + return math.exp(l_to_the_k - l - k_fact) + + def sample(self): + # The idea is: + # take a sample from U(0,1) and call it f. + # Let x be s.t. x = min_N F(x) > f + # where F is the cumulative distribution function of Poisson(parameter) + # return x + f = self._get_random() + + # We compute x iteratively by computing + # \sum_k(P=k) + # where P ~ Poisson(parameter) and stopping as soon as + # it is greater than f. + # We use the cache to store results. + current_cdf = -1 + current_x = -1 + while current_cdf < f: + current_x += 1 + current_cdf = self._compute_poisson_cdf(current_x) + return current_x + + +def poisson(parameter): + return _get_poisson_var(parameter).sample() diff --git a/rumba/ssh_support.py b/rumba/ssh_support.py index b0970e1..53d81a1 100644 --- a/rumba/ssh_support.py +++ b/rumba/ssh_support.py @@ -102,8 +102,8 @@ def execute_commands(testbed, ssh_config, commands, time_out=3): """ ssh_client = get_ssh_client() - if ssh_config.proxycommand is not None: - proxy = paramiko.ProxyCommand(ssh_config.proxycommand) + if ssh_config.proxy_command is not None: + proxy = paramiko.ProxyCommand(ssh_config.proxy_command) else: proxy = None @@ -150,7 +150,7 @@ def execute_command(testbed, ssh_config, command, time_out=3): return o -def copy_file_to_testbed(testbed, ssh_config, text, file_name): +def write_text_to_file(testbed, ssh_config, text, file_name): """ Write a string to a given remote file. Overwrite the complete file if it already exists! @@ -162,8 +162,8 @@ def copy_file_to_testbed(testbed, ssh_config, text, file_name): """ ssh_client = get_ssh_client() - if ssh_config.proxycommand is not None: - proxy = paramiko.ProxyCommand(ssh_config.proxycommand) + if ssh_config.proxy_command is not None: + proxy = paramiko.ProxyCommand(ssh_config.proxy_command) else: proxy = None @@ -193,10 +193,9 @@ def copy_file_to_testbed(testbed, ssh_config, text, file_name): logger.error(str(e)) -def copy_paths_to_testbed(testbed, ssh_config, paths, destination): +def copy_files_to_testbed(testbed, ssh_config, paths, destination): """ - Write a string to a given remote file. - Overwrite the complete file if it already exists! + Copies local files to a remote node. @param testbed: testbed info @param ssh_config: ssh config of the node @@ -205,8 +204,8 @@ def copy_paths_to_testbed(testbed, ssh_config, paths, destination): """ ssh_client = get_ssh_client() - if ssh_config.proxycommand is not None: - proxy = paramiko.ProxyCommand(ssh_config.proxycommand) + if ssh_config.proxy_command is not None: + proxy = paramiko.ProxyCommand(ssh_config.proxy_command) else: proxy = None @@ -237,17 +236,16 @@ def copy_paths_to_testbed(testbed, ssh_config, paths, destination): logger.error(str(e)) -def copy_path_to_testbed(testbed, ssh_config, path, destination): +def copy_file_to_testbed(testbed, ssh_config, path, destination): """ - Write a string to a given remote file. - Overwrite the complete file if it already exists! + Copies a local file to a remote node. @param testbed: testbed info @param ssh_config: ssh config of the node @param path: source path (local) @param destination: destination folder name (remote) """ - copy_paths_to_testbed(testbed, ssh_config, [path], destination) + copy_files_to_testbed(testbed, ssh_config, [path], destination) def setup_vlan(testbed, node, vlan_id, int_name): diff --git a/rumba/testbeds/emulab.py b/rumba/testbeds/emulab.py index e7458bc..53eebe5 100644 --- a/rumba/testbeds/emulab.py +++ b/rumba/testbeds/emulab.py @@ -131,7 +131,7 @@ class Testbed(mod.Testbed): ns = self.generate_ns_script(experiment) dest_file_name = '/users/' + self.username + \ '/temp_ns_file.%s.ns' % os.getpid() - ssh.copy_file_to_testbed(self, self.ops_ssh_config, ns, dest_file_name) + ssh.write_text_to_file(self, self.ops_ssh_config, ns, dest_file_name) cmd = '/usr/testbed/bin/sslxmlrpc_client.py startexp ' + \ 'batch=false wait=true proj="' + proj_name + \ @@ -210,6 +210,8 @@ class Testbed(mod.Testbed): for node in experiment.nodes: node.ssh_config.hostname = self.full_name(node.name) + node.ssh_config.set_username(self.username) + node.ssh_config.set_password(self.password) cmd = 'cat /var/emulab/boot/topomap' topomap = ssh.execute_command(self, experiment.nodes[0].ssh_config, cmd) diff --git a/rumba/testbeds/jfed.py b/rumba/testbeds/jfed.py index 9c72ca7..8867dc6 100644 --- a/rumba/testbeds/jfed.py +++ b/rumba/testbeds/jfed.py @@ -155,11 +155,13 @@ class Testbed(mod.Testbed): node.ssh_config.hostname = \ node.name + "." + self.exp_name + "." + \ auth_name_r + "." + self.auth_name - node.ssh_config.proxycommand = "ssh -i '" + self.cert_file + \ - "' -o StrictHostKeyChecking=no " + \ - self.username + \ - "@bastion.test.iminds.be nc " + \ - node.ssh_config.hostname + " 22" + node.ssh_config.proxy_command = "ssh -i '" + self.cert_file + \ + "' -o StrictHostKeyChecking=no " + \ + self.username + \ + "@bastion.test.iminds.be nc " + \ + node.ssh_config.hostname + " 22" + node.ssh_config.username = self.username + node.ssh_config.password = self.password subprocess.call(["java", "-jar", self.jfed_jar, "create", "-S", self.proj_name, "--rspec", @@ -174,8 +176,6 @@ class Testbed(mod.Testbed): rspec = xml.parse(self.manifest) xml_nodes = rspec.getElementsByTagName("node") - dir_path = os.path.dirname(os.path.abspath(__file__)) - # Complete details of the nodes after swapin logger.info("Sleeping for two seconds to avoid contacting jfed nodes " "too soon.") diff --git a/rumba/testbeds/qemu.py b/rumba/testbeds/qemu.py index f0b73a8..3d30ce2 100644 --- a/rumba/testbeds/qemu.py +++ b/rumba/testbeds/qemu.py @@ -254,6 +254,8 @@ class Testbed(mod.Testbed): vm['id'] = vmid node.ssh_config.hostname = "localhost" node.ssh_config.port = fwdp + node.ssh_config.username = self.username + node.ssh_config.password = self.password log_file = os.path.join(mod.tmp_dir, name + '.log') vars_dict = {'fwdp': fwdp, 'id': vmid, 'mac': mac, @@ -1,10 +1,9 @@ #!/usr/bin/env python -from setuptools import setup -from codecs import open -from os import path +import setuptools -setup( + +setuptools.setup( name="Rumba", version="0.4", url="https://gitlab.com/arcfire/rumba", @@ -14,6 +13,7 @@ setup( license="LGPL", description="Rumba measurement framework for RINA", packages=["rumba", "rumba.testbeds", "rumba.prototypes"], - install_requires=["paramiko", "wheel", "wget"], - scripts = ['tools/rumba-access'] + install_requires=["paramiko", "wget", 'repoze.lru; python_version<"3.2"'], + extras_require={"NumpyAcceleration": ["numpy"]}, + scripts=['tools/rumba-access'] ) |