diff options
author | Sander Vrijders <sander.vrijders@intec.ugent.be> | 2017-07-28 10:07:31 +0000 |
---|---|---|
committer | Sander Vrijders <sander.vrijders@intec.ugent.be> | 2017-07-28 10:07:31 +0000 |
commit | f3f51c550ef9751bada8ffcd4d4e846c786a26e0 (patch) | |
tree | b6cddc451fc182e0bd3384805f4ae705375f043b /rumba/model.py | |
parent | 7b43f02b415968371f1b5719232bd7741cb7f12f (diff) | |
parent | 7dc3b5086d4c1d19c9ef315369f3fd7c4dd2889e (diff) | |
download | rumba-f3f51c550ef9751bada8ffcd4d4e846c786a26e0.tar.gz rumba-f3f51c550ef9751bada8ffcd4d4e846c786a26e0.zip |
Merge branch 'storyboard-impl' into 'master'
Storyboard impl
See merge request !61
Diffstat (limited to 'rumba/model.py')
-rw-r--r-- | rumba/model.py | 218 |
1 files changed, 194 insertions, 24 deletions
diff --git a/rumba/model.py b/rumba/model.py index f4f98d1..1adc3b0 100644 --- a/rumba/model.py +++ b/rumba/model.py @@ -20,13 +20,29 @@ # MA 02110-1301 USA import abc - -import rumba.log as log import os +import random import stat +import time + +import rumba.log as log +from rumba import ssh_support logger = log.get_logger(__name__) +try: + from numpy.random import poisson + from numpy.random import exponential + logger.debug("Using numpy for faster and better random variables.") +except ImportError: + from rumba.recpoisson import poisson + + def exponential(mean_duration): + return random.expovariate(1.0 / mean_duration) + + logger.debug("Falling back to simple implementations.") + # PROBLEM! These logs will almost never be printed... But we might not care + tmp_dir = '/tmp/rumba' try: os.mkdir(tmp_dir) @@ -42,6 +58,7 @@ except OSError: # Already there, nothing to do pass + # Represents generic testbed info # # @username [string] user name @@ -164,10 +181,18 @@ class NormalDIF(DIF): # SSH Configuration # class SSHConfig: - def __init__(self, hostname, port=22, proxycommand=None): + def __init__(self, hostname, port=22, proxy_command=None): + self.username = None + self.password = None self.hostname = hostname self.port = port - self.proxycommand = proxycommand + self.proxy_command = proxy_command + + def set_username(self, username): + self.username = username + + def set_password(self, password): + self.password = password # A node in the experiment @@ -282,6 +307,74 @@ class Node: def get_policy(self, dif): return self.policies[dif] + def execute_commands(self, commands, time_out=3, use_proxy=False): + # Ssh_config is used twice since it doubles as testbed info + # (it holds fields username and password) + if use_proxy: + return ssh_support.execute_proxy_commands( + self.ssh_config, + self.ssh_config, + commands, + time_out + ) + # else: + return ssh_support.execute_commands( + self.ssh_config, + self.ssh_config, + commands, + time_out + ) + + def execute_command(self, command, time_out=3, use_proxy=False): + # Ssh_config is used twice since it doubles as testbed info + # (it holds fields username and password) + if use_proxy: + return ssh_support.execute_proxy_command( + self.ssh_config, + self.ssh_config, + command, + time_out + ) + # else: + return ssh_support.execute_command( + self.ssh_config, + self.ssh_config, + command, + time_out + ) + + def write_text_to_file(self, text, file_name): + ssh_support.write_text_to_file( + self.ssh_config, + self.ssh_config, + text, + file_name + ) + + def copy_file(self, path, destination): + ssh_support.copy_file_to_testbed( + self.ssh_config, + self.ssh_config, + path, + destination + ) + + def copy_files(self, paths, destination): + ssh_support.copy_files_to_testbed( + self.ssh_config, + self.ssh_config, + paths, + destination + ) + + def setup_vlan(self, vlan_id, int_name): + ssh_support.setup_vlan( + self.ssh_config, + self.ssh_config, + vlan_id, + int_name + ) + # Base class representing an IPC Process to be created in the experiment # @@ -647,7 +740,7 @@ class Experiment: self.testbed.username, node.ssh_config.hostname, node.ssh_config.port, - node.ssh_config.proxycommand)) + node.ssh_config.proxy_command)) f.close() # Examine the nodes and DIFs, compute the registration and enrollment @@ -692,38 +785,57 @@ class Client(object): self.ap = ap self.options = options - def start_process(self, node, duration, start_time): - return ClientProcess(self.ap, node, duration, start_time, self.options) + def start_process(self, duration): + return ClientProcess(self.ap, duration, self.options) # Base class for client processes # # @ap: Application Process binary -# @node: The node on which this process should run # @duration: The time (in seconds) this process should run # @start_time: The time at which this process is started. # @options: Options to pass to the binary # class ClientProcess(Client): - def __init__(self, ap, node, duration, start_time, options=None): + def __init__(self, ap, duration, options=None): super(ClientProcess, self).__init__(ap, options=options) - self.node = node self.duration = duration - self.start_time = start_time - self.run() - self.running = True + self.start_time = None + self.running = False + self.node = None + self.pid = None - def run(self): - pass # TODO to be implemented + def run(self, node): + self.node = node + self.start_time = time.time() - def stop(self): - pass # TODO to be implemented + logger.debug( + 'Starting client app %s on node %s with duration %s.', + self.ap, self.node.name, self.duration + ) + + opt_str = self.options if self.options is not None else "" + cmd = "./startup.sh %s %s" % (self.ap, opt_str) + self.running = True + self.pid = self.node.execute_command(cmd) - def check(self, now): + def stop(self): + logger.debug( + 'Killing client %s on node %s.', + self.ap, self.node.name + ) + self.node.execute_command("kill %s" % self.pid) + + def check(self): + """Check if the process should keep running, stop it if not, + and return true if and only if it is still running.""" + now = time.time() if not self.running: - return + return False if now - self.start_time >= self.duration: self.stop() + return False + return True # Base class for server programs @@ -738,7 +850,7 @@ class ClientProcess(Client): # class Server: def __init__(self, ap, arrival_rate, mean_duration, - options=None, max_clients=None, + options=None, max_clients=float('inf'), clients=None, nodes=None): self.ap = ap self.options = options @@ -749,6 +861,7 @@ class Server: self.nodes = nodes self.arrival_rate = arrival_rate # mean requests/s self.mean_duration = mean_duration # in seconds + self.pids = {} def add_client(self, client): self.clients.append(client) @@ -770,13 +883,40 @@ class Server: interval seconds. Hence, the average size should be interval * arrival_rate. """ - pass + number = poisson(self.arrival_rate * interval) + number = int(min(number, self.max_clients)) + l = [self.make_client_process() for _ in range(number)] + return l def make_client_process(self): """Returns a client of this server""" if len(self.clients) == 0: - raise Exception("Server %s has empty client list," % (self,)) - pass # TODO should return a ClientProcess + raise Exception("Server %s has empty client list." % (self,)) + duration = exponential(self.mean_duration) + return random.choice(self.clients)\ + .start_process(duration=duration) + + def run(self): + for node in self.nodes: + opt_str = self.options if self.options is not None else "" + logfile = "%s.log" % self.ap + script = r'nohup "$@" > %s & echo "$!"' % logfile + cmds = ["echo '%s' > startup.sh && chmod a+x startup.sh" + % (script,), + "./startup.sh %s %s" % (self.ap, opt_str)] + logger.debug( + 'Starting server %s on node %s with logfile %s.', + self.ap, node.name, logfile + ) + self.pids[node] = (node.execute_commands(cmds)) + + def stop(self): + for node, pid in self.pids.items(): + logger.debug( + 'Killing server %s on node %s.', + self.ap, node.name + ) + node.execute_command("kill %s" % pid) # Base class for ARCFIRE storyboards @@ -786,12 +926,18 @@ class Server: # @servers: App servers available in the network # class StoryBoard: + + DEFAULT_INTERVAL = 2.5 # in seconds (may be a float) + def __init__(self, experiment, duration, servers=None): self.experiment = experiment self.duration = duration if servers is None: servers = list() self.servers = servers + self.client_nodes = [c for c in experiment.nodes if c.client] + self.active_clients = [] + self.start_time = None def add_server(self, server): self.servers.append(server) @@ -800,4 +946,28 @@ class StoryBoard: self.servers.remove(server) def start(self): - pass + self.start_time = time.time() + script = r'nohup "$@" > /dev/null & echo "$!"' + for node in self.client_nodes: + logger.debug("Writing utility startup script on client nodes.") + node.execute_command( + "echo '%s' > startup.sh && chmod a+x startup.sh" % (script,) + ) + try: + for server in self.servers: + server.run() + while time.time() - self.start_time < self.duration: + for server in self.servers: + clients = server.get_new_clients(self.DEFAULT_INTERVAL) + for new_client in clients: + client_node = random.choice(self.client_nodes) + new_client.run(client_node) + self.active_clients.append(new_client) + self.active_clients = \ + [x for x in self.active_clients if x.check()] + time.sleep(self.DEFAULT_INTERVAL) + finally: + for client in self.active_clients: + client.stop() + for server in self.servers: + server.stop() |