From 815839bf3cac2fcfd2d25a69395055397d55a8bb Mon Sep 17 00:00:00 2001 From: Marco Capitani Date: Fri, 30 Jun 2017 12:17:16 +0200 Subject: ssh & model-storyboard: changed ssh API, added node.execute* methods --- rumba/model.py | 123 ++++++++++++++++++++++++++++++++++------------- rumba/recpoisson.py | 65 +++++++++++++++++++++++++ rumba/ssh_support.py | 12 ++--- rumba/testbeds/emulab.py | 2 + rumba/testbeds/jfed.py | 14 +++--- rumba/testbeds/qemu.py | 2 + 6 files changed, 172 insertions(+), 46 deletions(-) create mode 100644 rumba/recpoisson.py (limited to 'rumba') diff --git a/rumba/model.py b/rumba/model.py index ce4e938..d2f7567 100644 --- a/rumba/model.py +++ b/rumba/model.py @@ -164,10 +164,18 @@ class NormalDIF(DIF): # SSH Configuration # class SSHConfig: - def __init__(self, hostname, port=22, proxycommand=None): + def __init__(self, hostname, port=22, proxy_command=None): + self.username = None + self.password = None self.hostname = hostname self.port = port - self.proxycommand = proxycommand + self.proxy_command = proxy_command + + def set_username(self, username): + self.username = username + + def set_password(self, password): + self.password = password # A node in the experiment @@ -284,6 +292,74 @@ class Node: def get_policy(self, dif): return self.policies[dif] + def execute_commands(self, commands, time_out=3, use_proxy=False): + # Ssh_config is used twice since it doubles as testbed info + # (it holds fields username and password) + if use_proxy: + return ssh_support.execute_proxy_commands( + self.ssh_config, + self.ssh_config, + commands, + time_out + ) + # else: + return ssh_support.execute_commands( + self.ssh_config, + self.ssh_config, + commands, + time_out + ) + + def execute_command(self, command, time_out=3, use_proxy=False): + # Ssh_config is used twice since it doubles as testbed info + # (it holds fields username and password) + if use_proxy: + return ssh_support.execute_proxy_command( + self.ssh_config, + self.ssh_config, + command, + time_out + ) + # else: + return ssh_support.execute_command( + self.ssh_config, + self.ssh_config, + command, + time_out + ) + + def copy_file_to_testbed(self, text, file_name): + ssh_support.copy_file_to_testbed( + self.ssh_config, + self.ssh_config, + text, + file_name + ) + + def copy_path_to_testbed(self, path, destination): + ssh_support.copy_path_to_testbed( + self.ssh_config, + self.ssh_config, + path, + destination + ) + + def copy_paths_to_testbed(self, paths, destination): + ssh_support.copy_paths_to_testbed( + self.ssh_config, + self.ssh_config, + paths, + destination + ) + + def setup_vlan(self, vlan_id, int_name): + ssh_support.setup_vlan( + self.ssh_config, + self.ssh_config, + vlan_id, + int_name + ) + # Base class representing an IPC Process to be created in the experiment # @@ -649,7 +725,7 @@ class Experiment: self.testbed.username, node.ssh_config.hostname, node.ssh_config.port, - node.ssh_config.proxycommand)) + node.ssh_config.proxy_command)) f.close() # Examine the nodes and DIFs, compute the registration and enrollment @@ -690,26 +766,24 @@ class Experiment: # @options: Options to pass to the binary # class Client(object): - def __init__(self, ap, testbed, options=None): + def __init__(self, ap, options=None): self.ap = ap self.options = options - self.testbed = testbed def start_process(self, duration): - return ClientProcess(self.ap, duration, self.testbed, self.options) + return ClientProcess(self.ap, duration, self.options) # Base class for client processes # # @ap: Application Process binary -# @node: The node on which this process should run # @duration: The time (in seconds) this process should run # @start_time: The time at which this process is started. # @options: Options to pass to the binary # class ClientProcess(Client): - def __init__(self, ap, duration, testbed, options=None): - super(ClientProcess, self).__init__(ap, testbed, options=options) + def __init__(self, ap, duration, options=None): + super(ClientProcess, self).__init__(ap, options=options) self.duration = duration self.start_time = None self.running = False @@ -728,20 +802,14 @@ class ClientProcess(Client): opt_str = self.options if self.options is not None else "" cmd = "./startup.sh %s %s" % (self.ap, opt_str) self.running = True - self.pid = ssh_support.execute_command(self.testbed, - self.node.ssh_config, - cmd) + self.pid = self.node.execute_command(cmd) def stop(self): logger.debug( 'Killing client %s on node %s.', self.ap, self.node.name ) - ssh_support.execute_command( - self.testbed, - self.node.ssh_config, - "kill %s" % self.pid - ) + self.node.execute_command("kill %s" % self.pid) def check(self): """Check if the process should keep running, stop it if not, @@ -760,14 +828,13 @@ class ClientProcess(Client): # @ap: Application Process binary # @arrival_rate: Average requests/s to be received by this server # @mean_duration: Average duration of a client connection (in seconds) -# @testbed: the testbed for the experiment # @options: Options to pass to the binary # @max_clients: Maximum number of clients to serve # @clients: Client binaries that will use this server # @nodes: Specific nodes to start this server on # class Server: - def __init__(self, ap, arrival_rate, mean_duration, testbed, + def __init__(self, ap, arrival_rate, mean_duration, options=None, max_clients=float('inf'), clients=None, nodes=None): self.ap = ap @@ -779,7 +846,6 @@ class Server: self.nodes = nodes self.arrival_rate = arrival_rate # mean requests/s self.mean_duration = mean_duration # in seconds - self.testbed = testbed self.pids = {} def add_client(self, client): @@ -828,9 +894,7 @@ class Server: 'Starting server %s on node %s with logfile %s.', self.ap, node.name, logfile ) - self.pids[node] = (ssh_support.execute_commands(self.testbed, - node.ssh_config, - cmds)) + self.pids[node] = (node.execute_commands(cmds)) def stop(self): for node, pid in self.pids.items(): @@ -838,11 +902,7 @@ class Server: 'Killing server %s on node %s.', self.ap, node.name ) - ssh_support.execute_command( - self.testbed, - node.ssh_config, - "kill %s" % pid - ) + node.execute_command("kill %s" % pid) # Base class for ARCFIRE storyboards @@ -856,7 +916,7 @@ class StoryBoard: DEFAULT_INTERVAL = 2.5 # in seconds (may be a float) - def __init__(self, experiment, duration, testbed, servers=None): + def __init__(self, experiment, duration, servers=None): self.experiment = experiment self.duration = duration if servers is None: @@ -865,7 +925,6 @@ class StoryBoard: self.client_nodes = [c for c in experiment.nodes if c.client] self.active_clients = [] self.start_time = None - self.testbed = testbed def add_server(self, server): self.servers.append(server) @@ -878,9 +937,7 @@ class StoryBoard: script = r'nohup "$@" > /dev/null & echo "$!"' for node in self.client_nodes: logger.debug("Writing utility startup script on client nodes.") - ssh_support.execute_command( - self.testbed, - node.ssh_config, + node.execute_command( "echo '%s' > startup.sh && chmod a+x startup.sh" % (script,) ) try: diff --git a/rumba/recpoisson.py b/rumba/recpoisson.py new file mode 100644 index 0000000..3c1e6fe --- /dev/null +++ b/rumba/recpoisson.py @@ -0,0 +1,65 @@ +import math +import random + +import sys + +if sys.version_info < (3, 2): + from repoze.lru import lru_cache + # from functools32 import lru_cache +else: + from functools import lru_cache + + +@lru_cache(1000) +def _get_poisson_var(parameter): + return Poisson(parameter) + + +class Poisson(object): + + def __init__(self, parameter): + self.parameter = parameter + + def c_p(k): + """Compute the Poisson CDF for k iteratively.""" + if k == 0: + return self._p(0) + else: + return self._compute_poisson_cdf(k - 1) + self._p(k) + self._compute_poisson_cdf = lru_cache(int(2.5*self.parameter) + 1)(c_p) + + @staticmethod + def _get_random(): + return random.random() + + def _p(self, k): + # l^k * e^-l / k! + # Computed as exp(klog(l) - l - log(k!)) + l = self.parameter + l_to_the_k = k * math.log(l) + k_fact = sum([math.log(i + 1) for i in range(k)]) + return math.exp(l_to_the_k - l - k_fact) + + def sample(self): + # The idea is: + # take a sample from U(0,1) and call it f. + # Let x be s.t. x = min_N F(x) > f + # where F is the cumulative distribution function of Poisson(parameter) + # return x + f = self._get_random() + + # We compute x iteratively by computing + # \sum_k(P=k) + # where P ~ Poisson(parameter) and stopping as soon as + # it is greater than f. + # We use the cache to store results. + current_cdf = -1 + current_x = -1 + while current_cdf < f: + current_x += 1 + current_cdf = self._compute_poisson_cdf(current_x) + return current_x + + +def poisson(parameter): + return _get_poisson_var(parameter).sample() diff --git a/rumba/ssh_support.py b/rumba/ssh_support.py index b0970e1..e785f33 100644 --- a/rumba/ssh_support.py +++ b/rumba/ssh_support.py @@ -102,8 +102,8 @@ def execute_commands(testbed, ssh_config, commands, time_out=3): """ ssh_client = get_ssh_client() - if ssh_config.proxycommand is not None: - proxy = paramiko.ProxyCommand(ssh_config.proxycommand) + if ssh_config.proxy_command is not None: + proxy = paramiko.ProxyCommand(ssh_config.proxy_command) else: proxy = None @@ -162,8 +162,8 @@ def copy_file_to_testbed(testbed, ssh_config, text, file_name): """ ssh_client = get_ssh_client() - if ssh_config.proxycommand is not None: - proxy = paramiko.ProxyCommand(ssh_config.proxycommand) + if ssh_config.proxy_command is not None: + proxy = paramiko.ProxyCommand(ssh_config.proxy_command) else: proxy = None @@ -205,8 +205,8 @@ def copy_paths_to_testbed(testbed, ssh_config, paths, destination): """ ssh_client = get_ssh_client() - if ssh_config.proxycommand is not None: - proxy = paramiko.ProxyCommand(ssh_config.proxycommand) + if ssh_config.proxy_command is not None: + proxy = paramiko.ProxyCommand(ssh_config.proxy_command) else: proxy = None diff --git a/rumba/testbeds/emulab.py b/rumba/testbeds/emulab.py index e7458bc..9b90e68 100644 --- a/rumba/testbeds/emulab.py +++ b/rumba/testbeds/emulab.py @@ -210,6 +210,8 @@ class Testbed(mod.Testbed): for node in experiment.nodes: node.ssh_config.hostname = self.full_name(node.name) + node.ssh_config.set_username(self.username) + node.ssh_config.set_password(self.password) cmd = 'cat /var/emulab/boot/topomap' topomap = ssh.execute_command(self, experiment.nodes[0].ssh_config, cmd) diff --git a/rumba/testbeds/jfed.py b/rumba/testbeds/jfed.py index e158048..ad6b98a 100644 --- a/rumba/testbeds/jfed.py +++ b/rumba/testbeds/jfed.py @@ -153,11 +153,13 @@ class Testbed(mod.Testbed): node.ssh_config.hostname = \ node.name + "." + self.exp_name + "." + \ auth_name_r + "." + self.auth_name - node.ssh_config.proxycommand = "ssh -i '" + self.cert_file + \ - "' -o StrictHostKeyChecking=no " + \ - self.username + \ - "@bastion.test.iminds.be nc " + \ - node.ssh_config.hostname + " 22" + node.ssh_config.proxy_command = "ssh -i '" + self.cert_file + \ + "' -o StrictHostKeyChecking=no " + \ + self.username + \ + "@bastion.test.iminds.be nc " + \ + node.ssh_config.hostname + " 22" + node.ssh_config.username = self.username + node.ssh_config.password = self.password subprocess.call(["java", "-jar", self.jfed_jar, "create", "-S", self.proj_name, "--rspec", @@ -172,8 +174,6 @@ class Testbed(mod.Testbed): rspec = xml.parse(self.manifest) xml_nodes = rspec.getElementsByTagName("node") - dir_path = os.path.dirname(os.path.abspath(__file__)) - # Complete details of the nodes after swapin logger.info("Sleeping for two seconds to avoid contacting jfed nodes " "too soon.") diff --git a/rumba/testbeds/qemu.py b/rumba/testbeds/qemu.py index 1d449dc..34458e2 100644 --- a/rumba/testbeds/qemu.py +++ b/rumba/testbeds/qemu.py @@ -255,6 +255,8 @@ class Testbed(mod.Testbed): vm['id'] = vmid node.ssh_config.hostname = "localhost" node.ssh_config.port = fwdp + node.ssh_config.username = self.username + node.ssh_config.password = self.password vars_dict = {'fwdp': fwdp, 'id': vmid, 'mac': mac, 'bzimage': self.bzimage, -- cgit v1.2.3