diff options
author | Nick Aerts <nick.aerts@ugent.be> | 2018-02-24 12:27:36 +0100 |
---|---|---|
committer | Nick Aerts <nick.aerts@ugent.be> | 2018-03-20 11:30:07 +0100 |
commit | a7cd88d752b72ea85ccefa5e1f3dceba13fb1fc2 (patch) | |
tree | 9c29f08693d577c81959c3194aec04e9bc1ba195 | |
parent | 24bed306b5a67fc682b04cae017b0d0f3ac55a00 (diff) | |
download | rumba-a7cd88d752b72ea85ccefa5e1f3dceba13fb1fc2.tar.gz rumba-a7cd88d752b72ea85ccefa5e1f3dceba13fb1fc2.zip |
testbeds: Add docker testbed
This adds support for a testbed based on Docker containers running on
the local host. Bridging the containers can be done using built-in
Linux bridging or using OpenVSwitch bridges.
A new resource 'executor' has been added to abstract away command
execution on nodes on the testbed. Executors have been created for
local command execution, docker exec based command execution and
SSH-based command execution. This has also been changed in the
prototypes to execute using the correct executor.
-rw-r--r-- | AUTHORS.txt | 1 | ||||
-rw-r--r-- | README.md | 14 | ||||
-rwxr-xr-x | examples/converged-operator-network.py | 2 | ||||
-rwxr-xr-x | examples/docker-ouroboros.py | 65 | ||||
-rwxr-xr-x | examples/example.py | 2 | ||||
-rwxr-xr-x | examples/mouse.py | 2 | ||||
-rwxr-xr-x | examples/snake.py | 2 | ||||
-rwxr-xr-x | examples/two-layers.py | 6 | ||||
-rwxr-xr-x | examples/vpn.py | 2 | ||||
-rw-r--r-- | rumba/executors/__init__.py | 26 | ||||
-rw-r--r-- | rumba/executors/docker.py | 80 | ||||
-rw-r--r-- | rumba/executors/local.py | 55 | ||||
-rw-r--r-- | rumba/executors/ssh.py | 64 | ||||
-rw-r--r-- | rumba/model.py | 130 | ||||
-rw-r--r-- | rumba/prototypes/ouroboros.py | 49 | ||||
-rw-r--r-- | rumba/ssh_support.py | 4 | ||||
-rw-r--r-- | rumba/testbeds/dockertb.py | 190 | ||||
-rw-r--r-- | rumba/testbeds/emulab.py | 10 | ||||
-rw-r--r-- | rumba/testbeds/jfed.py | 5 | ||||
-rw-r--r-- | rumba/testbeds/local.py (renamed from rumba/testbeds/faketestbed.py) | 7 | ||||
-rw-r--r-- | rumba/testbeds/qemu.py | 16 | ||||
-rwxr-xr-x | setup.py | 3 | ||||
-rwxr-xr-x | tools/democonf2rumba.py | 6 |
23 files changed, 612 insertions, 129 deletions
diff --git a/AUTHORS.txt b/AUTHORS.txt index 55a1c91..0e929fb 100644 --- a/AUTHORS.txt +++ b/AUTHORS.txt @@ -2,3 +2,4 @@ Sander Vrijders <sander.vrijders@intec.ugent.be> Dimitri Staessens <dimitri.staessens@ugent.be> Vincenzo Maffione <v.maffione@nextworks.it> Marco Capitani <m.capitani@nextworks.it> +Nick Aerts <nick.aerts@ugent.be>
\ No newline at end of file @@ -160,6 +160,20 @@ Example scripts can be found in the examples/ folder. for the previous commands, without changing the user (e.g. using su or sudo). + * [Docker](https://www.docker.com/) is a container runtime environment. + + To use the Docker testbed the Docker software needs to be installed, see + [Install Docker](https://docs.docker.com/install/) and complete + [Post-installation steps for Linux](https://docs.docker + .com/install/linux/linux-postinstall/) + + To use a Docker testbed you can for example use: + + tb = docker.Testbed(exp_name = "ouroboros", + base_image = "ouroborosrumba/prototype") + + This will pull the latest ouroboros image from Docker Hub. + ## Accessing nodes after swap-in To access a node once the experiment swapped in, use the following diff --git a/examples/converged-operator-network.py b/examples/converged-operator-network.py index 3b80746..8ed79ed 100755 --- a/examples/converged-operator-network.py +++ b/examples/converged-operator-network.py @@ -10,7 +10,7 @@ from rumba.utils import * # import testbed plugins import rumba.testbeds.emulab as emulab import rumba.testbeds.jfed as jfed -import rumba.testbeds.faketestbed as fake +import rumba.testbeds.local as local import rumba.testbeds.qemu as qemu # import prototype plugins diff --git a/examples/docker-ouroboros.py b/examples/docker-ouroboros.py new file mode 100755 index 0000000..1c6009c --- /dev/null +++ b/examples/docker-ouroboros.py @@ -0,0 +1,65 @@ +#!/usr/bin/env python + +# An example script using the rumba package + +from rumba.model import * +from rumba.utils import ExperimentManager + +# import testbed plugins +import rumba.testbeds.dockertb as docker + +# import prototype plugins +import rumba.prototypes.ouroboros as our + +import argparse +import sys + + +description = "Script to create an ouroboros" + +argparser = argparse.ArgumentParser(description = description) +argparser.add_argument('-n', '--nodes', type = int, default = '10', + help = "Total number of nodes") + +args = argparser.parse_args() + +log.set_logging_level('DEBUG') + +n01 = NormalDIF("n01") + +if (args.nodes < 3): + print("The ouroboros must be longer than 2 nodes") + sys.exit(-1) + +nodes = [] + +shim_prev = None +for i in range(0, args.nodes): + shim = ShimEthDIF("e" + str(i)) + + if shim_prev == None and shim != None: + node = Node("node" + str(i), difs = [n01, shim], + dif_registrations = {n01 : [shim]}) + elif shim_prev != None and shim != None: + node = Node("node" + str(i), difs = [n01, shim, shim_prev], + dif_registrations = {n01 : [shim, shim_prev]}) + else: + node = Node("node" + str(i), difs = [n01, shim_prev], + dif_registrations = {n01 : [shim_prev]}) + + shim_prev = shim + nodes.append(node) + +nodes[0].add_dif(shim_prev) +nodes[0].add_dif_registration(n01, shim_prev) + +tb = docker.Testbed(exp_name = "ouroboros", + base_image = "ouroborosrumba/prototype") + +exp = our.Experiment(tb, nodes = nodes) + +print(exp) + +with ExperimentManager(exp): + exp.swap_in() + exp.bootstrap_prototype() diff --git a/examples/example.py b/examples/example.py index eee966b..62d5a53 100755 --- a/examples/example.py +++ b/examples/example.py @@ -9,7 +9,7 @@ from rumba.storyboard import * # import testbed plugins import rumba.testbeds.emulab as emulab import rumba.testbeds.jfed as jfed -import rumba.testbeds.faketestbed as fake +import rumba.testbeds.local as local import rumba.testbeds.qemu as qemu # import prototype plugins diff --git a/examples/mouse.py b/examples/mouse.py index 026b3bc..16fdf49 100755 --- a/examples/mouse.py +++ b/examples/mouse.py @@ -8,7 +8,7 @@ from rumba.utils import ExperimentManager # import testbed plugins import rumba.testbeds.emulab as emulab import rumba.testbeds.jfed as jfed -import rumba.testbeds.faketestbed as fake +import rumba.testbeds.local as local import rumba.testbeds.qemu as qemu # import prototype plugins diff --git a/examples/snake.py b/examples/snake.py index 6f9c48d..3d76797 100755 --- a/examples/snake.py +++ b/examples/snake.py @@ -8,7 +8,7 @@ from rumba.utils import ExperimentManager # import testbed plugins import rumba.testbeds.emulab as emulab import rumba.testbeds.jfed as jfed -import rumba.testbeds.faketestbed as fake +import rumba.testbeds.local as local import rumba.testbeds.qemu as qemu # import prototype plugins diff --git a/examples/two-layers.py b/examples/two-layers.py index c375088..3f50037 100755 --- a/examples/two-layers.py +++ b/examples/two-layers.py @@ -8,7 +8,7 @@ from rumba.storyboard import * # import testbed plugins import rumba.testbeds.emulab as emulab import rumba.testbeds.jfed as jfed -import rumba.testbeds.faketestbed as fake +import rumba.testbeds.local as local import rumba.testbeds.qemu as qemu # import prototype plugins @@ -59,6 +59,6 @@ with ExperimentManager(exp): exp.swap_in() exp.bootstrap_prototype() sb = StoryBoard(experiment=exp, duration=15, servers=[]) - sb.run_command(7.5, a, 'echo "7.5 secs in. We are at $(hostname)"') - sb.run_command(12, b, 'echo "12 secs in. We are at $(hostname)"') + sb.schedule_command(7.5, a, 'echo "7.5 secs in. We are at $(hostname)"') + sb.schedule_command(12, b, 'echo "12 secs in. We are at $(hostname)"') sb.start() diff --git a/examples/vpn.py b/examples/vpn.py index 28de09e..6aa8db6 100755 --- a/examples/vpn.py +++ b/examples/vpn.py @@ -8,7 +8,7 @@ from rumba.utils import ExperimentManager # import testbed plugins import rumba.testbeds.emulab as emulab import rumba.testbeds.jfed as jfed -import rumba.testbeds.faketestbed as fake +import rumba.testbeds.local as local import rumba.testbeds.qemu as qemu # import prototype plugins diff --git a/rumba/executors/__init__.py b/rumba/executors/__init__.py new file mode 100644 index 0000000..58fcfb5 --- /dev/null +++ b/rumba/executors/__init__.py @@ -0,0 +1,26 @@ +# +# A library to manage ARCFIRE experiments +# +# Copyright (C) 2017 Nextworks S.r.l. +# Copyright (C) 2017 imec +# +# Sander Vrijders <sander.vrijders@ugent.be> +# Dimitri Staessens <dimitri.staessens@ugent.be> +# Vincenzo Maffione <v.maffione@nextworks.it> +# Marco Capitani <m.capitani@nextworks.it> +# Nick Aerts <nick.aerts@ugent.be> +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., http://www.fsf.org/about/contact/. +# diff --git a/rumba/executors/docker.py b/rumba/executors/docker.py new file mode 100644 index 0000000..e1e1480 --- /dev/null +++ b/rumba/executors/docker.py @@ -0,0 +1,80 @@ +# +# A library to manage ARCFIRE experiments +# +# Copyright (C) 2017 Nextworks S.r.l. +# Copyright (C) 2017 imec +# +# Sander Vrijders <sander.vrijders@ugent.be> +# Dimitri Staessens <dimitri.staessens@ugent.be> +# Vincenzo Maffione <v.maffione@nextworks.it> +# Marco Capitani <m.capitani@nextworks.it> +# Nick Aerts <nick.aerts@ugent.be> +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., http://www.fsf.org/about/contact/. +# + +from rumba import model as mod + +import tempfile +import tarfile + +from rumba import log + +logger = log.get_logger(__name__) + + +class DockerException(Exception): + pass + + +class DockerExecutor(mod.Executor): + def __init__(self, testbed): + self.testbed = testbed + self.running_containers = testbed.running_containers + + def execute_command(self, node, command, sudo=False, time_out=3): + logger.debug("%s >> %s" % (node.name, command)) + + c, o = self.running_containers[node.name].exec_run(["sh", "-c", + command]) + if c: + raise DockerException('A remote command returned an error. ' + 'Output:\n\n\t' + o.decode("utf-8")) + + return o.decode("utf-8") + + def fetch_file(self, node, path, destination, as_root=False): + if not path.startswith("/"): + workingdir = self.running_containers[node.name].attrs["Config"][ + "WorkingDir"] + path = os.path.join(workingdir, path) + + try: + with tempfile.NamedTemporaryFile() as tmp: + archive, _ = self.running_containers[node.name].get_archive( + path) + + for c in archive: + tmp.write(c) + + tmp.seek(0) + + tarfile.TarFile(fileobj=tmp, mode='r').extract( + path.basename(path), destination) + except: + logger.error("Error when extracting %s" % path) + + def copy_file(self, node, path, destination): + pass diff --git a/rumba/executors/local.py b/rumba/executors/local.py new file mode 100644 index 0000000..9f3f484 --- /dev/null +++ b/rumba/executors/local.py @@ -0,0 +1,55 @@ +# +# A library to manage ARCFIRE experiments +# +# Copyright (C) 2017 Nextworks S.r.l. +# Copyright (C) 2017 imec +# +# Sander Vrijders <sander.vrijders@ugent.be> +# Dimitri Staessens <dimitri.staessens@ugent.be> +# Vincenzo Maffione <v.maffione@nextworks.it> +# Marco Capitani <m.capitani@nextworks.it> +# Nick Aerts <nick.aerts@ugent.be> +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., http://www.fsf.org/about/contact/. +# + +from rumba import model as mod + +import subprocess + +from rumba import log + +logger = log.get_logger(__name__) + + +class LocalExecutor(mod.Executor): + def __init__(self, testbed): + self.testbed = testbed + + def execute_command(self, node, cmd, sudo=False, time_out=3): + try: + logger.debug("%s >> %s" % (node.name, cmd)) + subprocess.check_call(cmd.split(' ')) + except subprocess.CalledProcessError as e: + logger.error("Return code was " + str(e.returncode)) + raise + + def fetch_file(self, node, path, destination, as_root=False): + logger.error("Fetch_file not supported for local testbed") + raise + + def copy_file(self, node, path, destination): + logger.error("Copy_file not supported for local testbed") + raise diff --git a/rumba/executors/ssh.py b/rumba/executors/ssh.py new file mode 100644 index 0000000..6a3a41d --- /dev/null +++ b/rumba/executors/ssh.py @@ -0,0 +1,64 @@ +# +# A library to manage ARCFIRE experiments +# +# Copyright (C) 2017 Nextworks S.r.l. +# Copyright (C) 2017 imec +# +# Sander Vrijders <sander.vrijders@ugent.be> +# Dimitri Staessens <dimitri.staessens@ugent.be> +# Vincenzo Maffione <v.maffione@nextworks.it> +# Marco Capitani <m.capitani@nextworks.it> +# Nick Aerts <nick.aerts@ugent.be> +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., http://www.fsf.org/about/contact/. +# + +from rumba.model import Executor +from rumba.ssh_support import execute_command, execute_commands, \ + copy_file_to_testbed, copy_file_from_testbed, execute_proxy_command, \ + execute_proxy_commands + + +class SSHExecutor(Executor): + def __init__(self, testbed, use_proxy=False): + self.testbed = testbed + self.use_proxy = use_proxy + + def execute_command(self, node, command, as_root=False, time_out=3): + if as_root: + if node.ssh_config.username != 'root': + command = "sudo %s" % command + + if self.use_proxy: + return execute_proxy_command(self, node.ssh_config, command, + time_out) + else: + return execute_command(self.testbed, node.ssh_config, command, + time_out) + + def execute_commands(self, node, commands, as_root=False, time_out=3): + if self.use_proxy: + return execute_proxy_commands(self, node.ssh_config, commands, + time_out) + else: + return execute_commands(self.testbed, node.ssh_config, commands, + time_out) + + def fetch_file(self, node, path, destination, sudo): + copy_file_from_testbed(self.testbed, node.ssh_config, path, + destination, sudo) + + def copy_file(self, node, path, destination): + copy_file_to_testbed(self.testbed, node.ssh_config, path, destination)
\ No newline at end of file diff --git a/rumba/model.py b/rumba/model.py index 39528be..20f8215 100644 --- a/rumba/model.py +++ b/rumba/model.py @@ -31,7 +31,6 @@ import time import shutil import rumba.log as log -from rumba import ssh_support logger = log.get_logger(__name__) @@ -88,7 +87,8 @@ class Testbed(object): @abc.abstractmethod def swap_in(self, experiment): - raise Exception('swap_in() not implemented') + for node in experiment.nodes: + node.executor = self.executor @abc.abstractmethod def swap_out(self, experiment): @@ -245,6 +245,9 @@ class Node(object): if hasattr(dif, 'policy'): # check if the dif supports policies self.policies[dif] = policies.get(dif, Policy(dif, self)) + self.executor = None # will be set by testbed on swap_in + self.startup_command = None # will be set by prototype + self._validate() def get_ipcp_by_dif(self, dif): @@ -328,88 +331,34 @@ class Node(object): def get_policy(self, dif): return self.policies[dif] - def execute_commands(self, commands, time_out=3, use_proxy=False): - # Ssh_config is used twice since it doubles as testbed info - # (it holds fields username and password) - if use_proxy: - return ssh_support.execute_proxy_commands( - self.ssh_config, - self.ssh_config, - commands, - time_out - ) - # else: - return ssh_support.execute_commands( - self.ssh_config, - self.ssh_config, - commands, - time_out - ) - - def execute_command(self, command, time_out=3, - use_proxy=False, as_root=False): - # Ssh_config is used twice since it doubles as testbed info - # (it holds fields username and password) - if as_root: - if self.ssh_config.username != 'root': - command = "sudo %s" % command - - if use_proxy: - return ssh_support.execute_proxy_command( - self.ssh_config, - self.ssh_config, - command, - time_out - ) - # else: - return ssh_support.execute_command( - self.ssh_config, - self.ssh_config, - command, - time_out - ) + def execute_commands(self, commands, as_root=False, time_out=3, + use_proxy=False): + return self.executor.execute_commands(self, commands, as_root, time_out) + + def execute_command(self, command, as_root=False, time_out=3, + use_proxy=False): + return self.executor.execute_command(self, command, as_root, time_out) def write_text_to_file(self, text, file_name): - ssh_support.write_text_to_file( - self.ssh_config, - self.ssh_config, - text, - file_name - ) + # ssh_support.write_text_to_file( + # self.ssh_config, + # self.ssh_config, + # text, + # file_name + # ) + return def copy_file(self, path, destination): - ssh_support.copy_file_to_testbed( - self.ssh_config, - self.ssh_config, - path, - destination - ) + self.executor.copy_file(self, path, destination) def copy_files(self, paths, destination): - ssh_support.copy_files_to_testbed( - self.ssh_config, - self.ssh_config, - paths, - destination - ) + self.executor.copy_files(self, paths, destination) def fetch_files(self, paths, destination, sudo=False): - ssh_support.copy_files_from_testbed( - self.ssh_config, - self.ssh_config, - paths, - destination, - sudo=sudo - ) + self.executor.fetch_files(self, paths, destination, sudo) def fetch_file(self, path, destination, sudo=False): - ssh_support.copy_file_from_testbed( - self.ssh_config, - self.ssh_config, - path, - destination, - sudo=sudo - ) + self.executor.fetch_files(self, path, destination, sudo) def set_link_state(self, ipcp, state): self.execute_command('ip link set dev ' + ipcp.ifname + ' ' + state, @@ -869,6 +818,10 @@ class Experiment(object): end = time.time() logger.info("Install took %.2f seconds", end - start) + def set_startup_command(self, command): + for node in self.nodes: + node.startup_command = command + def bootstrap_prototype(self): start = time.time() self._bootstrap_prototype() @@ -912,3 +865,32 @@ class Experiment(object): self.testbed.swap_out(self) end = time.time() logger.info("Swap-out took %.2f seconds", end - start) + + +class Executor: + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def execute_command(self, node, command, as_root=False, time_out=3): + # Execute command on a node + return + + def execute_commands(self, node, commands, as_root=False, time_out=3): + for command in commands: + self.execute_command(node, command, as_root, time_out) + + @abc.abstractmethod + def copy_file(self, node, path, destination): + return + + def copy_files(self, node, paths, destination): + for path in paths: + self.copy_file(node, path, destination) + + @abc.abstractmethod + def fetch_file(self, node, path, destination, sudo=False): + return + + def fetch_files(self, node, paths, destination, sudo=False): + for path in paths: + self.fetch_file(node, path, destination, sudo)
\ No newline at end of file diff --git a/rumba/prototypes/ouroboros.py b/rumba/prototypes/ouroboros.py index 33ecd0b..97f6e1a 100644 --- a/rumba/prototypes/ouroboros.py +++ b/rumba/prototypes/ouroboros.py @@ -31,7 +31,8 @@ import rumba.ssh_support as ssh import rumba.model as mod import rumba.multiprocess as m_processing import rumba.log as log -import rumba.testbeds.faketestbed as fake +import rumba.testbeds.local as local +import rumba.testbeds.dockertb as docker logger = log.get_logger(__name__) @@ -45,6 +46,8 @@ class Experiment(mod.Experiment): mod.Experiment.__init__(self, testbed, nodes, git_repo, git_branch) self.r_ipcps = dict() + self.set_startup_command("irmd") + @staticmethod def make_executor(node, packages, testbed): def executor(commands): @@ -68,19 +71,20 @@ class Experiment(mod.Experiment): self.exec_local_cmd(cmd) def setup_ouroboros(self): - if isinstance(self.testbed, fake.Testbed): + if isinstance(self.testbed, docker.Testbed): + return + + if isinstance(self.testbed, local.Testbed): subprocess.check_call('sudo -v'.split()) self.irmd = subprocess.Popen(["sudo", "irmd"]) logger.info("Started IRMd, sleeping 2 seconds...") time.sleep(2) else: for node in self.nodes: - ssh.execute_command(self.testbed, node.ssh_config, - "sudo nohup irmd > /dev/null &", - time_out=None) + node.execute_command("sudo nohup irmd > /dev/null &", time_out=None) def install_ouroboros(self): - if isinstance(self.testbed, fake.Testbed): + if isinstance(self.testbed, local.Testbed): return packages = ["cmake", "protobuf-c-compiler", "git", "libfuse-dev", @@ -120,7 +124,7 @@ class Experiment(mod.Experiment): cmd = "irm i c n " + ipcp.name if isinstance(ipcp.dif, mod.ShimEthDIF): - if isinstance(self.testbed, fake.Testbed): + if isinstance(self.testbed, local.Testbed): cmd += " type local layer " + ipcp.dif.name else: cmd += " type eth-llc dev " + ipcp.ifname @@ -154,11 +158,7 @@ class Experiment(mod.Experiment): # Postpone registrations self.r_ipcps[ipcp] = cmds2 - if isinstance(self.testbed, fake.Testbed): - self.exec_local_cmds(cmds) - else: - ssh.execute_commands(self.testbed, node.ssh_config, cmds, - time_out=None) + node.execute_commands(cmds, time_out=None) def enroll_dif(self, el): for e in el: @@ -167,13 +167,8 @@ class Experiment(mod.Experiment): # Execute postponed registration if e['enroller'] in self.r_ipcps: - if isinstance(self.testbed, fake.Testbed): - self.exec_local_cmds(self.r_ipcps[e['enroller']]) - else: - ssh.execute_commands(self.testbed, - e['enroller'].node.ssh_config, - self.r_ipcps[e['enroller']], - time_out=None) + e['enroller'].node.execute_commands(self.r_ipcps[e['enroller']], + time_out=None) self.r_ipcps.pop(e['enroller'], None) cmd = "irm r n " + ipcp.name @@ -188,12 +183,7 @@ class Experiment(mod.Experiment): cmd += " layer " + dif_b.name cmds.append(cmd) - if isinstance(self.testbed, fake.Testbed): - self.exec_local_cmds(cmds) - else: - ssh.execute_commands(self.testbed, - e['enrollee'].node.ssh_config, - cmds, time_out=None) + e['enrollee'].node.execute_commands(cmds, time_out=None) def setup_flows(self, el, comp): for e in el: @@ -201,12 +191,7 @@ class Experiment(mod.Experiment): cmd = "irm i conn n " + ipcp.name + " comp " + \ comp + " dst " + e['dst'].name - if isinstance(self.testbed, fake.Testbed): - self.exec_local_cmd(cmd) - else: - ssh.execute_command(self.testbed, - ipcp.node.ssh_config, - cmd, time_out=None) + ipcp.node.execute_command(cmd, time_out=None) def _install_prototype(self): logger.info("Installing Ouroboros...") @@ -230,6 +215,6 @@ class Experiment(mod.Experiment): logger.info("All done, have fun!") def _terminate_prototype(self): - if isinstance(self.testbed, fake.Testbed): + if isinstance(self.testbed, local.Testbed): logger.info("Killing IRMd...") subprocess.check_call('sudo killall -15 irmd'.split()) diff --git a/rumba/ssh_support.py b/rumba/ssh_support.py index 88334ea..b1492e7 100644 --- a/rumba/ssh_support.py +++ b/rumba/ssh_support.py @@ -32,6 +32,8 @@ import time import rumba.log as log # Fix input reordering +from rumba.model import Executor + try: import builtins # Only in Python 3 @@ -434,4 +436,4 @@ def aptitude_install(testbed, node, packages): "while ! " + sudo("apt-get update") + "; do sleep 1; done", "while ! " + sudo(package_install) + "; do sleep 1; done"] - execute_proxy_commands(testbed, node.ssh_config, cmds, time_out=None) + execute_proxy_commands(testbed, node.ssh_config, cmds, time_out=None)
\ No newline at end of file diff --git a/rumba/testbeds/dockertb.py b/rumba/testbeds/dockertb.py new file mode 100644 index 0000000..3895a7e --- /dev/null +++ b/rumba/testbeds/dockertb.py @@ -0,0 +1,190 @@ +# +# A library to manage ARCFIRE experiments +# +# Copyright (C) 2017 Nextworks S.r.l. +# Copyright (C) 2017 imec +# +# Sander Vrijders <sander.vrijders@ugent.be> +# Dimitri Staessens <dimitri.staessens@ugent.be> +# Vincenzo Maffione <v.maffione@nextworks.it> +# Marco Capitani <m.capitani@nextworks.it> +# Nick Aerts <nick.aerts@ugent.be> +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., http://www.fsf.org/about/contact/. +# +import os +from time import sleep +import docker +import subprocess + +import rumba.model as mod +import rumba.log as log +from rumba.executors.docker import DockerExecutor + + +logger = log.get_logger(__name__) + +class Testbed(mod.Testbed): + def __init__(self, exp_name, base_image, pull_image=True, + use_ovs=False): + mod.Testbed.__init__(self, exp_name, "", "", "") + + img = base_image.rsplit(":", 1) + + self.base_image_repo = img[0] + self.base_image_tag = "latest" if len(img) == 1 else img[1] + self.base_image = "%s:%s" % (self.base_image_repo, self.base_image_tag) + self.pull_image = pull_image + self.use_ovs = use_ovs + + self.running_containers = {} + self.active_bridges = set() + self.active_ipcps = set() + + self.docker_client = docker.from_env() + self.executor = DockerExecutor(self) + + def swap_in(self, experiment): + docker_client = self.docker_client + + mod.Testbed.swap_in(self, experiment) + + # Pull image + if self.pull_image: + docker_client.images.pull(self.base_image_repo, + self.base_image_tag) + + docker_client.images.get("%s:%s" % (self.base_image_repo, + self.base_image_tag)) + + # Start all nodes + for node in experiment.nodes: + self.running_containers[node.name] = docker_client.containers.run( + self.base_image, command=node.startup_command, name=node.name, + detach=True, network="none", privileged=True, + devices=["/dev/fuse"]) + + logger.info("Nodes starting") + + if not os.path.exists("/var/run/netns"): + subprocess.check_call('sudo mkdir /var/run/netns'.split()) + + for shim in experiment.dif_ordering: + if not isinstance(shim, mod.ShimEthDIF): + # Nothing to do here + continue + + cmd = "" + if self.use_ovs: + cmd += 'sudo ovs-vsctl add-br %(shim)s' + else: + cmd += 'sudo ip link add %(shim)s type bridge' + cmd = cmd % {'shim': shim.name} + + self.active_bridges.add(shim.name) + + subprocess.check_call(cmd.split()) + + if not self.use_ovs: + cmd = 'sudo ip link set dev %(shim)s up' % {'shim': shim.name} + subprocess.check_call(cmd.split()) + for node in experiment.nodes: + container = self.running_containers[node.name] + + container.reload() + + state = container.attrs["State"] + + while not state["Running"]: + sleep(0.2) + container.reload() + state = container.attrs["State"] + + pid = state["Pid"] + + subprocess.check_call(('sudo ln -s /proc/%(pid)i/ns/net ' + '/var/run/netns/%(pid)i' % {'pid': pid}).split()) + + for ipcp in node.ipcps: + if isinstance(ipcp, mod.ShimEthIPCP): + if ipcp.ifname is None: + ipcp.ifname = "eth%i" % node.ipcps.index(ipcp) + + cmd = ('sudo ip link add %(node)s.%(ifname)s type veth ' + 'peer name _%(node)s.%(ifname)s'\ + % {'node': node.name, 'ifname': ipcp.ifname}) + subprocess.check_call(cmd.split()) + + cmd = "" + if self.use_ovs: + cmd += ('sudo ovs-vsctl add-port %(dif)s %(node)s.%(' + 'ifname)s') + else: + cmd += ('sudo ip link set %(node)s.%(ifname)s master ' + '%(dif)s') + + cmd = (cmd % {'node': node.name, + 'ifname': ipcp.ifname, + 'dif': ipcp.dif.name}) + + subprocess.check_call(cmd.split()) + + cmd = ('sudo ip link set _%(node)s.%(ifname)s ' + 'netns %(pid)i ' + 'name %(ifname)s' + % {'node': node.name, + 'pid': pid, + 'ifname': ipcp.ifname}) + subprocess.check_call(cmd.split()) + + cmd = ('sudo ip link set dev %(node)s.%(ifname)s up' + % {'node': node.name, 'ifname': ipcp.ifname}) + subprocess.check_call(cmd.split()) + + cmd = ('sudo ip netns exec %(pid)i ip link set dev ' + '%(ifname)s up' + % {'pid': pid, 'ifname': ipcp.ifname}) + subprocess.check_call(cmd.split()) + + self.active_ipcps.add(ipcp) + + logger.info("Experiment swapped in") + + def swap_out(self, experiment): + for name, container in self.running_containers.items(): + container.remove(force=True) + + for shim in experiment.dif_ordering: + if isinstance(shim, mod.ShimEthDIF) and shim.name in self.active_bridges: + cmd = "" + if self.use_ovs: + cmd += 'sudo ovs-vsctl del-br %(shim)s' + else: + cmd += 'sudo ip link del %(shim)s' + cmd = cmd % {'shim': shim.name} + + subprocess.check_call(cmd.split()) + + self.active_bridges.remove(shim.name) + + for name, container in self.running_containers.items(): + pid = container.attrs["State"]["Pid"] + + cmd = 'sudo rm /var/run/netns/%(pid)i' % {'pid': pid} + subprocess.check_call(cmd.split()) + + self.running_containers = {} + + logger.info("Experiment swapped out")
\ No newline at end of file diff --git a/rumba/testbeds/emulab.py b/rumba/testbeds/emulab.py index 4f8b023..06e0e5c 100644 --- a/rumba/testbeds/emulab.py +++ b/rumba/testbeds/emulab.py @@ -34,6 +34,8 @@ import rumba.ssh_support as ssh import rumba.model as mod import rumba.log as log +from rumba.executors.ssh import SSHExecutor + logger = log.get_logger(__name__) @@ -56,6 +58,8 @@ class Testbed(mod.Testbed): self.ip = dict() self.ops_ssh_config = mod.SSHConfig(self.ops_server()) + self.executor = SSHExecutor + if "wall" in url: self.http_proxy="https://proxy.atlantis.ugent.be:8080" @@ -237,7 +241,7 @@ class Testbed(mod.Testbed): node.ssh_config.set_password(self.password) cmd = 'cat /var/emulab/boot/topomap' - topomap = ssh.execute_command(self, experiment.nodes[0].ssh_config, cmd) + topomap = node.execute_command(cmd) # Almost as ugly as yo momma index = topomap.rfind("# lans") topo_array = topomap[:index].split('\n')[1:-1] @@ -258,7 +262,7 @@ class Testbed(mod.Testbed): for node in experiment.nodes: cmd = 'cat /var/emulab/boot/ifmap' - output = ssh.execute_command(self, node.ssh_config, cmd) + output = node.execute_command(cmd) output = re.split('\n', output) for item in output: item = item.split() @@ -268,6 +272,8 @@ class Testbed(mod.Testbed): ipcp.ifname = item[0] def swap_in(self, experiment): + mod.Testbed.swap_in(self, experiment) + self._create_experiment(experiment) wait = self.swap_exp_in() if wait: diff --git a/rumba/testbeds/jfed.py b/rumba/testbeds/jfed.py index 3cfc82b..db9dd14 100644 --- a/rumba/testbeds/jfed.py +++ b/rumba/testbeds/jfed.py @@ -32,6 +32,8 @@ import time import tarfile import sys +from rumba.executors.ssh import SSHExecutor + if sys.version_info[0] >= 3: from urllib.request import urlretrieve else: @@ -65,6 +67,7 @@ class Testbed(mod.Testbed): self.manifest = os.path.join(mod.tmp_dir, self.exp_name + ".rrspec") self.jfed_jar = os.path.join(mod.cache_dir, 'jfed_cli/experimenter-cli.jar') + self.executor = SSHExecutor(self) if "exogeni" in authority: self.authority = "urn:publicid:IDN+" + authority + "+authority+am" @@ -204,6 +207,8 @@ class Testbed(mod.Testbed): raise def swap_in(self, experiment): + mod.Testbed.swap_in(self, experiment) + for node in experiment.nodes: node.ssh_config.set_http_proxy(self.http_proxy) self.create_rspec(experiment) diff --git a/rumba/testbeds/faketestbed.py b/rumba/testbeds/local.py index b021aca..77aed82 100644 --- a/rumba/testbeds/faketestbed.py +++ b/rumba/testbeds/local.py @@ -27,17 +27,22 @@ import rumba.model as mod import rumba.log as log +from rumba.executors.local import LocalExecutor logger = log.get_logger(__name__) -# Fake testbed, useful for testing +# Local testbed, useful for testing class Testbed(mod.Testbed): def __init__(self, exp_name, username, proj_name="ARCFIRE", password=""): mod.Testbed.__init__(self, exp_name, username, password, proj_name) + self.executor = LocalExecutor(self) + def swap_in(self, experiment): + mod.Testbed.swap_in(self, experiment) + logger.info("Experiment swapped in") def swap_out(self, experiment): diff --git a/rumba/testbeds/qemu.py b/rumba/testbeds/qemu.py index 4321fc8..f3daefd 100644 --- a/rumba/testbeds/qemu.py +++ b/rumba/testbeds/qemu.py @@ -32,9 +32,10 @@ from subprocess import CalledProcessError import rumba.model as mod import rumba.log as log -import rumba.ssh_support as ssh_support import rumba.multiprocess as m_processing +from rumba.executors.ssh import SSHExecutor + if sys.version_info[0] >= 3: from urllib.request import urlretrieve else: @@ -59,6 +60,8 @@ class Testbed(mod.Testbed): self.initramfs_path = initramfs_path self.multiproc_manager = None + self.executor = SSHExecutor(self); + # Prepend sudo to all commands if the user is not 'root' def may_sudo(self, cmds): if os.geteuid() != 0: @@ -116,13 +119,9 @@ class Testbed(mod.Testbed): mac = '00:0a:0a:0a:%02x:%02x' % (vm_id, port_id) logger.info('Recovering ifname for port: %s.', port['tap_id']) - output = ssh_support.execute_command( - self, - node.ssh_config, - 'mac2ifname ' + mac) + output = node.execute_command('mac2ifname ' + mac) ipcp.ifname = output - ssh_support.execute_command( - self, node.ssh_config, + node.execute_command( "ip link set dev %(ifname)s up" % {'ifname': ipcp.ifname}) @@ -131,6 +130,9 @@ class Testbed(mod.Testbed): :type experiment mod.Experiment :param experiment: The experiment running """ + + mod.Testbed.swap_in(self, experiment) + if os.geteuid() != 0: try: subprocess.check_call(["sudo", "-v"]) @@ -12,9 +12,10 @@ setuptools.setup( author_email='sander.vrijders@ugent.be', license='LGPL', description='Rumba measurement framework for RINA', - packages=['rumba', 'rumba.testbeds', 'rumba.prototypes'], + packages=['rumba', 'rumba.testbeds', 'rumba.prototypes', 'rumba.executors'], install_requires=[ 'paramiko', + 'docker', 'repoze.lru; python_version<"3.2"', 'contextlib2; python_version<"3.0"', 'enum34; python_version<"3.0"' diff --git a/tools/democonf2rumba.py b/tools/democonf2rumba.py index dc2f0a4..e398308 100755 --- a/tools/democonf2rumba.py +++ b/tools/democonf2rumba.py @@ -241,9 +241,9 @@ if __name__ == '__main__': for a in qemu_p._actions if a.dest != 'help' and getattr(args, a.dest) is not None} - elif args.testbed == 'fake': - import rumba.testbeds.faketestbed as fake - test_class = fake.Testbed + elif args.testbed == 'local': + import rumba.testbeds.local as local + test_class = local.Testbed testbed_args = {a.dest: getattr(args, a.dest) for a in fake_p._actions if a.dest != 'help' |