aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNick Aerts <nick.aerts@ugent.be>2018-02-24 12:27:36 +0100
committerNick Aerts <nick.aerts@ugent.be>2018-03-20 11:30:07 +0100
commita7cd88d752b72ea85ccefa5e1f3dceba13fb1fc2 (patch)
tree9c29f08693d577c81959c3194aec04e9bc1ba195
parent24bed306b5a67fc682b04cae017b0d0f3ac55a00 (diff)
downloadrumba-a7cd88d752b72ea85ccefa5e1f3dceba13fb1fc2.tar.gz
rumba-a7cd88d752b72ea85ccefa5e1f3dceba13fb1fc2.zip
testbeds: Add docker testbed
This adds support for a testbed based on Docker containers running on the local host. Bridging the containers can be done using built-in Linux bridging or using OpenVSwitch bridges. A new resource 'executor' has been added to abstract away command execution on nodes on the testbed. Executors have been created for local command execution, docker exec based command execution and SSH-based command execution. This has also been changed in the prototypes to execute using the correct executor.
-rw-r--r--AUTHORS.txt1
-rw-r--r--README.md14
-rwxr-xr-xexamples/converged-operator-network.py2
-rwxr-xr-xexamples/docker-ouroboros.py65
-rwxr-xr-xexamples/example.py2
-rwxr-xr-xexamples/mouse.py2
-rwxr-xr-xexamples/snake.py2
-rwxr-xr-xexamples/two-layers.py6
-rwxr-xr-xexamples/vpn.py2
-rw-r--r--rumba/executors/__init__.py26
-rw-r--r--rumba/executors/docker.py80
-rw-r--r--rumba/executors/local.py55
-rw-r--r--rumba/executors/ssh.py64
-rw-r--r--rumba/model.py130
-rw-r--r--rumba/prototypes/ouroboros.py49
-rw-r--r--rumba/ssh_support.py4
-rw-r--r--rumba/testbeds/dockertb.py190
-rw-r--r--rumba/testbeds/emulab.py10
-rw-r--r--rumba/testbeds/jfed.py5
-rw-r--r--rumba/testbeds/local.py (renamed from rumba/testbeds/faketestbed.py)7
-rw-r--r--rumba/testbeds/qemu.py16
-rwxr-xr-xsetup.py3
-rwxr-xr-xtools/democonf2rumba.py6
23 files changed, 612 insertions, 129 deletions
diff --git a/AUTHORS.txt b/AUTHORS.txt
index 55a1c91..0e929fb 100644
--- a/AUTHORS.txt
+++ b/AUTHORS.txt
@@ -2,3 +2,4 @@ Sander Vrijders <sander.vrijders@intec.ugent.be>
Dimitri Staessens <dimitri.staessens@ugent.be>
Vincenzo Maffione <v.maffione@nextworks.it>
Marco Capitani <m.capitani@nextworks.it>
+Nick Aerts <nick.aerts@ugent.be> \ No newline at end of file
diff --git a/README.md b/README.md
index 5f42618..6feafbc 100644
--- a/README.md
+++ b/README.md
@@ -160,6 +160,20 @@ Example scripts can be found in the examples/ folder.
for the previous commands, without changing the user (e.g. using su
or sudo).
+ * [Docker](https://www.docker.com/) is a container runtime environment.
+
+ To use the Docker testbed the Docker software needs to be installed, see
+ [Install Docker](https://docs.docker.com/install/) and complete
+ [Post-installation steps for Linux](https://docs.docker
+ .com/install/linux/linux-postinstall/)
+
+ To use a Docker testbed you can for example use:
+
+ tb = docker.Testbed(exp_name = "ouroboros",
+ base_image = "ouroborosrumba/prototype")
+
+ This will pull the latest ouroboros image from Docker Hub.
+
## Accessing nodes after swap-in
To access a node once the experiment swapped in, use the following
diff --git a/examples/converged-operator-network.py b/examples/converged-operator-network.py
index 3b80746..8ed79ed 100755
--- a/examples/converged-operator-network.py
+++ b/examples/converged-operator-network.py
@@ -10,7 +10,7 @@ from rumba.utils import *
# import testbed plugins
import rumba.testbeds.emulab as emulab
import rumba.testbeds.jfed as jfed
-import rumba.testbeds.faketestbed as fake
+import rumba.testbeds.local as local
import rumba.testbeds.qemu as qemu
# import prototype plugins
diff --git a/examples/docker-ouroboros.py b/examples/docker-ouroboros.py
new file mode 100755
index 0000000..1c6009c
--- /dev/null
+++ b/examples/docker-ouroboros.py
@@ -0,0 +1,65 @@
+#!/usr/bin/env python
+
+# An example script using the rumba package
+
+from rumba.model import *
+from rumba.utils import ExperimentManager
+
+# import testbed plugins
+import rumba.testbeds.dockertb as docker
+
+# import prototype plugins
+import rumba.prototypes.ouroboros as our
+
+import argparse
+import sys
+
+
+description = "Script to create an ouroboros"
+
+argparser = argparse.ArgumentParser(description = description)
+argparser.add_argument('-n', '--nodes', type = int, default = '10',
+ help = "Total number of nodes")
+
+args = argparser.parse_args()
+
+log.set_logging_level('DEBUG')
+
+n01 = NormalDIF("n01")
+
+if (args.nodes < 3):
+ print("The ouroboros must be longer than 2 nodes")
+ sys.exit(-1)
+
+nodes = []
+
+shim_prev = None
+for i in range(0, args.nodes):
+ shim = ShimEthDIF("e" + str(i))
+
+ if shim_prev == None and shim != None:
+ node = Node("node" + str(i), difs = [n01, shim],
+ dif_registrations = {n01 : [shim]})
+ elif shim_prev != None and shim != None:
+ node = Node("node" + str(i), difs = [n01, shim, shim_prev],
+ dif_registrations = {n01 : [shim, shim_prev]})
+ else:
+ node = Node("node" + str(i), difs = [n01, shim_prev],
+ dif_registrations = {n01 : [shim_prev]})
+
+ shim_prev = shim
+ nodes.append(node)
+
+nodes[0].add_dif(shim_prev)
+nodes[0].add_dif_registration(n01, shim_prev)
+
+tb = docker.Testbed(exp_name = "ouroboros",
+ base_image = "ouroborosrumba/prototype")
+
+exp = our.Experiment(tb, nodes = nodes)
+
+print(exp)
+
+with ExperimentManager(exp):
+ exp.swap_in()
+ exp.bootstrap_prototype()
diff --git a/examples/example.py b/examples/example.py
index eee966b..62d5a53 100755
--- a/examples/example.py
+++ b/examples/example.py
@@ -9,7 +9,7 @@ from rumba.storyboard import *
# import testbed plugins
import rumba.testbeds.emulab as emulab
import rumba.testbeds.jfed as jfed
-import rumba.testbeds.faketestbed as fake
+import rumba.testbeds.local as local
import rumba.testbeds.qemu as qemu
# import prototype plugins
diff --git a/examples/mouse.py b/examples/mouse.py
index 026b3bc..16fdf49 100755
--- a/examples/mouse.py
+++ b/examples/mouse.py
@@ -8,7 +8,7 @@ from rumba.utils import ExperimentManager
# import testbed plugins
import rumba.testbeds.emulab as emulab
import rumba.testbeds.jfed as jfed
-import rumba.testbeds.faketestbed as fake
+import rumba.testbeds.local as local
import rumba.testbeds.qemu as qemu
# import prototype plugins
diff --git a/examples/snake.py b/examples/snake.py
index 6f9c48d..3d76797 100755
--- a/examples/snake.py
+++ b/examples/snake.py
@@ -8,7 +8,7 @@ from rumba.utils import ExperimentManager
# import testbed plugins
import rumba.testbeds.emulab as emulab
import rumba.testbeds.jfed as jfed
-import rumba.testbeds.faketestbed as fake
+import rumba.testbeds.local as local
import rumba.testbeds.qemu as qemu
# import prototype plugins
diff --git a/examples/two-layers.py b/examples/two-layers.py
index c375088..3f50037 100755
--- a/examples/two-layers.py
+++ b/examples/two-layers.py
@@ -8,7 +8,7 @@ from rumba.storyboard import *
# import testbed plugins
import rumba.testbeds.emulab as emulab
import rumba.testbeds.jfed as jfed
-import rumba.testbeds.faketestbed as fake
+import rumba.testbeds.local as local
import rumba.testbeds.qemu as qemu
# import prototype plugins
@@ -59,6 +59,6 @@ with ExperimentManager(exp):
exp.swap_in()
exp.bootstrap_prototype()
sb = StoryBoard(experiment=exp, duration=15, servers=[])
- sb.run_command(7.5, a, 'echo "7.5 secs in. We are at $(hostname)"')
- sb.run_command(12, b, 'echo "12 secs in. We are at $(hostname)"')
+ sb.schedule_command(7.5, a, 'echo "7.5 secs in. We are at $(hostname)"')
+ sb.schedule_command(12, b, 'echo "12 secs in. We are at $(hostname)"')
sb.start()
diff --git a/examples/vpn.py b/examples/vpn.py
index 28de09e..6aa8db6 100755
--- a/examples/vpn.py
+++ b/examples/vpn.py
@@ -8,7 +8,7 @@ from rumba.utils import ExperimentManager
# import testbed plugins
import rumba.testbeds.emulab as emulab
import rumba.testbeds.jfed as jfed
-import rumba.testbeds.faketestbed as fake
+import rumba.testbeds.local as local
import rumba.testbeds.qemu as qemu
# import prototype plugins
diff --git a/rumba/executors/__init__.py b/rumba/executors/__init__.py
new file mode 100644
index 0000000..58fcfb5
--- /dev/null
+++ b/rumba/executors/__init__.py
@@ -0,0 +1,26 @@
+#
+# A library to manage ARCFIRE experiments
+#
+# Copyright (C) 2017 Nextworks S.r.l.
+# Copyright (C) 2017 imec
+#
+# Sander Vrijders <sander.vrijders@ugent.be>
+# Dimitri Staessens <dimitri.staessens@ugent.be>
+# Vincenzo Maffione <v.maffione@nextworks.it>
+# Marco Capitani <m.capitani@nextworks.it>
+# Nick Aerts <nick.aerts@ugent.be>
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., http://www.fsf.org/about/contact/.
+#
diff --git a/rumba/executors/docker.py b/rumba/executors/docker.py
new file mode 100644
index 0000000..e1e1480
--- /dev/null
+++ b/rumba/executors/docker.py
@@ -0,0 +1,80 @@
+#
+# A library to manage ARCFIRE experiments
+#
+# Copyright (C) 2017 Nextworks S.r.l.
+# Copyright (C) 2017 imec
+#
+# Sander Vrijders <sander.vrijders@ugent.be>
+# Dimitri Staessens <dimitri.staessens@ugent.be>
+# Vincenzo Maffione <v.maffione@nextworks.it>
+# Marco Capitani <m.capitani@nextworks.it>
+# Nick Aerts <nick.aerts@ugent.be>
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., http://www.fsf.org/about/contact/.
+#
+
+from rumba import model as mod
+
+import tempfile
+import tarfile
+
+from rumba import log
+
+logger = log.get_logger(__name__)
+
+
+class DockerException(Exception):
+ pass
+
+
+class DockerExecutor(mod.Executor):
+ def __init__(self, testbed):
+ self.testbed = testbed
+ self.running_containers = testbed.running_containers
+
+ def execute_command(self, node, command, sudo=False, time_out=3):
+ logger.debug("%s >> %s" % (node.name, command))
+
+ c, o = self.running_containers[node.name].exec_run(["sh", "-c",
+ command])
+ if c:
+ raise DockerException('A remote command returned an error. '
+ 'Output:\n\n\t' + o.decode("utf-8"))
+
+ return o.decode("utf-8")
+
+ def fetch_file(self, node, path, destination, as_root=False):
+ if not path.startswith("/"):
+ workingdir = self.running_containers[node.name].attrs["Config"][
+ "WorkingDir"]
+ path = os.path.join(workingdir, path)
+
+ try:
+ with tempfile.NamedTemporaryFile() as tmp:
+ archive, _ = self.running_containers[node.name].get_archive(
+ path)
+
+ for c in archive:
+ tmp.write(c)
+
+ tmp.seek(0)
+
+ tarfile.TarFile(fileobj=tmp, mode='r').extract(
+ path.basename(path), destination)
+ except:
+ logger.error("Error when extracting %s" % path)
+
+ def copy_file(self, node, path, destination):
+ pass
diff --git a/rumba/executors/local.py b/rumba/executors/local.py
new file mode 100644
index 0000000..9f3f484
--- /dev/null
+++ b/rumba/executors/local.py
@@ -0,0 +1,55 @@
+#
+# A library to manage ARCFIRE experiments
+#
+# Copyright (C) 2017 Nextworks S.r.l.
+# Copyright (C) 2017 imec
+#
+# Sander Vrijders <sander.vrijders@ugent.be>
+# Dimitri Staessens <dimitri.staessens@ugent.be>
+# Vincenzo Maffione <v.maffione@nextworks.it>
+# Marco Capitani <m.capitani@nextworks.it>
+# Nick Aerts <nick.aerts@ugent.be>
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., http://www.fsf.org/about/contact/.
+#
+
+from rumba import model as mod
+
+import subprocess
+
+from rumba import log
+
+logger = log.get_logger(__name__)
+
+
+class LocalExecutor(mod.Executor):
+ def __init__(self, testbed):
+ self.testbed = testbed
+
+ def execute_command(self, node, cmd, sudo=False, time_out=3):
+ try:
+ logger.debug("%s >> %s" % (node.name, cmd))
+ subprocess.check_call(cmd.split(' '))
+ except subprocess.CalledProcessError as e:
+ logger.error("Return code was " + str(e.returncode))
+ raise
+
+ def fetch_file(self, node, path, destination, as_root=False):
+ logger.error("Fetch_file not supported for local testbed")
+ raise
+
+ def copy_file(self, node, path, destination):
+ logger.error("Copy_file not supported for local testbed")
+ raise
diff --git a/rumba/executors/ssh.py b/rumba/executors/ssh.py
new file mode 100644
index 0000000..6a3a41d
--- /dev/null
+++ b/rumba/executors/ssh.py
@@ -0,0 +1,64 @@
+#
+# A library to manage ARCFIRE experiments
+#
+# Copyright (C) 2017 Nextworks S.r.l.
+# Copyright (C) 2017 imec
+#
+# Sander Vrijders <sander.vrijders@ugent.be>
+# Dimitri Staessens <dimitri.staessens@ugent.be>
+# Vincenzo Maffione <v.maffione@nextworks.it>
+# Marco Capitani <m.capitani@nextworks.it>
+# Nick Aerts <nick.aerts@ugent.be>
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., http://www.fsf.org/about/contact/.
+#
+
+from rumba.model import Executor
+from rumba.ssh_support import execute_command, execute_commands, \
+ copy_file_to_testbed, copy_file_from_testbed, execute_proxy_command, \
+ execute_proxy_commands
+
+
+class SSHExecutor(Executor):
+ def __init__(self, testbed, use_proxy=False):
+ self.testbed = testbed
+ self.use_proxy = use_proxy
+
+ def execute_command(self, node, command, as_root=False, time_out=3):
+ if as_root:
+ if node.ssh_config.username != 'root':
+ command = "sudo %s" % command
+
+ if self.use_proxy:
+ return execute_proxy_command(self, node.ssh_config, command,
+ time_out)
+ else:
+ return execute_command(self.testbed, node.ssh_config, command,
+ time_out)
+
+ def execute_commands(self, node, commands, as_root=False, time_out=3):
+ if self.use_proxy:
+ return execute_proxy_commands(self, node.ssh_config, commands,
+ time_out)
+ else:
+ return execute_commands(self.testbed, node.ssh_config, commands,
+ time_out)
+
+ def fetch_file(self, node, path, destination, sudo):
+ copy_file_from_testbed(self.testbed, node.ssh_config, path,
+ destination, sudo)
+
+ def copy_file(self, node, path, destination):
+ copy_file_to_testbed(self.testbed, node.ssh_config, path, destination) \ No newline at end of file
diff --git a/rumba/model.py b/rumba/model.py
index 39528be..20f8215 100644
--- a/rumba/model.py
+++ b/rumba/model.py
@@ -31,7 +31,6 @@ import time
import shutil
import rumba.log as log
-from rumba import ssh_support
logger = log.get_logger(__name__)
@@ -88,7 +87,8 @@ class Testbed(object):
@abc.abstractmethod
def swap_in(self, experiment):
- raise Exception('swap_in() not implemented')
+ for node in experiment.nodes:
+ node.executor = self.executor
@abc.abstractmethod
def swap_out(self, experiment):
@@ -245,6 +245,9 @@ class Node(object):
if hasattr(dif, 'policy'): # check if the dif supports policies
self.policies[dif] = policies.get(dif, Policy(dif, self))
+ self.executor = None # will be set by testbed on swap_in
+ self.startup_command = None # will be set by prototype
+
self._validate()
def get_ipcp_by_dif(self, dif):
@@ -328,88 +331,34 @@ class Node(object):
def get_policy(self, dif):
return self.policies[dif]
- def execute_commands(self, commands, time_out=3, use_proxy=False):
- # Ssh_config is used twice since it doubles as testbed info
- # (it holds fields username and password)
- if use_proxy:
- return ssh_support.execute_proxy_commands(
- self.ssh_config,
- self.ssh_config,
- commands,
- time_out
- )
- # else:
- return ssh_support.execute_commands(
- self.ssh_config,
- self.ssh_config,
- commands,
- time_out
- )
-
- def execute_command(self, command, time_out=3,
- use_proxy=False, as_root=False):
- # Ssh_config is used twice since it doubles as testbed info
- # (it holds fields username and password)
- if as_root:
- if self.ssh_config.username != 'root':
- command = "sudo %s" % command
-
- if use_proxy:
- return ssh_support.execute_proxy_command(
- self.ssh_config,
- self.ssh_config,
- command,
- time_out
- )
- # else:
- return ssh_support.execute_command(
- self.ssh_config,
- self.ssh_config,
- command,
- time_out
- )
+ def execute_commands(self, commands, as_root=False, time_out=3,
+ use_proxy=False):
+ return self.executor.execute_commands(self, commands, as_root, time_out)
+
+ def execute_command(self, command, as_root=False, time_out=3,
+ use_proxy=False):
+ return self.executor.execute_command(self, command, as_root, time_out)
def write_text_to_file(self, text, file_name):
- ssh_support.write_text_to_file(
- self.ssh_config,
- self.ssh_config,
- text,
- file_name
- )
+ # ssh_support.write_text_to_file(
+ # self.ssh_config,
+ # self.ssh_config,
+ # text,
+ # file_name
+ # )
+ return
def copy_file(self, path, destination):
- ssh_support.copy_file_to_testbed(
- self.ssh_config,
- self.ssh_config,
- path,
- destination
- )
+ self.executor.copy_file(self, path, destination)
def copy_files(self, paths, destination):
- ssh_support.copy_files_to_testbed(
- self.ssh_config,
- self.ssh_config,
- paths,
- destination
- )
+ self.executor.copy_files(self, paths, destination)
def fetch_files(self, paths, destination, sudo=False):
- ssh_support.copy_files_from_testbed(
- self.ssh_config,
- self.ssh_config,
- paths,
- destination,
- sudo=sudo
- )
+ self.executor.fetch_files(self, paths, destination, sudo)
def fetch_file(self, path, destination, sudo=False):
- ssh_support.copy_file_from_testbed(
- self.ssh_config,
- self.ssh_config,
- path,
- destination,
- sudo=sudo
- )
+ self.executor.fetch_files(self, path, destination, sudo)
def set_link_state(self, ipcp, state):
self.execute_command('ip link set dev ' + ipcp.ifname + ' ' + state,
@@ -869,6 +818,10 @@ class Experiment(object):
end = time.time()
logger.info("Install took %.2f seconds", end - start)
+ def set_startup_command(self, command):
+ for node in self.nodes:
+ node.startup_command = command
+
def bootstrap_prototype(self):
start = time.time()
self._bootstrap_prototype()
@@ -912,3 +865,32 @@ class Experiment(object):
self.testbed.swap_out(self)
end = time.time()
logger.info("Swap-out took %.2f seconds", end - start)
+
+
+class Executor:
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def execute_command(self, node, command, as_root=False, time_out=3):
+ # Execute command on a node
+ return
+
+ def execute_commands(self, node, commands, as_root=False, time_out=3):
+ for command in commands:
+ self.execute_command(node, command, as_root, time_out)
+
+ @abc.abstractmethod
+ def copy_file(self, node, path, destination):
+ return
+
+ def copy_files(self, node, paths, destination):
+ for path in paths:
+ self.copy_file(node, path, destination)
+
+ @abc.abstractmethod
+ def fetch_file(self, node, path, destination, sudo=False):
+ return
+
+ def fetch_files(self, node, paths, destination, sudo=False):
+ for path in paths:
+ self.fetch_file(node, path, destination, sudo) \ No newline at end of file
diff --git a/rumba/prototypes/ouroboros.py b/rumba/prototypes/ouroboros.py
index 33ecd0b..97f6e1a 100644
--- a/rumba/prototypes/ouroboros.py
+++ b/rumba/prototypes/ouroboros.py
@@ -31,7 +31,8 @@ import rumba.ssh_support as ssh
import rumba.model as mod
import rumba.multiprocess as m_processing
import rumba.log as log
-import rumba.testbeds.faketestbed as fake
+import rumba.testbeds.local as local
+import rumba.testbeds.dockertb as docker
logger = log.get_logger(__name__)
@@ -45,6 +46,8 @@ class Experiment(mod.Experiment):
mod.Experiment.__init__(self, testbed, nodes, git_repo, git_branch)
self.r_ipcps = dict()
+ self.set_startup_command("irmd")
+
@staticmethod
def make_executor(node, packages, testbed):
def executor(commands):
@@ -68,19 +71,20 @@ class Experiment(mod.Experiment):
self.exec_local_cmd(cmd)
def setup_ouroboros(self):
- if isinstance(self.testbed, fake.Testbed):
+ if isinstance(self.testbed, docker.Testbed):
+ return
+
+ if isinstance(self.testbed, local.Testbed):
subprocess.check_call('sudo -v'.split())
self.irmd = subprocess.Popen(["sudo", "irmd"])
logger.info("Started IRMd, sleeping 2 seconds...")
time.sleep(2)
else:
for node in self.nodes:
- ssh.execute_command(self.testbed, node.ssh_config,
- "sudo nohup irmd > /dev/null &",
- time_out=None)
+ node.execute_command("sudo nohup irmd > /dev/null &", time_out=None)
def install_ouroboros(self):
- if isinstance(self.testbed, fake.Testbed):
+ if isinstance(self.testbed, local.Testbed):
return
packages = ["cmake", "protobuf-c-compiler", "git", "libfuse-dev",
@@ -120,7 +124,7 @@ class Experiment(mod.Experiment):
cmd = "irm i c n " + ipcp.name
if isinstance(ipcp.dif, mod.ShimEthDIF):
- if isinstance(self.testbed, fake.Testbed):
+ if isinstance(self.testbed, local.Testbed):
cmd += " type local layer " + ipcp.dif.name
else:
cmd += " type eth-llc dev " + ipcp.ifname
@@ -154,11 +158,7 @@ class Experiment(mod.Experiment):
# Postpone registrations
self.r_ipcps[ipcp] = cmds2
- if isinstance(self.testbed, fake.Testbed):
- self.exec_local_cmds(cmds)
- else:
- ssh.execute_commands(self.testbed, node.ssh_config, cmds,
- time_out=None)
+ node.execute_commands(cmds, time_out=None)
def enroll_dif(self, el):
for e in el:
@@ -167,13 +167,8 @@ class Experiment(mod.Experiment):
# Execute postponed registration
if e['enroller'] in self.r_ipcps:
- if isinstance(self.testbed, fake.Testbed):
- self.exec_local_cmds(self.r_ipcps[e['enroller']])
- else:
- ssh.execute_commands(self.testbed,
- e['enroller'].node.ssh_config,
- self.r_ipcps[e['enroller']],
- time_out=None)
+ e['enroller'].node.execute_commands(self.r_ipcps[e['enroller']],
+ time_out=None)
self.r_ipcps.pop(e['enroller'], None)
cmd = "irm r n " + ipcp.name
@@ -188,12 +183,7 @@ class Experiment(mod.Experiment):
cmd += " layer " + dif_b.name
cmds.append(cmd)
- if isinstance(self.testbed, fake.Testbed):
- self.exec_local_cmds(cmds)
- else:
- ssh.execute_commands(self.testbed,
- e['enrollee'].node.ssh_config,
- cmds, time_out=None)
+ e['enrollee'].node.execute_commands(cmds, time_out=None)
def setup_flows(self, el, comp):
for e in el:
@@ -201,12 +191,7 @@ class Experiment(mod.Experiment):
cmd = "irm i conn n " + ipcp.name + " comp " + \
comp + " dst " + e['dst'].name
- if isinstance(self.testbed, fake.Testbed):
- self.exec_local_cmd(cmd)
- else:
- ssh.execute_command(self.testbed,
- ipcp.node.ssh_config,
- cmd, time_out=None)
+ ipcp.node.execute_command(cmd, time_out=None)
def _install_prototype(self):
logger.info("Installing Ouroboros...")
@@ -230,6 +215,6 @@ class Experiment(mod.Experiment):
logger.info("All done, have fun!")
def _terminate_prototype(self):
- if isinstance(self.testbed, fake.Testbed):
+ if isinstance(self.testbed, local.Testbed):
logger.info("Killing IRMd...")
subprocess.check_call('sudo killall -15 irmd'.split())
diff --git a/rumba/ssh_support.py b/rumba/ssh_support.py
index 88334ea..b1492e7 100644
--- a/rumba/ssh_support.py
+++ b/rumba/ssh_support.py
@@ -32,6 +32,8 @@ import time
import rumba.log as log
# Fix input reordering
+from rumba.model import Executor
+
try:
import builtins # Only in Python 3
@@ -434,4 +436,4 @@ def aptitude_install(testbed, node, packages):
"while ! " + sudo("apt-get update") + "; do sleep 1; done",
"while ! " + sudo(package_install) + "; do sleep 1; done"]
- execute_proxy_commands(testbed, node.ssh_config, cmds, time_out=None)
+ execute_proxy_commands(testbed, node.ssh_config, cmds, time_out=None) \ No newline at end of file
diff --git a/rumba/testbeds/dockertb.py b/rumba/testbeds/dockertb.py
new file mode 100644
index 0000000..3895a7e
--- /dev/null
+++ b/rumba/testbeds/dockertb.py
@@ -0,0 +1,190 @@
+#
+# A library to manage ARCFIRE experiments
+#
+# Copyright (C) 2017 Nextworks S.r.l.
+# Copyright (C) 2017 imec
+#
+# Sander Vrijders <sander.vrijders@ugent.be>
+# Dimitri Staessens <dimitri.staessens@ugent.be>
+# Vincenzo Maffione <v.maffione@nextworks.it>
+# Marco Capitani <m.capitani@nextworks.it>
+# Nick Aerts <nick.aerts@ugent.be>
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., http://www.fsf.org/about/contact/.
+#
+import os
+from time import sleep
+import docker
+import subprocess
+
+import rumba.model as mod
+import rumba.log as log
+from rumba.executors.docker import DockerExecutor
+
+
+logger = log.get_logger(__name__)
+
+class Testbed(mod.Testbed):
+ def __init__(self, exp_name, base_image, pull_image=True,
+ use_ovs=False):
+ mod.Testbed.__init__(self, exp_name, "", "", "")
+
+ img = base_image.rsplit(":", 1)
+
+ self.base_image_repo = img[0]
+ self.base_image_tag = "latest" if len(img) == 1 else img[1]
+ self.base_image = "%s:%s" % (self.base_image_repo, self.base_image_tag)
+ self.pull_image = pull_image
+ self.use_ovs = use_ovs
+
+ self.running_containers = {}
+ self.active_bridges = set()
+ self.active_ipcps = set()
+
+ self.docker_client = docker.from_env()
+ self.executor = DockerExecutor(self)
+
+ def swap_in(self, experiment):
+ docker_client = self.docker_client
+
+ mod.Testbed.swap_in(self, experiment)
+
+ # Pull image
+ if self.pull_image:
+ docker_client.images.pull(self.base_image_repo,
+ self.base_image_tag)
+
+ docker_client.images.get("%s:%s" % (self.base_image_repo,
+ self.base_image_tag))
+
+ # Start all nodes
+ for node in experiment.nodes:
+ self.running_containers[node.name] = docker_client.containers.run(
+ self.base_image, command=node.startup_command, name=node.name,
+ detach=True, network="none", privileged=True,
+ devices=["/dev/fuse"])
+
+ logger.info("Nodes starting")
+
+ if not os.path.exists("/var/run/netns"):
+ subprocess.check_call('sudo mkdir /var/run/netns'.split())
+
+ for shim in experiment.dif_ordering:
+ if not isinstance(shim, mod.ShimEthDIF):
+ # Nothing to do here
+ continue
+
+ cmd = ""
+ if self.use_ovs:
+ cmd += 'sudo ovs-vsctl add-br %(shim)s'
+ else:
+ cmd += 'sudo ip link add %(shim)s type bridge'
+ cmd = cmd % {'shim': shim.name}
+
+ self.active_bridges.add(shim.name)
+
+ subprocess.check_call(cmd.split())
+
+ if not self.use_ovs:
+ cmd = 'sudo ip link set dev %(shim)s up' % {'shim': shim.name}
+ subprocess.check_call(cmd.split())
+ for node in experiment.nodes:
+ container = self.running_containers[node.name]
+
+ container.reload()
+
+ state = container.attrs["State"]
+
+ while not state["Running"]:
+ sleep(0.2)
+ container.reload()
+ state = container.attrs["State"]
+
+ pid = state["Pid"]
+
+ subprocess.check_call(('sudo ln -s /proc/%(pid)i/ns/net '
+ '/var/run/netns/%(pid)i' % {'pid': pid}).split())
+
+ for ipcp in node.ipcps:
+ if isinstance(ipcp, mod.ShimEthIPCP):
+ if ipcp.ifname is None:
+ ipcp.ifname = "eth%i" % node.ipcps.index(ipcp)
+
+ cmd = ('sudo ip link add %(node)s.%(ifname)s type veth '
+ 'peer name _%(node)s.%(ifname)s'\
+ % {'node': node.name, 'ifname': ipcp.ifname})
+ subprocess.check_call(cmd.split())
+
+ cmd = ""
+ if self.use_ovs:
+ cmd += ('sudo ovs-vsctl add-port %(dif)s %(node)s.%('
+ 'ifname)s')
+ else:
+ cmd += ('sudo ip link set %(node)s.%(ifname)s master '
+ '%(dif)s')
+
+ cmd = (cmd % {'node': node.name,
+ 'ifname': ipcp.ifname,
+ 'dif': ipcp.dif.name})
+
+ subprocess.check_call(cmd.split())
+
+ cmd = ('sudo ip link set _%(node)s.%(ifname)s '
+ 'netns %(pid)i '
+ 'name %(ifname)s'
+ % {'node': node.name,
+ 'pid': pid,
+ 'ifname': ipcp.ifname})
+ subprocess.check_call(cmd.split())
+
+ cmd = ('sudo ip link set dev %(node)s.%(ifname)s up'
+ % {'node': node.name, 'ifname': ipcp.ifname})
+ subprocess.check_call(cmd.split())
+
+ cmd = ('sudo ip netns exec %(pid)i ip link set dev '
+ '%(ifname)s up'
+ % {'pid': pid, 'ifname': ipcp.ifname})
+ subprocess.check_call(cmd.split())
+
+ self.active_ipcps.add(ipcp)
+
+ logger.info("Experiment swapped in")
+
+ def swap_out(self, experiment):
+ for name, container in self.running_containers.items():
+ container.remove(force=True)
+
+ for shim in experiment.dif_ordering:
+ if isinstance(shim, mod.ShimEthDIF) and shim.name in self.active_bridges:
+ cmd = ""
+ if self.use_ovs:
+ cmd += 'sudo ovs-vsctl del-br %(shim)s'
+ else:
+ cmd += 'sudo ip link del %(shim)s'
+ cmd = cmd % {'shim': shim.name}
+
+ subprocess.check_call(cmd.split())
+
+ self.active_bridges.remove(shim.name)
+
+ for name, container in self.running_containers.items():
+ pid = container.attrs["State"]["Pid"]
+
+ cmd = 'sudo rm /var/run/netns/%(pid)i' % {'pid': pid}
+ subprocess.check_call(cmd.split())
+
+ self.running_containers = {}
+
+ logger.info("Experiment swapped out") \ No newline at end of file
diff --git a/rumba/testbeds/emulab.py b/rumba/testbeds/emulab.py
index 4f8b023..06e0e5c 100644
--- a/rumba/testbeds/emulab.py
+++ b/rumba/testbeds/emulab.py
@@ -34,6 +34,8 @@ import rumba.ssh_support as ssh
import rumba.model as mod
import rumba.log as log
+from rumba.executors.ssh import SSHExecutor
+
logger = log.get_logger(__name__)
@@ -56,6 +58,8 @@ class Testbed(mod.Testbed):
self.ip = dict()
self.ops_ssh_config = mod.SSHConfig(self.ops_server())
+ self.executor = SSHExecutor
+
if "wall" in url:
self.http_proxy="https://proxy.atlantis.ugent.be:8080"
@@ -237,7 +241,7 @@ class Testbed(mod.Testbed):
node.ssh_config.set_password(self.password)
cmd = 'cat /var/emulab/boot/topomap'
- topomap = ssh.execute_command(self, experiment.nodes[0].ssh_config, cmd)
+ topomap = node.execute_command(cmd)
# Almost as ugly as yo momma
index = topomap.rfind("# lans")
topo_array = topomap[:index].split('\n')[1:-1]
@@ -258,7 +262,7 @@ class Testbed(mod.Testbed):
for node in experiment.nodes:
cmd = 'cat /var/emulab/boot/ifmap'
- output = ssh.execute_command(self, node.ssh_config, cmd)
+ output = node.execute_command(cmd)
output = re.split('\n', output)
for item in output:
item = item.split()
@@ -268,6 +272,8 @@ class Testbed(mod.Testbed):
ipcp.ifname = item[0]
def swap_in(self, experiment):
+ mod.Testbed.swap_in(self, experiment)
+
self._create_experiment(experiment)
wait = self.swap_exp_in()
if wait:
diff --git a/rumba/testbeds/jfed.py b/rumba/testbeds/jfed.py
index 3cfc82b..db9dd14 100644
--- a/rumba/testbeds/jfed.py
+++ b/rumba/testbeds/jfed.py
@@ -32,6 +32,8 @@ import time
import tarfile
import sys
+from rumba.executors.ssh import SSHExecutor
+
if sys.version_info[0] >= 3:
from urllib.request import urlretrieve
else:
@@ -65,6 +67,7 @@ class Testbed(mod.Testbed):
self.manifest = os.path.join(mod.tmp_dir, self.exp_name + ".rrspec")
self.jfed_jar = os.path.join(mod.cache_dir,
'jfed_cli/experimenter-cli.jar')
+ self.executor = SSHExecutor(self)
if "exogeni" in authority:
self.authority = "urn:publicid:IDN+" + authority + "+authority+am"
@@ -204,6 +207,8 @@ class Testbed(mod.Testbed):
raise
def swap_in(self, experiment):
+ mod.Testbed.swap_in(self, experiment)
+
for node in experiment.nodes:
node.ssh_config.set_http_proxy(self.http_proxy)
self.create_rspec(experiment)
diff --git a/rumba/testbeds/faketestbed.py b/rumba/testbeds/local.py
index b021aca..77aed82 100644
--- a/rumba/testbeds/faketestbed.py
+++ b/rumba/testbeds/local.py
@@ -27,17 +27,22 @@
import rumba.model as mod
import rumba.log as log
+from rumba.executors.local import LocalExecutor
logger = log.get_logger(__name__)
-# Fake testbed, useful for testing
+# Local testbed, useful for testing
class Testbed(mod.Testbed):
def __init__(self, exp_name, username, proj_name="ARCFIRE", password=""):
mod.Testbed.__init__(self, exp_name, username, password, proj_name)
+ self.executor = LocalExecutor(self)
+
def swap_in(self, experiment):
+ mod.Testbed.swap_in(self, experiment)
+
logger.info("Experiment swapped in")
def swap_out(self, experiment):
diff --git a/rumba/testbeds/qemu.py b/rumba/testbeds/qemu.py
index 4321fc8..f3daefd 100644
--- a/rumba/testbeds/qemu.py
+++ b/rumba/testbeds/qemu.py
@@ -32,9 +32,10 @@ from subprocess import CalledProcessError
import rumba.model as mod
import rumba.log as log
-import rumba.ssh_support as ssh_support
import rumba.multiprocess as m_processing
+from rumba.executors.ssh import SSHExecutor
+
if sys.version_info[0] >= 3:
from urllib.request import urlretrieve
else:
@@ -59,6 +60,8 @@ class Testbed(mod.Testbed):
self.initramfs_path = initramfs_path
self.multiproc_manager = None
+ self.executor = SSHExecutor(self);
+
# Prepend sudo to all commands if the user is not 'root'
def may_sudo(self, cmds):
if os.geteuid() != 0:
@@ -116,13 +119,9 @@ class Testbed(mod.Testbed):
mac = '00:0a:0a:0a:%02x:%02x' % (vm_id, port_id)
logger.info('Recovering ifname for port: %s.',
port['tap_id'])
- output = ssh_support.execute_command(
- self,
- node.ssh_config,
- 'mac2ifname ' + mac)
+ output = node.execute_command('mac2ifname ' + mac)
ipcp.ifname = output
- ssh_support.execute_command(
- self, node.ssh_config,
+ node.execute_command(
"ip link set dev %(ifname)s up"
% {'ifname': ipcp.ifname})
@@ -131,6 +130,9 @@ class Testbed(mod.Testbed):
:type experiment mod.Experiment
:param experiment: The experiment running
"""
+
+ mod.Testbed.swap_in(self, experiment)
+
if os.geteuid() != 0:
try:
subprocess.check_call(["sudo", "-v"])
diff --git a/setup.py b/setup.py
index 99ff69c..79b5633 100755
--- a/setup.py
+++ b/setup.py
@@ -12,9 +12,10 @@ setuptools.setup(
author_email='sander.vrijders@ugent.be',
license='LGPL',
description='Rumba measurement framework for RINA',
- packages=['rumba', 'rumba.testbeds', 'rumba.prototypes'],
+ packages=['rumba', 'rumba.testbeds', 'rumba.prototypes', 'rumba.executors'],
install_requires=[
'paramiko',
+ 'docker',
'repoze.lru; python_version<"3.2"',
'contextlib2; python_version<"3.0"',
'enum34; python_version<"3.0"'
diff --git a/tools/democonf2rumba.py b/tools/democonf2rumba.py
index dc2f0a4..e398308 100755
--- a/tools/democonf2rumba.py
+++ b/tools/democonf2rumba.py
@@ -241,9 +241,9 @@ if __name__ == '__main__':
for a in qemu_p._actions
if a.dest != 'help'
and getattr(args, a.dest) is not None}
- elif args.testbed == 'fake':
- import rumba.testbeds.faketestbed as fake
- test_class = fake.Testbed
+ elif args.testbed == 'local':
+ import rumba.testbeds.local as local
+ test_class = local.Testbed
testbed_args = {a.dest: getattr(args, a.dest)
for a in fake_p._actions
if a.dest != 'help'