aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMarco Capitani <m.capitani@nextworks.it>2018-01-10 12:41:25 +0100
committerMarco Capitani <m.capitani@nextworks.it>2018-01-11 16:47:32 +0100
commit658a41da85783d0ea06b91e5c72f755144b2e449 (patch)
treeaeee1f589d6f3e7999b5c3c9d2f990ef802cc905
parentbb466208e238be10a0ca42e2db3f9edbb04e7bb8 (diff)
downloadrumba-658a41da85783d0ea06b91e5c72f755144b2e449.tar.gz
rumba-658a41da85783d0ea06b91e5c72f755144b2e449.zip
storyboard: re-add run_command method
Also: logging fix in ssh_support, Enabling changes: storyboard: refactor 'start' method, it was getting big some more minor storyboard refactoring for decoupling update examples to new syntax
-rwxr-xr-xexamples/converged-operator-network.py2
-rwxr-xr-xexamples/example.py2
-rwxr-xr-xexamples/two-layers.py2
-rw-r--r--rumba/log.py15
-rw-r--r--rumba/ssh_support.py4
-rw-r--r--rumba/storyboard.py120
-rw-r--r--rumba/utils.py29
7 files changed, 118 insertions, 56 deletions
diff --git a/examples/converged-operator-network.py b/examples/converged-operator-network.py
index 57609fc..d0da907 100755
--- a/examples/converged-operator-network.py
+++ b/examples/converged-operator-network.py
@@ -198,5 +198,5 @@ with ExperimentManager(exp):
c1 = Client("rinaperf", options ="-i 10000 -s 1000 -c 0 -d overlay")
s1 = Server("rinaperf", arrival_rate=2, mean_duration=5,
options = "-l -d overlay", nodes = [c1n1], clients = [c1])
- sb = StoryBoard(exp, duration=3600, servers = [s1])
+ sb = StoryBoard(experiment=exp, duration=3600, servers = [s1])
sb.start()
diff --git a/examples/example.py b/examples/example.py
index a26ab3b..eee966b 100755
--- a/examples/example.py
+++ b/examples/example.py
@@ -66,7 +66,7 @@ server = Server("rinaperf", options="-l", arrival_rate=0.5,
# (This can be done anytime before storyboard.start())
storyboard.set_experiment(exp)
-storyboard.add_server((server, a))
+storyboard.add_server_on_node(server, a)
client1.add_node(b)
client2.add_node(b)
diff --git a/examples/two-layers.py b/examples/two-layers.py
index d2e4007..c375088 100755
--- a/examples/two-layers.py
+++ b/examples/two-layers.py
@@ -58,7 +58,7 @@ print(exp)
with ExperimentManager(exp):
exp.swap_in()
exp.bootstrap_prototype()
- sb = StoryBoard(exp, duration=15, servers=[])
+ sb = StoryBoard(experiment=exp, duration=15, servers=[])
sb.run_command(7.5, a, 'echo "7.5 secs in. We are at $(hostname)"')
sb.run_command(12, b, 'echo "12 secs in. We are at $(hostname)"')
sb.start()
diff --git a/rumba/log.py b/rumba/log.py
index eb6be03..893f5f7 100644
--- a/rumba/log.py
+++ b/rumba/log.py
@@ -42,6 +42,8 @@ loggers_set = set()
mq = multiprocessing.Queue()
+logging_listener = None
+
try:
from logging.handlers import QueueHandler
@@ -201,6 +203,8 @@ def setup():
formatter = RumbaFormatter()
handler.setFormatter(formatter)
listener = QueueListener(mq, handler)
+ global logging_listener
+ logging_listener = listener
listener.start()
@@ -271,3 +275,14 @@ def reset_logging_level():
logger.setLevel(logging.NOTSET)
set_logging_level(logging.INFO)
set_logging_level(logging.ERROR, '')
+
+
+def flush_and_kill_logging():
+ """
+ Flushes all queued log messages and stops the logging facility.
+ Since the logging is done by a daemon thread, log entries might be lost
+ if execution is interrupted abruptly. Call this method to make sure
+ this does not happen.
+ """
+ global logging_listener
+ logging_listener.stop()
diff --git a/rumba/ssh_support.py b/rumba/ssh_support.py
index 895a4e1..1ffa655 100644
--- a/rumba/ssh_support.py
+++ b/rumba/ssh_support.py
@@ -329,11 +329,11 @@ def copy_files_from_testbed(testbed, ssh_config, paths, destination):
for path in paths:
file_name = os.path.basename(path)
dest_file = destination + file_name
- logger.debug("Copying %s to %s@%s:%s path %s" % (
- path,
+ logger.debug("Copying %s@%s:%s path %s to %s" % (
testbed.username,
ssh_config.hostname,
ssh_config.port,
+ path,
dest_file))
sftp_client.get(path, dest_file)
diff --git a/rumba/storyboard.py b/rumba/storyboard.py
index 200b302..bbb1946 100644
--- a/rumba/storyboard.py
+++ b/rumba/storyboard.py
@@ -99,7 +99,7 @@ class Client(object):
#
class ClientProcess(object):
def __init__(self, client_id, ap, startup, duration,
- node=None, shutdown="<kill <pid>"):
+ node, shutdown="<kill <pid>"):
self.id = client_id
self.ap = ap
self.startup = startup
@@ -110,9 +110,7 @@ class ClientProcess(object):
self.pid = None
self.shutdown = shutdown
- def run(self, node=None):
- if node is not None:
- self.node = node
+ def run(self):
if self.node is None:
raise Exception('No node specified for client %s' % (self.ap,))
self.start_time = time.time()
@@ -154,7 +152,7 @@ class ClientProcess(object):
self.ap, self.node.name
)
- def check(self):
+ def kill_check(self):
"""Check if the process should keep running, stop it if not,
and return true if and only if it is still running."""
now = time.time()
@@ -274,7 +272,7 @@ class StoryBoard:
DEFAULT_INTERVAL = 2.5 # in seconds (may be a float)
- def __init__(self, experiment, duration, servers=None):
+ def __init__(self, duration, experiment=None, servers=None):
self.experiment = experiment
self.duration = duration
self.servers = list()
@@ -282,29 +280,32 @@ class StoryBoard:
servers = list()
for s in servers:
self._validate_and_add_server(s)
- self.client_nodes = [c for c in experiment.nodes if c.client]
+ self.client_nodes = set()
+ self.server_nodes = set()
self.active_clients = []
self.start_time = None
self.commands_list = {}
- def _validate_and_add_server(self, s):
- if self.experiment is None:
- raise ValueError("Cannot add a server before "
- "setting the experiment.")
- if hasattr(s, '__len__') and len(s) == 2:
- server, node = s
- if not isinstance(server, Server) \
- or not isinstance(node, model.Node):
- raise TypeError('First element must be of "Server" type, '
- 'second must be of "Node" type.')
- server.add_node(node)
- self.servers.append(server)
- elif type(s) == Server:
+ def _build_nodes_lists(self):
+ """Populates server_nodes and client_nodes lists"""
+ for server in self.servers:
+ for node in server.nodes:
+ self.server_nodes.add(node)
+ for client in server.clients:
+ for node in client.nodes:
+ self.client_nodes.add(node)
+
+ def _validate_and_add_server(self, s, n=None):
+ if len(s.clients) == 0:
+ logger.warning("'%s' server has no registered clients.", s.ap)
+ if n is not None:
+ s.add_node(n)
self.servers.append(s)
else:
- raise TypeError('Input servers should be either an object of '
- '"Server" type or a Server-Node couple.')
- for node in self.servers[-1].nodes:
+ if len(s.nodes) == 0:
+ logger.warning("'%s' server has no registered nodes.", s.ap)
+ self.servers.append(s)
+ for node in s.nodes:
if node not in self.experiment.nodes:
raise ValueError('Cannot run server on node %s, '
'not in experiment.' % (node.name,))
@@ -315,8 +316,28 @@ class StoryBoard:
self.experiment = experiment
def add_server(self, server):
+ """Register a server node to the sb."""
+ if self.experiment is None:
+ raise ValueError("Cannot add a server before "
+ "setting the experiment.")
+ if not isinstance(server, Server):
+ raise TypeError('Argument must be of type Server')
self._validate_and_add_server(server)
+ def add_server_on_node(self, server, node):
+ """
+ Utility method to simultaneously add a server to a sb
+ and a node to the server.
+ """
+ if self.experiment is None:
+ raise ValueError("Cannot add a server before "
+ "setting the experiment.")
+ if not isinstance(server, Server):
+ raise TypeError('First argument must be of type Server')
+ if not isinstance(node, model.Node):
+ raise TypeError('Second argument must be of type Server')
+ self._validate_and_add_server(server, node)
+
def del_server(self, server):
self.servers.remove(server)
@@ -329,6 +350,12 @@ class StoryBoard:
:param node: (Node) the node on which the command should be run
:param command: (str or list[str]) the command(s) to be run,
"""
+ if self.experiment is None:
+ raise ValueError("Cannot add a server before "
+ "setting the experiment.")
+ if node not in self.experiment.nodes:
+ raise ValueError('Cannot run command on node %s, '
+ 'not in experiment.' % (node.name,))
if isinstance(command, str):
self.commands_list.setdefault(t, []).append((node, command))
else: # Hope it's an Iterable[str]. Otherwise, errors will happen.
@@ -340,12 +367,23 @@ class StoryBoard:
for server in self.servers:
clients = server.get_new_clients(self.DEFAULT_INTERVAL)
for new_client in clients:
- client_node = random.choice(self.client_nodes)
- new_client.run(client_node)
+ new_client.duration = min(
+ new_client.duration,
+ self.duration - (time.time() - self.start_time)
+ )
+ # Make sure the duration of the client does not
+ # go beyond the storyboard lifetime
+ if new_client.duration < server.min_duration:
+ continue
+ # Do not start clients that would not run for
+ # at least the minimum duration
+ # (due to sb constraints)
+ new_client.run()
self.active_clients.append(new_client)
- surviving = []
# Kill expired clients
+ surviving = []
+
for x in self.active_clients:
if x.kill_check(): #
surviving.append(x)
@@ -354,7 +392,7 @@ class StoryBoard:
# Do run_command instructions
unexpired_commands = {}
for t in self.commands_list:
- if time.time() - self.start_time > t:
+ if (time.time() - self.start_time) > t:
for node, command in self.commands_list[t]:
node.execute_command(command)
else:
@@ -362,7 +400,17 @@ class StoryBoard:
self.commands_list = unexpired_commands
def start(self):
+ if self.experiment is None:
+ raise ValueError("Cannot add a server before "
+ "setting the experiment.")
logger.info('Starting storyboard execution')
+ self._build_nodes_lists()
+ logger.debug('Server nodes are: %s.', [x.name for x in self.server_nodes])
+ logger.debug('Client nodes are: %s.', [x.name for x in self.client_nodes])
+ logger.debug('Command list is: %s.', {x: [(y.name, z)
+ for y, z in t]
+ for (x, t)
+ in self.commands_list.items()})
self.start_time = time.time()
script = r'nohup "$@" > /dev/null & echo "$!"'
for node in self.client_nodes:
@@ -389,20 +437,14 @@ class StoryBoard:
if not os.path.isdir(local_dir):
raise Exception('"%s" is not a directory. Cannot fetch logs.'
% local_dir)
- server_nodes = set()
- client_nodes = set()
- for server in self.servers:
- for node in server.nodes:
- server_nodes.add(node)
- for client in server.clients:
- for node in client.nodes:
- client_nodes.add(node)
- for node in server_nodes:
+ for node in self.server_nodes:
logs_list = node.execute_command('ls *_server.log')
+ logs_list = [x for x in logs_list.split('\n') if x != '']
logger.info('Log list is:\n%s', logs_list)
- node.fetch_files(logs_list.split('\n'), local_dir)
- for node in client_nodes:
+ node.fetch_files(logs_list, local_dir)
+ for node in self.client_nodes:
logs_list = node.execute_command('ls /tmp/*.rumba.log '
'|| echo ""')
+ logs_list = [x for x in logs_list.split('\n') if x != '']
logger.info('Log list is:\n%s', logs_list)
- node.fetch_files(logs_list.split('\n'), local_dir)
+ node.fetch_files(logs_list, local_dir)
diff --git a/rumba/utils.py b/rumba/utils.py
index 7417074..2a8c6b7 100644
--- a/rumba/utils.py
+++ b/rumba/utils.py
@@ -54,15 +54,20 @@ class ExperimentManager(object):
pass
def __exit__(self, exc_type, exc_val, exc_tb):
- if self.do_swap_out == self.PROMPT:
- logger.info('Press ENTER to start swap out.')
- input('')
- if self.do_swap_out == self.PROMPT \
- or self.do_swap_out == self.AUTO:
- self.experiment.swap_out()
- if exc_val is not None:
- logger.error('Something went wrong. Got %s: %s',
- type(exc_val).__name__, str(exc_val))
- logger.debug('Exception details:', exc_info=exc_val)
- time.sleep(0.1) # Give the queue logger enough time to flush.
- return True # Suppress the exception we logged: no traceback.
+ try:
+ if self.do_swap_out == self.PROMPT:
+ logger.info('Press ENTER to start swap out.')
+ input('')
+ if self.do_swap_out == self.PROMPT \
+ or self.do_swap_out == self.AUTO:
+ self.experiment.swap_out()
+ if exc_val is not None:
+ logger.error('Something went wrong. Got %s: %s',
+ type(exc_val).__name__, str(exc_val))
+ logger.debug('Exception details:', exc_info=exc_val)
+ finally:
+ log.flush_and_kill_logging()
+ # Make sure to print all logs before execution terminates,
+ # Specifically the last two error logs above.
+ return True
+ # Suppress the exception we logged: no traceback, unless requested.