# path: root/rumba/testbeds/qemu.py
# blob: 82ad873175ddf6db0bd1a7ced3d9985d5c5253b4 (plain)
#
# QEMU testbed for Rumba
#
#    Vincenzo Maffione  <v.maffione@nextworks.it>
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
# MA  02110-1301  USA
import getpass
import multiprocessing
import os
import subprocess
import time

try:
    from queue import Empty
except ImportError:  # Python 2 names the module "Queue"
    from Queue import Empty

import rumba.model as mod


class Testbed(mod.Testbed):
    """
    Testbed that runs the experiment nodes as local QEMU virtual machines.

    ``swap_in`` creates one Linux bridge per shim Ethernet DIF and one TAP
    device per (node, shim) membership, then starts one ``qemu-system-x86_64``
    process per experiment node. ``swap_out`` terminates those processes and
    removes the TAP devices and bridges again. Host networking commands are
    executed through ``sudo``.
    """

    # Message a command chain posts on its results queue once it is over.
    _CHAIN_DONE = "Command chain ran correctly."

    def __init__(self, exp_name, username, bzimage, initramfs, proj_name="ARCFIRE", password="",
                 use_vhost=True, qemu_out_folder=""):
        """
        :param exp_name: experiment name, forwarded to ``mod.Testbed``
        :param username: user name, forwarded to ``mod.Testbed``
        :param bzimage: kernel image handed to qemu with ``-kernel``
        :param initramfs: initial ramdisk handed to qemu with ``-initrd``
        :param proj_name: project name, forwarded to ``mod.Testbed``
        :param password: password, forwarded to ``mod.Testbed``
        :param use_vhost: if true, ``vhost=on`` is added to every TAP netdev
        :param qemu_out_folder: folder receiving the ``qemu_out<id>`` files
                                (stdout of each qemu process); an empty
                                string means the current working directory
        """
        mod.Testbed.__init__(self, exp_name, username, password, proj_name)
        # node name -> {'vm': node, 'ports': [...]} (plus 'ssh' and 'id' once booted)
        self.vms = {}
        # Shim Ethernet DIFs for which a bridge has been requested
        self.shims = []
        self.bzimage = bzimage
        self.initramfs = initramfs
        self.vhost = use_vhost
        self.qemu_folder = qemu_out_folder
        # subprocess.Popen handles of the running qemu processes
        self.boot_processes = []

    @staticmethod
    def _run_command_chain(commands, results_queue, error_queue):
        """
        Runs (sequentially) the command list.

        On error, breaks and dumps it in error_queue, and interrupts as soon as it is non-empty.

        :type commands: list
        :type results_queue: Queue
        :type error_queue: Queue
        :param commands: list of commands to execute
        :param results_queue: Queue of results of parallel processes
        :param error_queue: Queue of error(s) encountered
        :return: None
        """
        try:
            for command in commands:
                # Some chain (possibly another one) already failed: stop early.
                if not error_queue.empty():
                    break
                print('DEBUG: executing >> {}'.format(command))
                subprocess.check_call(command.split())
        except (subprocess.CalledProcessError, OSError) as e:
            # OSError covers a missing executable; without reporting it the
            # parent would just wait for its timeout and carry on unaware.
            error_queue.put(str(e))
        else:
            results_queue.put(Testbed._CHAIN_DONE)

    @staticmethod
    def _await_chains(processes, results_queue, error_queue, what, fail_fast):
        """
        Wait until every launched command chain has reported back.

        The wait is bounded: each one-second poll of the results queue that
        comes back empty consumes one unit of a budget of two units per
        process.

        :type processes: list
        :type results_queue: Queue
        :type error_queue: Queue
        :type what: str
        :type fail_fast: bool
        :param processes: the processes running ``_run_command_chain``
        :param results_queue: Queue the chains post their completion on
        :param error_queue: Queue the chains post their errors on
        :param what: text inserted before "processes completed." in the
                     progress message (empty, or ending with a space)
        :param fail_fast: if true the first error raises; otherwise errors
                          are printed and the failed chain counts as over
        :raises Exception: on the first reported error when fail_fast is set
        :return: None
        """
        total = len(processes)
        remaining_wait = 2 * total
        done = 0

        while remaining_wait > 0 and done < total:
            # Check for errors, draining everything that is pending.
            while True:
                try:
                    error_str = str(error_queue.get_nowait())
                except Empty:
                    break
                if fail_fast:
                    print('Testbed instantiation failed: {}'.format(error_str))
                    raise Exception('Failure: {}'.format(error_str))
                print('Failure while shutting down: {}'.format(error_str))
                done += 1
            if done >= total:
                break

            # Check for results
            try:
                result = results_queue.get(timeout=1)
            except Empty:
                remaining_wait -= 1
                continue
            if result == Testbed._CHAIN_DONE:
                done += 1
                print('DEBUG: %s of %s %sprocesses completed.' % (done, total, what))

        if done < total:
            print('WARNING: timed out waiting for %sprocesses: %s of %s completed.'
                  % (what, done, total))

    @staticmethod
    def _authenticate_sudo():
        """
        Make sure sudo credentials are cached before sudo commands are run.

        Does nothing when already running as root. Otherwise asks for the
        password and validates it with ``sudo -S -v``. The password is fed on
        standard input so that it never shows up in the process arguments.

        :raises Exception: if sudo does not accept the password
        :return: None
        """
        if os.geteuid() == 0:
            return

        pw = getpass.getpass('[sudo] password for {}:'.format(getpass.getuser()))
        # -S: read the password from stdin; -p '': no prompt of sudo's own;
        # -v: only validate/refresh the cached credentials.
        sudo = subprocess.Popen(['sudo', '-S', '-v', '-p', ''],
                                stdin=subprocess.PIPE,
                                universal_newlines=True)
        sudo.communicate(pw + '\n')
        if sudo.returncode != 0:
            raise Exception('Not authenticated')

    def _build_bridges(self, experiment):
        """
        Create a bridge per shim Ethernet DIF and a TAP per member node.

        The commands of each shim run as one chain in a separate process;
        the TAP ports are recorded in ``self.vms`` for the later VM boot.

        :type experiment mod.Experiment
        :param experiment: The experiment running
        :raises Exception: if one of the command chains reports an error
        :return: None
        """
        shim_processes = []
        r_queue = multiprocessing.Queue()
        e_queue = multiprocessing.Queue()
        print(experiment.dif_ordering)
        for shim in experiment.dif_ordering:
            if not isinstance(shim, mod.ShimEthDIF):
                # Nothing to do here
                continue
            self.shims.append(shim)
            command_list = ('sudo brctl addbr %(br)s\n'
                            'sudo ip link set %(br)s up'
                            % {'br': shim.name}
                            ).split('\n')
            for node in shim.members:  # type: mod.Node
                name = node.name
                vm = self.vms.setdefault(name, {'vm': node, 'ports': []})
                port_id = len(vm['ports']) + 1
                tap_id = '%s.%02x' % (name, port_id)

                command_list += ('sudo ip tuntap add mode tap name %(tap)s\n'
                                 'sudo ip link set %(tap)s up\n'
                                 'sudo brctl addif %(br)s %(tap)s'
                                 % {'tap': tap_id, 'br': shim.name}
                                 ).split('\n')

                if shim.link_speed > 0:
                    speed = '%dmbit' % shim.link_speed

                    # Rate limit the traffic transmitted on the TAP interface
                    command_list += ('sudo tc qdisc add dev %(tap)s handle 1: root '
                                     'htb default 11\n'
                                     'sudo tc class add dev %(tap)s parent 1: classid '
                                     '1:1 htb rate 10gbit\n'
                                     'sudo tc class add dev %(tap)s parent 1:1 classid '
                                     '1:11 htb rate %(speed)s'
                                     % {'tap': tap_id, 'speed': speed}
                                     ).split('\n')

                vm['ports'].append({'tap_id': tap_id, 'shim': shim, 'port_id': port_id})
                # TODO deal with Ip address (shim UDP DIF).

            # Avoid stacking processes if one failed before.
            if not e_queue.empty():
                break
            # Launch commands asynchronously
            process = multiprocessing.Process(target=self._run_command_chain,
                                              args=(command_list, r_queue, e_queue))
            shim_processes.append(process)
            process.start()

        # Wait for all processes to be over.
        self._await_chains(shim_processes, r_queue, e_queue, '', fail_fast=True)

    def _boot_vms(self, experiment):
        """
        Start one qemu process per experiment node.

        Fills in ``vm['ssh']`` (host port forwarded to guest port 22),
        ``vm['id']`` and ``port['mac']`` in ``self.vms`` and appends the
        process handles to ``self.boot_processes``.

        :type experiment mod.Experiment
        :param experiment: The experiment running
        :return: None
        """
        boot_batch_size = max(1, multiprocessing.cpu_count() // 2)
        booting_budget = boot_batch_size
        boot_backoff = 12  # seconds slept between boot batches
        base_port = 2222  # VM <id> gets host port base_port + <id> for ssh
        vm_memory = 164  # passed to qemu as "-m <vm_memory>M"
        vm_frontend = 'virtio-net-pci'

        for vmid, node in enumerate(experiment.nodes, 1):
            # NOTE(review): the TAP ports were registered under node.name; if
            # full_name can differ from name this lookup yields a fresh entry
            # without ports -- confirm against rumba.model.
            name = node.full_name
            vm = self.vms.setdefault(name, {'vm': node, 'ports': []})
            fwdp = base_port + vmid
            mac = '00:0a:0a:0a:%02x:%02x' % (vmid, 99)
            vm['ssh'] = fwdp
            vm['id'] = vmid

            vars_dict = {'fwdp': fwdp, 'id': vmid, 'mac': mac,
                         'bzimage': self.bzimage,
                         'initramfs': self.initramfs,
                         'memory': vm_memory, 'frontend': vm_frontend,
                         'vmname': name}

            host_fwd_str = 'hostfwd=tcp::%(fwdp)s-:22' % vars_dict
            vars_dict['hostfwdstr'] = host_fwd_str

            command = 'qemu-system-x86_64 '
            # TODO manage non default images
            command += ('-kernel %(bzimage)s '
                        '-append "console=ttyS0" '
                        '-initrd %(initramfs)s '
                        % vars_dict)
            command += ('-nographic '
                        '-display none '
                        '--enable-kvm '
                        '-smp 1 '
                        '-m %(memory)sM '
                        '-device %(frontend)s,mac=%(mac)s,netdev=mgmt '
                        '-netdev user,id=mgmt,%(hostfwdstr)s '
                        '-vga std '
                        '-pidfile rina-%(id)s.pid '
                        '-serial file:%(vmname)s.log '
                        % vars_dict
                        )

            # One extra NIC per TAP port created for this node.
            for port in vm['ports']:
                tap_id = port['tap_id']
                mac = '00:0a:0a:0a:%02x:%02x' % (vmid, port['port_id'])
                port['mac'] = mac

                command += ('-device %(frontend)s,mac=%(mac)s,netdev=data%(idx)s '
                            '-netdev tap,ifname=%(tap)s,id=data%(idx)s,script=no,'
                            'downscript=no%(vhost)s '
                            % {'mac': mac, 'tap': tap_id, 'idx': port['port_id'],
                               'frontend': vm_frontend, 'vhost': ',vhost=on' if self.vhost else ''}
                            )

            # The pause happens before launching the VM that uses up the budget.
            booting_budget -= 1
            if booting_budget <= 0:
                print('Sleeping for {} seconds to give the machines time to boot up.'.format(boot_backoff))
                time.sleep(boot_backoff)
                booting_budget = boot_batch_size

            # os.path.join keeps an empty folder relative to the working
            # directory instead of pointing at the filesystem root.
            out_path = os.path.join(self.qemu_folder, 'qemu_out{}'.format(vmid))
            with open(out_path, 'w') as out_file:
                print('DEBUG: executing >> {}'.format(command))
                self.boot_processes.append(subprocess.Popen(command.split(), stdout=out_file))

    def swap_in(self, experiment):
        """
        Bring the testbed up: authenticate for sudo, build bridges and TAPs,
        then boot the virtual machines.

        :type experiment mod.Experiment
        :param experiment: The experiment running
        :raises Exception: if sudo authentication fails or a network setup
                           command fails
        """
        self._authenticate_sudo()

        print("[QEMU testbed] swapping in")

        # Building bridges and taps
        self._build_bridges(experiment)

        # Building vms
        self._boot_vms(experiment)

    def swap_out(self, experiment):
        """
        Tear the testbed down: stop the VMs, then remove TAPs and bridges.

        Failures of the tear-down commands are printed, not raised.

        :param experiment: The experiment running (not used here)
        :return: None
        """
        # TERM qemu processes
        for process in self.boot_processes:
            process.terminate()

        # Wait for them to shut down
        for process in self.boot_processes:
            process.wait()
        # The handles are stale now; do not terminate them again on a re-run.
        self.boot_processes = []

        # Detach and delete every TAP, one process per port.
        port_processes = []
        error_queue = multiprocessing.Queue()
        results_queue = multiprocessing.Queue()
        for vm in self.vms.values():
            for port in vm['ports']:
                commands = ('sudo brctl delif %(br)s %(tap)s\n'
                            'sudo ip link set %(tap)s down\n'
                            'sudo ip tuntap del mode tap name %(tap)s'
                            % {'tap': port['tap_id'], 'br': port['shim'].name}
                            ).split('\n')
                process = multiprocessing.Process(target=self._run_command_chain,
                                                  args=(commands, results_queue, error_queue))
                port_processes.append(process)
                process.start()

        self._await_chains(port_processes, results_queue, error_queue,
                           'tear-down port ', fail_fast=False)

        # Bridges go last, once the TAPs have been taken care of.
        error_queue = multiprocessing.Queue()
        results_queue = multiprocessing.Queue()
        shim_processes = []

        for shim in self.shims:
            commands = ('sudo ip link set %(br)s down\n'
                        'sudo brctl delbr %(br)s'
                        % {'br': shim.name}
                        ).split('\n')
            process = multiprocessing.Process(target=self._run_command_chain,
                                              args=(commands, results_queue, error_queue))
            shim_processes.append(process)
            process.start()

        self._await_chains(shim_processes, results_queue, error_queue,
                           'tear-down shim ', fail_fast=False)