aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorvmaffione <v.maffione@gmail.com>2017-04-08 10:58:48 +0000
committervmaffione <v.maffione@gmail.com>2017-04-08 10:58:48 +0000
commitae113f8d19eb29a0edb50bf790275414125f78ca (patch)
tree26fc3690f4b3bc7c7a5635ee31f0297bb66ead19
parent2666310aa2a25e5d66b669bde337711e7d22c904 (diff)
parent8d4ba82ac3a598992ee3d8fbe7ea14375b7c0801 (diff)
downloadrumba-ae113f8d19eb29a0edb50bf790275414125f78ca.tar.gz
rumba-ae113f8d19eb29a0edb50bf790275414125f78ca.zip
Merge branch 'vincenzo' into 'master'
rumba: simplify cooperation between prototype and testbed plugins See merge request !20
-rw-r--r--README10
-rwxr-xr-xexamples/example.py10
-rwxr-xr-xexamples/two-layers.py6
-rw-r--r--rumba/model.py26
-rw-r--r--rumba/prototypes/irati.py3
-rw-r--r--rumba/prototypes/ouroboros.py3
-rw-r--r--rumba/prototypes/rlite.py5
-rw-r--r--rumba/testbeds/emulab.py2
-rw-r--r--rumba/testbeds/faketestbed.py2
-rw-r--r--rumba/testbeds/jfed.py2
-rw-r--r--rumba/testbeds/qemu.py90
11 files changed, 86 insertions, 73 deletions
diff --git a/README b/README
index cb29f09..8471a93 100644
--- a/README
+++ b/README
@@ -16,15 +16,13 @@ Workflow, both external and internal:
used by the plugins
(4) user calls run() on the prototype.Experiment instance:
- - First, run() calls Experiment.swap_in(), which
- in turns calls Testbed.create_experiment(), passing the
- nodes and links (?)
- TODO: fix this interface: what should swap_in(), and
- so create_experiment() return exactly? Current interface
- seems broken
+ - First, run() calls Testbed.swap_in(), passing the
+ Experiment, and filling in the missing information
- Second, run() calls a prototype-specific setup function,
to create the required IPCPs, perform registrations,
enrollments, etc.
- Third, perform tests (TODO)
+
+ - Fourth, run() calls Testbed.swap_out()
diff --git a/examples/example.py b/examples/example.py
index e173ef0..0d8fcd9 100755
--- a/examples/example.py
+++ b/examples/example.py
@@ -30,12 +30,12 @@ b = Node("b",
difs = [e1, n1],
dif_registrations = {n1 : [e1]})
-tb = jfed.Testbed(exp_name = "letest",
- username = "sander",
- cert_file = "cert.pem",
- jfed_jar = "jfed_cli/experimenter-cli.jar")
+tb = qemu.Testbed(exp_name = "example1",
+ username = "vmaffione",
+ bzimage = '/home/vmaffione/git/rlite/demo/buildroot/bzImage',
+ initramfs = '/home/vmaffione/git/rlite/demo/buildroot/rootfs.cpio')
-exp = irati.Experiment(tb, nodes = [a, b])
+exp = rl.Experiment(tb, nodes = [a, b])
print(exp)
diff --git a/examples/two-layers.py b/examples/two-layers.py
index 29faebe..687c99f 100755
--- a/examples/two-layers.py
+++ b/examples/two-layers.py
@@ -43,9 +43,11 @@ d = Node("d",
dif_registrations = {n3: [n2], n2 : [e3]})
tb = qemu.Testbed(exp_name = "twolayers",
- username = "vmaffio")
+ username = "vmaffione",
+ bzimage = '/home/vmaffione/git/rlite/demo/buildroot/bzImage',
+ initramfs = '/home/vmaffione/git/rlite/demo/buildroot/rootfs.cpio')
-exp = irati.Experiment(tb, nodes = [a, b, c, d])
+exp = rl.Experiment(tb, nodes = [a, b, c, d])
print(exp)
diff --git a/rumba/model.py b/rumba/model.py
index 25a1356..99bf5ed 100644
--- a/rumba/model.py
+++ b/rumba/model.py
@@ -37,8 +37,13 @@ class Testbed:
self.exp_name = exp_name
@abc.abstractmethod
- def create_experiment(self, experiment):
- raise Exception('create_experiment() not implemented')
+ def swap_in(self, experiment):
+ raise Exception('swap_in() not implemented')
+
+ @abc.abstractmethod
+ def swap_out(self, experiment):
+ print("swap_out(): nothing to do")
+
# Base class for DIFs
#
@@ -605,10 +610,17 @@ class Experiment:
self.compute_enrollments()
self.compute_ipcps()
- # Realize the experiment, using a testbed-specific setup
- def swap_in(self):
- self.testbed.create_experiment(self)
-
@abc.abstractmethod
+ def run_prototype(self):
+ raise Exception('run_prototype() method not implemented')
+
def run(self):
- raise Exception('run() method not implemented')
+ # Realize the experiment testbed (testbed-specific)
+ self.testbed.swap_in(self)
+
+ # Run the experiment using the prototype (prototype-specific)
+ self.run_prototype()
+
+ # Undo the testbed (testbed-specific)
+ self.testbed.swap_out(self)
+
diff --git a/rumba/prototypes/irati.py b/rumba/prototypes/irati.py
index 1babd57..37a6fbe 100644
--- a/rumba/prototypes/irati.py
+++ b/rumba/prototypes/irati.py
@@ -41,9 +41,8 @@ class Experiment(mod.Experiment):
ssh.execute_commands(self.testbed, node.full_name,
cmds, time_out=None)
- def run(self):
+ def run_prototype(self):
print("[IRATI experiment] start")
- self.swap_in()
print("Setting up IRATI on the nodes...")
self.setup()
print("[IRATI experiment] end")
diff --git a/rumba/prototypes/ouroboros.py b/rumba/prototypes/ouroboros.py
index 57358cf..4f78361 100644
--- a/rumba/prototypes/ouroboros.py
+++ b/rumba/prototypes/ouroboros.py
@@ -49,10 +49,9 @@ class Experiment(mod.Experiment):
ssh.execute_commands(self.testbed, node.full_name,
cmds, time_out=None)
- def run(self):
+ def run_prototype(self):
print("[Ouroboros experiment] start")
print("Creating resources...")
- self.swap_in()
print("Setting up Ouroboros...")
self.setup_ouroboros()
print("Binding names...")
diff --git a/rumba/prototypes/rlite.py b/rumba/prototypes/rlite.py
index b175e92..f4ff825 100644
--- a/rumba/prototypes/rlite.py
+++ b/rumba/prototypes/rlite.py
@@ -20,6 +20,7 @@
import rumba.ssh_support as ssh
import rumba.model as mod
+import time
# An experiment over the RLITE implementation
class Experiment(mod.Experiment):
@@ -42,9 +43,9 @@ class Experiment(mod.Experiment):
ssh.execute_commands(self.testbed, node.full_name,
cmds, time_out=None)
- def run(self):
+ def run_prototype(self):
print("[RLITE experiment] start")
- self.swap_in()
print("Setting up rlite on the nodes...")
self.setup()
+ time.sleep(20)
print("[RLITE experiment] end")
diff --git a/rumba/testbeds/emulab.py b/rumba/testbeds/emulab.py
index 134d344..c4dbe36 100644
--- a/rumba/testbeds/emulab.py
+++ b/rumba/testbeds/emulab.py
@@ -235,7 +235,7 @@ class Testbed(mod.Testbed):
ipcp.ifname = item[0]
node.full_name = self.full_name(node.name)
- def create_experiment(self, experiment):
+ def swap_in(self, experiment):
self._create_experiment(experiment)
self.swap_exp_in()
self.wait_until_nodes_up()
diff --git a/rumba/testbeds/faketestbed.py b/rumba/testbeds/faketestbed.py
index 4821aa8..85110b8 100644
--- a/rumba/testbeds/faketestbed.py
+++ b/rumba/testbeds/faketestbed.py
@@ -25,5 +25,5 @@ class Testbed(mod.Testbed):
def __init__(self, exp_name, username, proj_name="ARCFIRE", password=""):
mod.Testbed.__init__(self, exp_name, username, password, proj_name)
- def create_experiment(self, experiment):
+ def swap_in(self, experiment):
print("[Fake testbed] experiment swapped in")
diff --git a/rumba/testbeds/jfed.py b/rumba/testbeds/jfed.py
index e606846..f56e5ba 100644
--- a/rumba/testbeds/jfed.py
+++ b/rumba/testbeds/jfed.py
@@ -106,7 +106,7 @@ class Testbed(mod.Testbed):
file.write(doc.toprettyxml())
file.close()
- def create_experiment(self, experiment):
+ def swap_in(self, experiment):
self.create_rspec(experiment)
for node in experiment.nodes:
diff --git a/rumba/testbeds/qemu.py b/rumba/testbeds/qemu.py
index 7dae921..1830b37 100644
--- a/rumba/testbeds/qemu.py
+++ b/rumba/testbeds/qemu.py
@@ -17,25 +17,25 @@
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
# MA 02110-1301 USA
-from Queue import Empty
-from getpass import getpass, getuser
-from multiprocessing import Process, Queue, cpu_count
-from os import geteuid
-from time import sleep
+import getpass
+import multiprocessing
+import time
+import subprocess
+import os
import rumba.model as mod
-from subprocess import Popen, check_call, CalledProcessError
class Testbed(mod.Testbed):
- def __init__(self, exp_name, username, vm_img_folder, proj_name="ARCFIRE", password="",
- use_vhost=True, qemu_out_folder=""):
+ def __init__(self, exp_name, username, bzimage, initramfs, proj_name="ARCFIRE", password="",
+ use_vhost=True, qemu_logs_dir=None):
mod.Testbed.__init__(self, exp_name, username, password, proj_name)
self.vms = {}
self.shims = []
- self.vm_img_path = vm_img_folder
+ self.bzimage = bzimage
+ self.initramfs = initramfs
self.vhost = use_vhost
- self.qemu_folder = qemu_out_folder
+ self.qemu_logs_dir = os.getcwd() if qemu_logs_dir is None else qemu_logs_dir
self.boot_processes = []
@staticmethod
@@ -57,35 +57,35 @@ class Testbed(mod.Testbed):
for command in commands:
if not error_queue.empty():
break
- print('DEBUG: executing >> {}'.format(command))
- check_call(command.split())
+ print('DEBUG: executing >> %s' % command)
+ subprocess.check_call(command.split())
results_queue.put("Command chain ran correctly.")
- except CalledProcessError as e:
+ except subprocess.CalledProcessError as e:
error_queue.put(str(e))
- def create_experiment(self, experiment):
+ def swap_in(self, experiment):
"""
:type experiment mod.Experiment
:param experiment: The experiment running
"""
- if geteuid() != 0:
- pw = getpass('[sudo] password for {}:'.format(getuser()))
+ if os.geteuid() != 0:
+ pw = getpass.getpass('[sudo] password for %s:' % getpass.getuser())
if '"' in pw or "'" in pw:
print('Illegal password: contains " or \'')
raise Exception('Not authenticated')
else:
try:
- check_call("sudo -v -p '{}'".format(pw).split())
- except CalledProcessError:
+ subprocess.check_call("sudo -v -p '{}'".format(pw).split())
+ except subprocess.CalledProcessError:
raise Exception('Not authenticated')
print("[QEMU testbed] swapping in")
# Building bridges and taps
shim_processes = []
- r_queue = Queue()
- e_queue = Queue()
+ r_queue = multiprocessing.Queue()
+ e_queue = multiprocessing.Queue()
print(experiment.dif_ordering)
for shim in experiment.dif_ordering:
command_list = []
@@ -129,7 +129,7 @@ class Testbed(mod.Testbed):
if not e_queue.empty():
break
# Launch commands asynchronously
- process = Process(target=self._run_command_chain, args=(command_list, r_queue, e_queue))
+ process = multiprocessing.Process(target=self._run_command_chain, args=(command_list, r_queue, e_queue))
shim_processes.append(process)
process.start()
@@ -142,20 +142,20 @@ class Testbed(mod.Testbed):
# Check for errors
if not e_queue.empty():
error_str = str(e_queue.get())
- print('Testbed instantiation failed: {}'.format(error_str))
- raise Exception('Failure: {}'.format(error_str))
+ print('Testbed instantiation failed: %s' % error_str)
+ raise Exception('Failure: %s' % error_str)
try:
# Check for results
result = r_queue.get(timeout=1)
if result == "Command chain ran correctly.":
over_processes += 1
print('DEBUG: %s of %s processes completed.' % (over_processes, total_processes))
- except Empty:
+ except:
max_waiting_time -= 1
# Building vms
- boot_batch_size = max(1, cpu_count() // 2)
+ boot_batch_size = max(1, multiprocessing.cpu_count() // 2)
booting_budget = boot_batch_size
boot_backoff = 12
base_port = 2222
@@ -174,7 +174,9 @@ class Testbed(mod.Testbed):
vm['id'] = vmid
vars_dict = {'fwdp': fwdp, 'id': vmid, 'mac': mac,
- 'vmimgpath': self.vm_img_path, 'fwdc': fwdc,
+ 'bzimage': self.bzimage,
+ 'initramfs': self.initramfs,
+ 'fwdc': fwdc,
'memory': vm_memory, 'frontend': vm_frontend,
'vmname': name}
@@ -183,9 +185,9 @@ class Testbed(mod.Testbed):
command = 'qemu-system-x86_64 '
# TODO manage non default images
- command += ('-kernel %(vmimgpath)s/bzImage '
+ command += ('-kernel %(bzimage)s '
'-append "console=ttyS0" '
- '-initrd %(vmimgpath)s/rootfs.cpio '
+ '-initrd %(initramfs)s '
% vars_dict)
command += ('-nographic '
'-display none '
@@ -216,18 +218,18 @@ class Testbed(mod.Testbed):
booting_budget -= 1
if booting_budget <= 0:
- print('Sleeping for {} seconds to give the machines time to boot up.'.format(boot_backoff))
- sleep(boot_backoff)
+ print('Sleeping for %s seconds to give the machines time to boot up.' % boot_backoff)
+ time.sleep(boot_backoff)
booting_budget = boot_batch_size
- with open(self.qemu_folder + '/qemu_out{}'.format(vmid), 'w') as out_file:
- print('DEBUG: executing >> {}'.format(command))
- self.boot_processes.append(Popen(command.split(), stdout=out_file))
+ with open('%s/qemu_out_%s' % (self.qemu_logs_dir, vmid), 'w') as out_file:
+ print('DEBUG: executing >> %s' % command)
+ self.boot_processes.append(subprocess.Popen(command.split(), stdout=out_file))
pass
vmid += 1
- def __del__(self):
+ def swap_out(self, experiment):
"""
:rtype str
:return: The script to tear down the experiment
@@ -241,8 +243,8 @@ class Testbed(mod.Testbed):
process.wait()
port_processes = []
- error_queue = Queue()
- results_queue = Queue()
+ error_queue = multiprocessing.Queue()
+ results_queue = multiprocessing.Queue()
for vm_name, vm in self.vms.items():
for port in vm['ports']:
tap = port['tap_id']
@@ -255,7 +257,7 @@ class Testbed(mod.Testbed):
'sudo ip tuntap del mode tap name %(tap)s'
% {'tap': tap, 'br': shim.name}
).split('\n')
- process = Process(target=self._run_command_chain, args=(commands, results_queue, error_queue))
+ process = multiprocessing.Process(target=self._run_command_chain, args=(commands, results_queue, error_queue))
port_processes.append(process)
process.start()
@@ -266,7 +268,7 @@ class Testbed(mod.Testbed):
while max_waiting_time > 0 and over_processes < total_processes:
# Check for errors
if not error_queue.empty():
- print('Failure while shutting down: {}'.format(str(error_queue.get())))
+ print('Failure while shutting down: %s' % str(error_queue.get()))
over_processes += 1
try:
# Check for results
@@ -274,11 +276,11 @@ class Testbed(mod.Testbed):
if result == "Command chain ran correctly.":
over_processes += 1
print('DEBUG: %s of %s tear-down port processes completed.' % (over_processes, total_processes))
- except Empty:
+ except:
max_waiting_time -= 1
- error_queue = Queue()
- results_queue = Queue()
+ error_queue = multiprocessing.Queue()
+ results_queue = multiprocessing.Queue()
shim_processes = []
for shim in self.shims:
@@ -287,7 +289,7 @@ class Testbed(mod.Testbed):
'sudo brctl delbr %(br)s'
% {'br': shim.name}
).split('\n')
- process = Process(target=self._run_command_chain, args=(commands, results_queue, error_queue))
+ process = multiprocessing.Process(target=self._run_command_chain, args=(commands, results_queue, error_queue))
shim_processes.append(process)
process.start()
@@ -298,7 +300,7 @@ class Testbed(mod.Testbed):
while max_waiting_time > 0 and over_processes < total_processes:
# Check for errors
if not error_queue.empty():
- print('Failure while shutting down: {}'.format(str(error_queue.get())))
+ print('Failure while shutting down: %s' % str(error_queue.get()))
over_processes += 1
try:
# Check for results
@@ -306,5 +308,5 @@ class Testbed(mod.Testbed):
if result == "Command chain ran correctly.":
over_processes += 1
print('DEBUG: %s of %s tear-down shim processes completed.' % (over_processes, total_processes))
- except Empty:
+ except:
max_waiting_time -= 1