aboutsummaryrefslogtreecommitdiff
path: root/rumba/storyboard.py
diff options
context:
space:
mode:
authorMarco Capitani <m.capitani@nextworks.it>2018-01-10 12:41:25 +0100
committerMarco Capitani <m.capitani@nextworks.it>2018-01-11 16:47:32 +0100
commit658a41da85783d0ea06b91e5c72f755144b2e449 (patch)
treeaeee1f589d6f3e7999b5c3c9d2f990ef802cc905 /rumba/storyboard.py
parentbb466208e238be10a0ca42e2db3f9edbb04e7bb8 (diff)
downloadrumba-658a41da85783d0ea06b91e5c72f755144b2e449.tar.gz
rumba-658a41da85783d0ea06b91e5c72f755144b2e449.zip
storyboard: re-add run_command method
Also: logging fix in ssh_support, Enabling changes: storyboard: refactor 'start' method, it was getting big some more minor storyboard refactoring for decoupling update examples to new syntax
Diffstat (limited to 'rumba/storyboard.py')
-rw-r--r--rumba/storyboard.py120
1 files changed, 81 insertions, 39 deletions
diff --git a/rumba/storyboard.py b/rumba/storyboard.py
index 200b302..bbb1946 100644
--- a/rumba/storyboard.py
+++ b/rumba/storyboard.py
@@ -99,7 +99,7 @@ class Client(object):
#
class ClientProcess(object):
def __init__(self, client_id, ap, startup, duration,
- node=None, shutdown="<kill <pid>"):
+ node, shutdown="<kill <pid>"):
self.id = client_id
self.ap = ap
self.startup = startup
@@ -110,9 +110,7 @@ class ClientProcess(object):
self.pid = None
self.shutdown = shutdown
- def run(self, node=None):
- if node is not None:
- self.node = node
+ def run(self):
if self.node is None:
raise Exception('No node specified for client %s' % (self.ap,))
self.start_time = time.time()
@@ -154,7 +152,7 @@ class ClientProcess(object):
self.ap, self.node.name
)
- def check(self):
+ def kill_check(self):
"""Check if the process should keep running, stop it if not,
and return true if and only if it is still running."""
now = time.time()
@@ -274,7 +272,7 @@ class StoryBoard:
DEFAULT_INTERVAL = 2.5 # in seconds (may be a float)
- def __init__(self, experiment, duration, servers=None):
+ def __init__(self, duration, experiment=None, servers=None):
self.experiment = experiment
self.duration = duration
self.servers = list()
@@ -282,29 +280,32 @@ class StoryBoard:
servers = list()
for s in servers:
self._validate_and_add_server(s)
- self.client_nodes = [c for c in experiment.nodes if c.client]
+ self.client_nodes = set()
+ self.server_nodes = set()
self.active_clients = []
self.start_time = None
self.commands_list = {}
- def _validate_and_add_server(self, s):
- if self.experiment is None:
- raise ValueError("Cannot add a server before "
- "setting the experiment.")
- if hasattr(s, '__len__') and len(s) == 2:
- server, node = s
- if not isinstance(server, Server) \
- or not isinstance(node, model.Node):
- raise TypeError('First element must be of "Server" type, '
- 'second must be of "Node" type.')
- server.add_node(node)
- self.servers.append(server)
- elif type(s) == Server:
+ def _build_nodes_lists(self):
+ """Populates server_nodes and client_nodes lists"""
+ for server in self.servers:
+ for node in server.nodes:
+ self.server_nodes.add(node)
+ for client in server.clients:
+ for node in client.nodes:
+ self.client_nodes.add(node)
+
+ def _validate_and_add_server(self, s, n=None):
+ if len(s.clients) == 0:
+ logger.warning("'%s' server has no registered clients.", s.ap)
+ if n is not None:
+ s.add_node(n)
self.servers.append(s)
else:
- raise TypeError('Input servers should be either an object of '
- '"Server" type or a Server-Node couple.')
- for node in self.servers[-1].nodes:
+ if len(s.nodes) == 0:
+ logger.warning("'%s' server has no registered nodes.", s.ap)
+ self.servers.append(s)
+ for node in s.nodes:
if node not in self.experiment.nodes:
raise ValueError('Cannot run server on node %s, '
'not in experiment.' % (node.name,))
@@ -315,8 +316,28 @@ class StoryBoard:
self.experiment = experiment
def add_server(self, server):
+ """Register a server node to the sb."""
+ if self.experiment is None:
+ raise ValueError("Cannot add a server before "
+ "setting the experiment.")
+ if not isinstance(server, Server):
+ raise TypeError('Argument must be of type Server')
self._validate_and_add_server(server)
+ def add_server_on_node(self, server, node):
+ """
+ Utility method to simultaneously add a server to a sb
+ and a node to the server.
+ """
+ if self.experiment is None:
+ raise ValueError("Cannot add a server before "
+ "setting the experiment.")
+ if not isinstance(server, Server):
+ raise TypeError('First argument must be of type Server')
+ if not isinstance(node, model.Node):
+ raise TypeError('Second argument must be of type Server')
+ self._validate_and_add_server(server, node)
+
def del_server(self, server):
self.servers.remove(server)
@@ -329,6 +350,12 @@ class StoryBoard:
:param node: (Node) the node on which the command should be run
:param command: (str or list[str]) the command(s) to be run,
"""
+ if self.experiment is None:
+ raise ValueError("Cannot add a server before "
+ "setting the experiment.")
+ if node not in self.experiment.nodes:
+ raise ValueError('Cannot run command on node %s, '
+ 'not in experiment.' % (node.name,))
if isinstance(command, str):
self.commands_list.setdefault(t, []).append((node, command))
else: # Hope it's an Iterable[str]. Otherwise, errors will happen.
@@ -340,12 +367,23 @@ class StoryBoard:
for server in self.servers:
clients = server.get_new_clients(self.DEFAULT_INTERVAL)
for new_client in clients:
- client_node = random.choice(self.client_nodes)
- new_client.run(client_node)
+ new_client.duration = min(
+ new_client.duration,
+ self.duration - (time.time() - self.start_time)
+ )
+ # Make sure the duration of the client does not
+ # go beyond the storyboard lifetime
+ if new_client.duration < server.min_duration:
+ continue
+ # Do not start clients that would not run for
+ # at least the minimum duration
+ # (due to sb constraints)
+ new_client.run()
self.active_clients.append(new_client)
- surviving = []
# Kill expired clients
+ surviving = []
+
for x in self.active_clients:
if x.kill_check(): #
surviving.append(x)
@@ -354,7 +392,7 @@ class StoryBoard:
# Do run_command instructions
unexpired_commands = {}
for t in self.commands_list:
- if time.time() - self.start_time > t:
+ if (time.time() - self.start_time) > t:
for node, command in self.commands_list[t]:
node.execute_command(command)
else:
@@ -362,7 +400,17 @@ class StoryBoard:
self.commands_list = unexpired_commands
def start(self):
+ if self.experiment is None:
+ raise ValueError("Cannot add a server before "
+ "setting the experiment.")
logger.info('Starting storyboard execution')
+ self._build_nodes_lists()
+ logger.debug('Server nodes are: %s.', [x.name for x in self.server_nodes])
+ logger.debug('Client nodes are: %s.', [x.name for x in self.client_nodes])
+ logger.debug('Command list is: %s.', {x: [(y.name, z)
+ for y, z in t]
+ for (x, t)
+ in self.commands_list.items()})
self.start_time = time.time()
script = r'nohup "$@" > /dev/null & echo "$!"'
for node in self.client_nodes:
@@ -389,20 +437,14 @@ class StoryBoard:
if not os.path.isdir(local_dir):
raise Exception('"%s" is not a directory. Cannot fetch logs.'
% local_dir)
- server_nodes = set()
- client_nodes = set()
- for server in self.servers:
- for node in server.nodes:
- server_nodes.add(node)
- for client in server.clients:
- for node in client.nodes:
- client_nodes.add(node)
- for node in server_nodes:
+ for node in self.server_nodes:
logs_list = node.execute_command('ls *_server.log')
+ logs_list = [x for x in logs_list.split('\n') if x != '']
logger.info('Log list is:\n%s', logs_list)
- node.fetch_files(logs_list.split('\n'), local_dir)
- for node in client_nodes:
+ node.fetch_files(logs_list, local_dir)
+ for node in self.client_nodes:
logs_list = node.execute_command('ls /tmp/*.rumba.log '
'|| echo ""')
+ logs_list = [x for x in logs_list.split('\n') if x != '']
logger.info('Log list is:\n%s', logs_list)
- node.fetch_files(logs_list.split('\n'), local_dir)
+ node.fetch_files(logs_list, local_dir)