diff options
Diffstat (limited to 'rumba/testbeds')
-rw-r--r-- | rumba/testbeds/jfed.py | 5 | ||||
-rw-r--r-- | rumba/testbeds/qemu.py | 160 |
2 files changed, 121 insertions, 44 deletions
diff --git a/rumba/testbeds/jfed.py b/rumba/testbeds/jfed.py index 33d89f8..8ad173b 100644 --- a/rumba/testbeds/jfed.py +++ b/rumba/testbeds/jfed.py @@ -128,6 +128,11 @@ class Testbed(mod.Testbed): auth_name_r = self.auth_name.replace(".", "-") node.ssh_config.hostname = node.name + "." + self.exp_name + "." + \ auth_name_r + "." + self.auth_name + node.ssh_config.proxycommand = "ssh -i '" + self.cert_file + \ + "' -o StrictHostKeyChecking=no " + \ + self.username + \ + "@bastion.test.iminds.be nc " + \ + node.ssh_config.hostname + " 22" subprocess.call(["java", "-jar", self.jfed_jar, "create", "-S", \ self.proj_name, "--rspec", \ diff --git a/rumba/testbeds/qemu.py b/rumba/testbeds/qemu.py index 3573554..d998625 100644 --- a/rumba/testbeds/qemu.py +++ b/rumba/testbeds/qemu.py @@ -2,6 +2,7 @@ # QEMU testbed for Rumba # # Vincenzo Maffione <v.maffione@nextworks.it> +# Marco Capitani <m.capitani@nextworks.it> # # This library is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public @@ -23,10 +24,12 @@ import subprocess import os import rumba.model as mod +from rumba import ssh_support class Testbed(mod.Testbed): - def __init__(self, exp_name, username, bzimage, initramfs, proj_name="ARCFIRE", password="", + def __init__(self, exp_name, bzimage, initramfs, proj_name="ARCFIRE", + password="root", username="root", use_vhost=True, qemu_logs_dir=None): mod.Testbed.__init__(self, exp_name, username, password, proj_name) self.vms = {} @@ -34,15 +37,18 @@ class Testbed(mod.Testbed): self.bzimage = bzimage self.initramfs = initramfs self.vhost = use_vhost - self.qemu_logs_dir = os.getcwd() if qemu_logs_dir is None else qemu_logs_dir + self.qemu_logs_dir = os.getcwd() if qemu_logs_dir is None \ + else qemu_logs_dir self.boot_processes = [] @staticmethod - def _run_command_chain(commands, results_queue, error_queue, ignore_errors=False): + def _run_command_chain(commands, results_queue, + error_queue, ignore_errors=False): """ Runs (sequentially) the command list. - On error, breaks and dumps it in error_queue, and interrupts as soon as it is non-empty. + On error, breaks and dumps it in error_queue, and interrupts + as soon as it is non-empty (unless ignore errors is True). :type commands: list :type results_queue: Queue @@ -56,7 +62,7 @@ class Testbed(mod.Testbed): for command in commands: if not error_queue.empty() and not ignore_errors: break - print('DEBUG: executing >> %s' % command) + print('qemu: executing >> %s' % command) try: subprocess.check_call(command.split()) except subprocess.CalledProcessError as e: @@ -69,6 +75,37 @@ class Testbed(mod.Testbed): else: results_queue.put("Command chain ran with %d errors" % errors) + def recover_if_names(self, experiment): + next_vlan = 10 + assigned_vlan = {} + for node in experiment.nodes: + for ipcp in node.ipcps: + if isinstance(ipcp, mod.ShimEthIPCP): + shim_name, node_name = ipcp.name.split('.') + port_set = [x for x in self.vms[node_name]['ports'] + if x['shim'].name == shim_name] + port = port_set[0] + port_id = port['port_id'] + vm_id = self.vms[node_name]['id'] + mac = '00:0a:0a:0a:%02x:%02x' % (vm_id, port_id) + print('qemu: recovering ifname for port: ' + + port['tap_id'] + '.') + output = ssh_support.execute_command( + self, + node.ssh_config, + 'mac2ifname ' + mac) + ipcp.ifname = output + try: + vlan = int(port['shim'].name) + except ValueError: + vlan = assigned_vlan.get(port['shim'].name, None) + if vlan is None: + vlan = next_vlan + next_vlan += 10 + assigned_vlan[port['shim'].name] = vlan + ssh_support.setup_vlan(self, node, + vlan, ipcp.ifname) + def swap_in(self, experiment): """ :type experiment mod.Experiment @@ -80,19 +117,19 @@ class Testbed(mod.Testbed): except subprocess.CalledProcessError: raise Exception('Not authenticated') - print("[QEMU testbed] swapping in") + print("qemu: swapping in") # Building bridges and taps shim_processes = [] r_queue = multiprocessing.Queue() e_queue = multiprocessing.Queue() - print(experiment.dif_ordering) for shim in experiment.dif_ordering: - command_list = [] if not isinstance(shim, mod.ShimEthDIF): # Nothing to do here continue self.shims.append(shim) + ipcps = shim.ipcps + command_list = [] command_list += ('sudo brctl addbr %(br)s\n' 'sudo ip link set %(br)s up' % {'br': shim.name} @@ -113,23 +150,39 @@ class Testbed(mod.Testbed): speed = '%dmbit' % shim.link_speed # Rate limit the traffic transmitted on the TAP interface - command_list += ('sudo tc qdisc add dev %(tap)s handle 1: root ' - 'htb default 11\n' - 'sudo tc class add dev %(tap)s parent 1: classid ' - '1:1 htb rate 10gbit\n' - 'sudo tc class add dev %(tap)s parent 1:1 classid ' - '1:11 htb rate %(speed)s' - % {'tap': tap_id, 'speed': speed} - ).split('\n') - - vm['ports'].append({'tap_id': tap_id, 'shim': shim, 'port_id': port_id}) + command_list += ( + 'sudo tc qdisc add dev %(tap)s handle 1: root ' + 'htb default 11\n' + 'sudo tc class add dev %(tap)s parent 1: classid ' + '1:1 htb rate 10gbit\n' + 'sudo tc class add dev %(tap)s parent 1:1 classid ' + '1:11 htb rate %(speed)s' + % {'tap': tap_id, 'speed': speed} + ).split('\n') + + vm['ports'].append({'tap_id': tap_id, + 'shim': shim, + 'port_id': port_id}) + ipcp_set = [x for x in ipcps if x in node.ipcps] + if len(ipcp_set) > 1: + raise Exception("Error: more than one ipcp in common " + "between shim dif %s and node %s" + % (shim.name, node.name)) + ipcp = ipcp_set[0] # type: mod.ShimEthIPCP + assert ipcp.name == '%s.%s' % (shim.name, node.name), \ + 'Incorrect Shim Ipcp found: expected %s.%s, found %s' \ + % (shim.name, node.name, ipcp.name) + ipcp.ifname = tap_id # TODO deal with Ip address (shim UDP DIF). # Avoid stacking processes if one failed before. if not e_queue.empty(): break # Launch commands asynchronously - process = multiprocessing.Process(target=self._run_command_chain, args=(command_list, r_queue, e_queue)) + process = multiprocessing.Process(target=self._run_command_chain, + args=(command_list, + r_queue, + e_queue)) shim_processes.append(process) process.start() @@ -142,14 +195,15 @@ class Testbed(mod.Testbed): # Check for errors if not e_queue.empty(): error_str = str(e_queue.get()) - print('Testbed instantiation failed: %s' % error_str) + print('qemu: Testbed instantiation failed: %s' % error_str) raise Exception('Failure: %s' % error_str) try: # Check for results result = r_queue.get(timeout=1) if result == "Command chain ran correctly.": over_processes += 1 - print('DEBUG: %s of %s processes completed.' % (over_processes, total_processes)) + print('qemu: %s of %s processes completed.' + % (over_processes, total_processes)) except: max_waiting_time -= 1 @@ -157,9 +211,9 @@ class Testbed(mod.Testbed): boot_batch_size = max(1, multiprocessing.cpu_count() // 2) booting_budget = boot_batch_size - boot_backoff = 12 # in seconds + boot_backoff = 12 # in seconds base_port = 2222 - vm_memory = 164 # in megabytes + vm_memory = 164 # in megabytes vm_frontend = 'virtio-net-pci' vmid = 1 @@ -167,6 +221,7 @@ class Testbed(mod.Testbed): for node in experiment.nodes: name = node.name vm = self.vms.setdefault(name, {'vm': node, 'ports': []}) + vm['id'] = vmid fwdp = base_port + vmid fwdc = fwdp + 10000 mac = '00:0a:0a:0a:%02x:%02x' % (vmid, 99) @@ -199,7 +254,6 @@ class Testbed(mod.Testbed): '-device %(frontend)s,mac=%(mac)s,netdev=mgmt ' '-netdev user,id=mgmt,%(hostfwdstr)s ' '-vga std ' - '-pidfile rina-%(id)s.pid ' '-serial file:%(vmname)s.log ' % vars_dict ) @@ -211,23 +265,29 @@ class Testbed(mod.Testbed): mac = '00:0a:0a:0a:%02x:%02x' % (vmid, port['port_id']) port['mac'] = mac - command += ('-device %(frontend)s,mac=%(mac)s,netdev=data%(idx)s ' - '-netdev tap,ifname=%(tap)s,id=data%(idx)s,script=no,' - 'downscript=no%(vhost)s ' - % {'mac': mac, 'tap': tap_id, 'idx': port['port_id'], - 'frontend': vm_frontend, 'vhost': ',vhost=on' if self.vhost else ''} - ) + command += ( + '-device %(frontend)s,mac=%(mac)s,netdev=data%(idx)s ' + '-netdev tap,ifname=%(tap)s,id=data%(idx)s,script=no,' + 'downscript=no%(vhost)s ' + % {'mac': mac, 'tap': tap_id, 'idx': port['port_id'], + 'frontend': vm_frontend, + 'vhost': ',vhost=on' if self.vhost else ''} + ) booting_budget -= 1 if booting_budget <= 0: - print('Sleeping %s secs waiting for the VMs to boot' % boot_backoff) + + print('qemu: Sleeping %s secs waiting ' + 'for the VMs to boot' % boot_backoff) + time.sleep(boot_backoff) booting_budget = boot_batch_size - with open('%s/qemu_out_%s' % (self.qemu_logs_dir, vmid), 'w') as out_file: - print('DEBUG: executing >> %s' % command) - self.boot_processes.append(subprocess.Popen(command.split(), stdout=out_file)) - pass + with open('%s/qemu_out_%s' % (self.qemu_logs_dir, vmid), 'w')\ + as out_file: + print('qemu: executing >> %s' % command) + self.boot_processes.append(subprocess.Popen(command.split(), + stdout=out_file)) vmid += 1 @@ -235,9 +295,14 @@ class Testbed(mod.Testbed): if booting_budget < boot_backoff: tsleep = boot_backoff * (boot_batch_size - booting_budget) / \ boot_batch_size - print('Sleeping %s secs waiting for the last VMs to boot' % tsleep) + print('qemu: Sleeping %s secs waiting for the last VMs to boot' % tsleep) time.sleep(tsleep) + # TODO: to be removed, we should loop in the ssh part + print('qemu: Sleeping 5 seconds, just to be on the safe side') + time.sleep(5) + + self.recover_if_names(experiment) def swap_out(self, experiment): """ @@ -267,9 +332,10 @@ class Testbed(mod.Testbed): 'sudo ip tuntap del mode tap name %(tap)s' % {'tap': tap, 'br': shim.name} ).split('\n') - process = multiprocessing.Process(target=self._run_command_chain, - args=(commands, results_queue, error_queue), - kwargs={'ignore_errors': True}) + process = multiprocessing.Process( + target=self._run_command_chain, + args=(commands, results_queue, error_queue), + kwargs={'ignore_errors': True}) port_processes.append(process) process.start() @@ -280,14 +346,16 @@ class Testbed(mod.Testbed): while max_waiting_time > 0 and over_processes < total_processes: # Check for errors if not error_queue.empty(): - print('Failure while shutting down: %s' % str(error_queue.get())) + print('qemu:Failure while shutting down: %s'\ + % str(error_queue.get())) over_processes += 1 try: # Check for results result = results_queue.get(timeout=1) if result == "Command chain ran correctly.": over_processes += 1 - print('DEBUG: %s of %s tear-down port processes completed.' % (over_processes, total_processes)) + print('qemu: %s of %s tear-down port processes completed.' + % (over_processes, total_processes)) except: max_waiting_time -= 1 @@ -302,7 +370,9 @@ class Testbed(mod.Testbed): % {'br': shim.name} ).split('\n') process = multiprocessing.Process(target=self._run_command_chain, - args=(commands, results_queue, error_queue), + args=(commands, + results_queue, + error_queue), kwargs={'ignore_errors': True}) shim_processes.append(process) process.start() @@ -314,13 +384,15 @@ class Testbed(mod.Testbed): while max_waiting_time > 0 and over_processes < total_processes: # Check for errors if not error_queue.empty(): - print('Failure while shutting down: %s' % str(error_queue.get())) + print('qemu: Failure while shutting down: %s' + % str(error_queue.get())) over_processes += 1 try: # Check for results result = results_queue.get(timeout=1) if result == "Command chain ran correctly.": over_processes += 1 - print('DEBUG: %s of %s tear-down shim processes completed.' % (over_processes, total_processes)) + print('qemu: %s of %s tear-down shim processes completed.' + % (over_processes, total_processes)) except: max_waiting_time -= 1 |