aboutsummaryrefslogtreecommitdiff
path: root/rumba/testbeds
diff options
context:
space:
mode:
authorNick Aerts <nick.aerts@ugent.be>2018-02-24 12:27:36 +0100
committerNick Aerts <nick.aerts@ugent.be>2018-03-20 11:30:07 +0100
commita7cd88d752b72ea85ccefa5e1f3dceba13fb1fc2 (patch)
tree9c29f08693d577c81959c3194aec04e9bc1ba195 /rumba/testbeds
parent24bed306b5a67fc682b04cae017b0d0f3ac55a00 (diff)
downloadrumba-a7cd88d752b72ea85ccefa5e1f3dceba13fb1fc2.tar.gz
rumba-a7cd88d752b72ea85ccefa5e1f3dceba13fb1fc2.zip
testbeds: Add docker testbed
This adds support for a testbed based on Docker containers running on the local host. Bridging the containers can be done using built-in Linux bridging or using OpenVSwitch bridges. A new resource 'executor' has been added to abstract away command execution on nodes on the testbed. Executors have been created for local command execution, docker exec based command execution and SSH-based command execution. This has also been changed in the prototypes to execute using the correct executor.
Diffstat (limited to 'rumba/testbeds')
-rw-r--r--rumba/testbeds/dockertb.py190
-rw-r--r--rumba/testbeds/emulab.py10
-rw-r--r--rumba/testbeds/jfed.py5
-rw-r--r--rumba/testbeds/local.py (renamed from rumba/testbeds/faketestbed.py)7
-rw-r--r--rumba/testbeds/qemu.py16
5 files changed, 218 insertions, 10 deletions
diff --git a/rumba/testbeds/dockertb.py b/rumba/testbeds/dockertb.py
new file mode 100644
index 0000000..3895a7e
--- /dev/null
+++ b/rumba/testbeds/dockertb.py
@@ -0,0 +1,190 @@
+#
+# A library to manage ARCFIRE experiments
+#
+# Copyright (C) 2017 Nextworks S.r.l.
+# Copyright (C) 2017 imec
+#
+# Sander Vrijders <sander.vrijders@ugent.be>
+# Dimitri Staessens <dimitri.staessens@ugent.be>
+# Vincenzo Maffione <v.maffione@nextworks.it>
+# Marco Capitani <m.capitani@nextworks.it>
+# Nick Aerts <nick.aerts@ugent.be>
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., http://www.fsf.org/about/contact/.
+#
+import os
+from time import sleep
+import docker
+import subprocess
+
+import rumba.model as mod
+import rumba.log as log
+from rumba.executors.docker import DockerExecutor
+
+
+logger = log.get_logger(__name__)
+
+class Testbed(mod.Testbed):
+ def __init__(self, exp_name, base_image, pull_image=True,
+ use_ovs=False):
+ mod.Testbed.__init__(self, exp_name, "", "", "")
+
+ img = base_image.rsplit(":", 1)
+
+ self.base_image_repo = img[0]
+ self.base_image_tag = "latest" if len(img) == 1 else img[1]
+ self.base_image = "%s:%s" % (self.base_image_repo, self.base_image_tag)
+ self.pull_image = pull_image
+ self.use_ovs = use_ovs
+
+ self.running_containers = {}
+ self.active_bridges = set()
+ self.active_ipcps = set()
+
+ self.docker_client = docker.from_env()
+ self.executor = DockerExecutor(self)
+
+ def swap_in(self, experiment):
+ docker_client = self.docker_client
+
+ mod.Testbed.swap_in(self, experiment)
+
+ # Pull image
+ if self.pull_image:
+ docker_client.images.pull(self.base_image_repo,
+ self.base_image_tag)
+
+ docker_client.images.get("%s:%s" % (self.base_image_repo,
+ self.base_image_tag))
+
+ # Start all nodes
+ for node in experiment.nodes:
+ self.running_containers[node.name] = docker_client.containers.run(
+ self.base_image, command=node.startup_command, name=node.name,
+ detach=True, network="none", privileged=True,
+ devices=["/dev/fuse"])
+
+ logger.info("Nodes starting")
+
+ if not os.path.exists("/var/run/netns"):
+ subprocess.check_call('sudo mkdir /var/run/netns'.split())
+
+ for shim in experiment.dif_ordering:
+ if not isinstance(shim, mod.ShimEthDIF):
+ # Nothing to do here
+ continue
+
+ cmd = ""
+ if self.use_ovs:
+ cmd += 'sudo ovs-vsctl add-br %(shim)s'
+ else:
+ cmd += 'sudo ip link add %(shim)s type bridge'
+ cmd = cmd % {'shim': shim.name}
+
+ self.active_bridges.add(shim.name)
+
+ subprocess.check_call(cmd.split())
+
+ if not self.use_ovs:
+ cmd = 'sudo ip link set dev %(shim)s up' % {'shim': shim.name}
+ subprocess.check_call(cmd.split())
+ for node in experiment.nodes:
+ container = self.running_containers[node.name]
+
+ container.reload()
+
+ state = container.attrs["State"]
+
+ while not state["Running"]:
+ sleep(0.2)
+ container.reload()
+ state = container.attrs["State"]
+
+ pid = state["Pid"]
+
+ subprocess.check_call(('sudo ln -s /proc/%(pid)i/ns/net '
+ '/var/run/netns/%(pid)i' % {'pid': pid}).split())
+
+ for ipcp in node.ipcps:
+ if isinstance(ipcp, mod.ShimEthIPCP):
+ if ipcp.ifname is None:
+ ipcp.ifname = "eth%i" % node.ipcps.index(ipcp)
+
+ cmd = ('sudo ip link add %(node)s.%(ifname)s type veth '
+ 'peer name _%(node)s.%(ifname)s'\
+ % {'node': node.name, 'ifname': ipcp.ifname})
+ subprocess.check_call(cmd.split())
+
+ cmd = ""
+ if self.use_ovs:
+ cmd += ('sudo ovs-vsctl add-port %(dif)s %(node)s.%('
+ 'ifname)s')
+ else:
+ cmd += ('sudo ip link set %(node)s.%(ifname)s master '
+ '%(dif)s')
+
+ cmd = (cmd % {'node': node.name,
+ 'ifname': ipcp.ifname,
+ 'dif': ipcp.dif.name})
+
+ subprocess.check_call(cmd.split())
+
+ cmd = ('sudo ip link set _%(node)s.%(ifname)s '
+ 'netns %(pid)i '
+ 'name %(ifname)s'
+ % {'node': node.name,
+ 'pid': pid,
+ 'ifname': ipcp.ifname})
+ subprocess.check_call(cmd.split())
+
+ cmd = ('sudo ip link set dev %(node)s.%(ifname)s up'
+ % {'node': node.name, 'ifname': ipcp.ifname})
+ subprocess.check_call(cmd.split())
+
+ cmd = ('sudo ip netns exec %(pid)i ip link set dev '
+ '%(ifname)s up'
+ % {'pid': pid, 'ifname': ipcp.ifname})
+ subprocess.check_call(cmd.split())
+
+ self.active_ipcps.add(ipcp)
+
+ logger.info("Experiment swapped in")
+
+ def swap_out(self, experiment):
+ for name, container in self.running_containers.items():
+ container.remove(force=True)
+
+ for shim in experiment.dif_ordering:
+ if isinstance(shim, mod.ShimEthDIF) and shim.name in self.active_bridges:
+ cmd = ""
+ if self.use_ovs:
+ cmd += 'sudo ovs-vsctl del-br %(shim)s'
+ else:
+ cmd += 'sudo ip link del %(shim)s'
+ cmd = cmd % {'shim': shim.name}
+
+ subprocess.check_call(cmd.split())
+
+ self.active_bridges.remove(shim.name)
+
+ for name, container in self.running_containers.items():
+ pid = container.attrs["State"]["Pid"]
+
+ cmd = 'sudo rm /var/run/netns/%(pid)i' % {'pid': pid}
+ subprocess.check_call(cmd.split())
+
+ self.running_containers = {}
+
+ logger.info("Experiment swapped out") \ No newline at end of file
diff --git a/rumba/testbeds/emulab.py b/rumba/testbeds/emulab.py
index 4f8b023..06e0e5c 100644
--- a/rumba/testbeds/emulab.py
+++ b/rumba/testbeds/emulab.py
@@ -34,6 +34,8 @@ import rumba.ssh_support as ssh
import rumba.model as mod
import rumba.log as log
+from rumba.executors.ssh import SSHExecutor
+
logger = log.get_logger(__name__)
@@ -56,6 +58,8 @@ class Testbed(mod.Testbed):
self.ip = dict()
self.ops_ssh_config = mod.SSHConfig(self.ops_server())
+ self.executor = SSHExecutor
+
if "wall" in url:
self.http_proxy="https://proxy.atlantis.ugent.be:8080"
@@ -237,7 +241,7 @@ class Testbed(mod.Testbed):
node.ssh_config.set_password(self.password)
cmd = 'cat /var/emulab/boot/topomap'
- topomap = ssh.execute_command(self, experiment.nodes[0].ssh_config, cmd)
+ topomap = node.execute_command(cmd)
# Almost as ugly as yo momma
index = topomap.rfind("# lans")
topo_array = topomap[:index].split('\n')[1:-1]
@@ -258,7 +262,7 @@ class Testbed(mod.Testbed):
for node in experiment.nodes:
cmd = 'cat /var/emulab/boot/ifmap'
- output = ssh.execute_command(self, node.ssh_config, cmd)
+ output = node.execute_command(cmd)
output = re.split('\n', output)
for item in output:
item = item.split()
@@ -268,6 +272,8 @@ class Testbed(mod.Testbed):
ipcp.ifname = item[0]
def swap_in(self, experiment):
+ mod.Testbed.swap_in(self, experiment)
+
self._create_experiment(experiment)
wait = self.swap_exp_in()
if wait:
diff --git a/rumba/testbeds/jfed.py b/rumba/testbeds/jfed.py
index 3cfc82b..db9dd14 100644
--- a/rumba/testbeds/jfed.py
+++ b/rumba/testbeds/jfed.py
@@ -32,6 +32,8 @@ import time
import tarfile
import sys
+from rumba.executors.ssh import SSHExecutor
+
if sys.version_info[0] >= 3:
from urllib.request import urlretrieve
else:
@@ -65,6 +67,7 @@ class Testbed(mod.Testbed):
self.manifest = os.path.join(mod.tmp_dir, self.exp_name + ".rrspec")
self.jfed_jar = os.path.join(mod.cache_dir,
'jfed_cli/experimenter-cli.jar')
+ self.executor = SSHExecutor(self)
if "exogeni" in authority:
self.authority = "urn:publicid:IDN+" + authority + "+authority+am"
@@ -204,6 +207,8 @@ class Testbed(mod.Testbed):
raise
def swap_in(self, experiment):
+ mod.Testbed.swap_in(self, experiment)
+
for node in experiment.nodes:
node.ssh_config.set_http_proxy(self.http_proxy)
self.create_rspec(experiment)
diff --git a/rumba/testbeds/faketestbed.py b/rumba/testbeds/local.py
index b021aca..77aed82 100644
--- a/rumba/testbeds/faketestbed.py
+++ b/rumba/testbeds/local.py
@@ -27,17 +27,22 @@
import rumba.model as mod
import rumba.log as log
+from rumba.executors.local import LocalExecutor
logger = log.get_logger(__name__)
-# Fake testbed, useful for testing
+# Local testbed, useful for testing
class Testbed(mod.Testbed):
def __init__(self, exp_name, username, proj_name="ARCFIRE", password=""):
mod.Testbed.__init__(self, exp_name, username, password, proj_name)
+ self.executor = LocalExecutor(self)
+
def swap_in(self, experiment):
+ mod.Testbed.swap_in(self, experiment)
+
logger.info("Experiment swapped in")
def swap_out(self, experiment):
diff --git a/rumba/testbeds/qemu.py b/rumba/testbeds/qemu.py
index 4321fc8..f3daefd 100644
--- a/rumba/testbeds/qemu.py
+++ b/rumba/testbeds/qemu.py
@@ -32,9 +32,10 @@ from subprocess import CalledProcessError
import rumba.model as mod
import rumba.log as log
-import rumba.ssh_support as ssh_support
import rumba.multiprocess as m_processing
+from rumba.executors.ssh import SSHExecutor
+
if sys.version_info[0] >= 3:
from urllib.request import urlretrieve
else:
@@ -59,6 +60,8 @@ class Testbed(mod.Testbed):
self.initramfs_path = initramfs_path
self.multiproc_manager = None
+ self.executor = SSHExecutor(self);
+
# Prepend sudo to all commands if the user is not 'root'
def may_sudo(self, cmds):
if os.geteuid() != 0:
@@ -116,13 +119,9 @@ class Testbed(mod.Testbed):
mac = '00:0a:0a:0a:%02x:%02x' % (vm_id, port_id)
logger.info('Recovering ifname for port: %s.',
port['tap_id'])
- output = ssh_support.execute_command(
- self,
- node.ssh_config,
- 'mac2ifname ' + mac)
+ output = node.execute_command('mac2ifname ' + mac)
ipcp.ifname = output
- ssh_support.execute_command(
- self, node.ssh_config,
+ node.execute_command(
"ip link set dev %(ifname)s up"
% {'ifname': ipcp.ifname})
@@ -131,6 +130,9 @@ class Testbed(mod.Testbed):
:type experiment mod.Experiment
:param experiment: The experiment running
"""
+
+ mod.Testbed.swap_in(self, experiment)
+
if os.geteuid() != 0:
try:
subprocess.check_call(["sudo", "-v"])