From ed00e77fee22535169f151cac728a44d4cb6c28d Mon Sep 17 00:00:00 2001 From: Sander Vrijders Date: Wed, 15 Nov 2017 22:56:32 +0100 Subject: rumba: Use saner names and add Dimitri as author This uses a saner name for multiprocessing_utils and adds some licenses where they were missing and adds Dimitri as author. --- AUTHORS.txt | 3 +- rumba/__init__.py | 1 + rumba/log.py | 1 + rumba/model.py | 1 + rumba/multiprocess.py | 132 ++++++++++++++++++++++++++++++++++++ rumba/multiprocessing_utils.py | 106 ----------------------------- rumba/prototypes/__init__.py | 1 + rumba/prototypes/enroll.py | 1 + rumba/prototypes/irati.py | 3 +- rumba/prototypes/irati_templates.py | 1 + rumba/prototypes/ouroboros.py | 3 +- rumba/prototypes/rlite.py | 1 + rumba/recpoisson.py | 1 + rumba/ssh_support.py | 1 + rumba/testbeds/__init__.py | 1 + rumba/testbeds/emulab.py | 1 + rumba/testbeds/faketestbed.py | 1 + rumba/testbeds/jfed.py | 1 + rumba/testbeds/qemu.py | 5 +- rumba/utils.py | 26 +++++++ 20 files changed, 180 insertions(+), 111 deletions(-) create mode 100644 rumba/multiprocess.py delete mode 100644 rumba/multiprocessing_utils.py diff --git a/AUTHORS.txt b/AUTHORS.txt index f3d3337..55a1c91 100644 --- a/AUTHORS.txt +++ b/AUTHORS.txt @@ -1,3 +1,4 @@ -Vincenzo Maffione Sander Vrijders +Dimitri Staessens +Vincenzo Maffione Marco Capitani diff --git a/rumba/__init__.py b/rumba/__init__.py index 16b18f5..6f6b5b2 100644 --- a/rumba/__init__.py +++ b/rumba/__init__.py @@ -5,6 +5,7 @@ # Copyright (C) 2017 imec # # Sander Vrijders +# Dimitri Staessens # Vincenzo Maffione # Marco Capitani # diff --git a/rumba/log.py b/rumba/log.py index 6eb2137..eb6be03 100644 --- a/rumba/log.py +++ b/rumba/log.py @@ -5,6 +5,7 @@ # Copyright (C) 2017 imec # # Sander Vrijders +# Dimitri Staessens # Vincenzo Maffione # Marco Capitani # diff --git a/rumba/model.py b/rumba/model.py index 6ba93b0..da63e76 100644 --- a/rumba/model.py +++ b/rumba/model.py @@ -5,6 +5,7 @@ # Copyright (C) 2017 imec # # Sander Vrijders +# Dimitri Staessens # Vincenzo Maffione # Marco Capitani # diff --git a/rumba/multiprocess.py b/rumba/multiprocess.py new file mode 100644 index 0000000..d7ca18d --- /dev/null +++ b/rumba/multiprocess.py @@ -0,0 +1,132 @@ +# +# A library to manage ARCFIRE experiments +# +# Copyright (C) 2017 Nextworks S.r.l. +# Copyright (C) 2017 imec +# +# Sander Vrijders +# Dimitri Staessens +# Vincenzo Maffione +# Marco Capitani +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., http://www.fsf.org/about/contact/. +# + +import multiprocessing.dummy as multiprocessing +import sys + +import rumba.log as log + +if sys.version_info[0] >= 3: + import contextlib +else: + import contextlib2 as contextlib + + +logger = log.get_logger(__name__) + + +def call_in_parallel(name_list, argument_list, executor_list): + """ + Calls each executor in executor_list with the corresponding + argument in argument_list + + Assumes that the three lists are the same length, will fail otherwise. + Is equivalent to + for i, e in enumerate(executor_list): + e(argument_list[i]) + but all calls will be executed in parallel. + + If successful, no output will be given. Otherwise, this will raise + the exception raised by one failed call at random. + + :param name_list: list of names of the executors (used for logging) + :param argument_list: list of arguments to the executors + :param executor_list: list of executors (as functions) + """ + if len(executor_list) != len(name_list) \ + or len(executor_list) != len(argument_list): + raise ValueError("Names, arguments and executors lists " + "must have the same length") + + def job(executor, name, m_queue, argument): + try: + # m_queue.cancel_join_thread() + logger.debug('Starting process "%s".' + % (name,)) + executor(argument) + m_queue.put("DONE") + except BaseException as e: + logger.error('Setup failed. %s: %s', + type(e).__name__, + str(e)) + m_queue.put(e) + + logger.debug('About to start spawning processes.') + queue = multiprocessing.Queue() + with contextlib.ExitStack() as stack: + # This is a composite context manager. + # After exiting the 'with' block, the __exit__ method of all + # processes that have been registered will be called. + msg_to_be_read = 0 + for i, e in enumerate(executor_list): + stack.enter_context(ProcessContextManager( + target=job, + args=(e, name_list[i], queue, argument_list[i]) + )) + msg_to_be_read += 1 + results = [] + for _ in range(len(executor_list)): + result = queue.get() # This blocks until results are available + msg_to_be_read -= 1 + results.append(result) + for result in results: + if result != "DONE": + raise result + # If we get here, we got a success for every node, hence we are done. + + +class ProcessContextManager(object): + + def __init__(self, target, args=None, kwargs=None): + if args is None: + args = () + if kwargs is None: + kwargs = {} + self.process = multiprocessing.Process( + target=target, + args=tuple(args), + kwargs=kwargs + ) + + def __enter__(self): + self.process.start() + return self.process + + def __exit__(self, exc_type, exc_val, exc_tb): + if exc_tb is not None or exc_val is not None or exc_tb is not None: + logger.error('Subprocess error: %s.' % (type(exc_val).__name__,)) + try: + self.process.terminate() + self.process.join() + except AttributeError: + # We are using multiprocessing.dummy, so no termination. + # We trust the threads will die with the application + # (since we are shutting down anyway) + pass + return False + else: + self.process.join() + return True diff --git a/rumba/multiprocessing_utils.py b/rumba/multiprocessing_utils.py deleted file mode 100644 index ce5dd5c..0000000 --- a/rumba/multiprocessing_utils.py +++ /dev/null @@ -1,106 +0,0 @@ -import multiprocessing.dummy as multiprocessing -import sys - -import rumba.log as log - -if sys.version_info[0] >= 3: - import contextlib -else: - import contextlib2 as contextlib - - -logger = log.get_logger(__name__) - - -def call_in_parallel(name_list, argument_list, executor_list): - """ - Calls each executor in executor_list with the corresponding - argument in argument_list - - Assumes that the three lists are the same length, will fail otherwise. - Is equivalent to - for i, e in enumerate(executor_list): - e(argument_list[i]) - but all calls will be executed in parallel. - - If successful, no output will be given. Otherwise, this will raise - the exception raised by one failed call at random. - - :param name_list: list of names of the executors (used for logging) - :param argument_list: list of arguments to the executors - :param executor_list: list of executors (as functions) - """ - if len(executor_list) != len(name_list) \ - or len(executor_list) != len(argument_list): - raise ValueError("Names, arguments and executors lists " - "must have the same length") - - def job(executor, name, m_queue, argument): - try: - # m_queue.cancel_join_thread() - logger.debug('Starting process "%s".' - % (name,)) - executor(argument) - m_queue.put("DONE") - except BaseException as e: - logger.error('Setup failed. %s: %s', - type(e).__name__, - str(e)) - m_queue.put(e) - - logger.debug('About to start spawning processes.') - queue = multiprocessing.Queue() - with contextlib.ExitStack() as stack: - # This is a composite context manager. - # After exiting the 'with' block, the __exit__ method of all - # processes that have been registered will be called. - msg_to_be_read = 0 - for i, e in enumerate(executor_list): - stack.enter_context(ProcessContextManager( - target=job, - args=(e, name_list[i], queue, argument_list[i]) - )) - msg_to_be_read += 1 - results = [] - for _ in range(len(executor_list)): - result = queue.get() # This blocks until results are available - msg_to_be_read -= 1 - results.append(result) - for result in results: - if result != "DONE": - raise result - # If we get here, we got a success for every node, hence we are done. - - -class ProcessContextManager(object): - - def __init__(self, target, args=None, kwargs=None): - if args is None: - args = () - if kwargs is None: - kwargs = {} - self.process = multiprocessing.Process( - target=target, - args=tuple(args), - kwargs=kwargs - ) - - def __enter__(self): - self.process.start() - return self.process - - def __exit__(self, exc_type, exc_val, exc_tb): - if exc_tb is not None or exc_val is not None or exc_tb is not None: - logger.error('Subprocess error: %s.' % (type(exc_val).__name__,)) - try: - self.process.terminate() - self.process.join() - except AttributeError: - # We are using multiprocessing.dummy, so no termination. - # We trust the threads will die with the application - # (since we are shutting down anyway) - pass - return False - else: - self.process.join() - return True diff --git a/rumba/prototypes/__init__.py b/rumba/prototypes/__init__.py index 16b18f5..6f6b5b2 100644 --- a/rumba/prototypes/__init__.py +++ b/rumba/prototypes/__init__.py @@ -5,6 +5,7 @@ # Copyright (C) 2017 imec # # Sander Vrijders +# Dimitri Staessens # Vincenzo Maffione # Marco Capitani # diff --git a/rumba/prototypes/enroll.py b/rumba/prototypes/enroll.py index 35cc136..27b5ebe 100755 --- a/rumba/prototypes/enroll.py +++ b/rumba/prototypes/enroll.py @@ -6,6 +6,7 @@ # Copyright (C) 2017 imec # # Sander Vrijders +# Dimitri Staessens # Vincenzo Maffione # Marco Capitani # diff --git a/rumba/prototypes/irati.py b/rumba/prototypes/irati.py index 160bff1..c9faf36 100644 --- a/rumba/prototypes/irati.py +++ b/rumba/prototypes/irati.py @@ -5,6 +5,7 @@ # Copyright (C) 2017 imec # # Sander Vrijders +# Dimitri Staessens # Vincenzo Maffione # Marco Capitani # @@ -30,7 +31,7 @@ import time import rumba.ssh_support as ssh import rumba.model as mod -import rumba.multiprocessing_utils as m_processing +import rumba.multiprocess as m_processing import rumba.prototypes.irati_templates as irati_templates import rumba.log as log diff --git a/rumba/prototypes/irati_templates.py b/rumba/prototypes/irati_templates.py index c94de92..877ac21 100644 --- a/rumba/prototypes/irati_templates.py +++ b/rumba/prototypes/irati_templates.py @@ -5,6 +5,7 @@ # Copyright (C) 2017 imec # # Sander Vrijders +# Dimitri Staessens # Vincenzo Maffione # Marco Capitani # diff --git a/rumba/prototypes/ouroboros.py b/rumba/prototypes/ouroboros.py index a09f570..050ec9c 100644 --- a/rumba/prototypes/ouroboros.py +++ b/rumba/prototypes/ouroboros.py @@ -5,6 +5,7 @@ # Copyright (C) 2017 imec # # Sander Vrijders +# Dimitri Staessens # Vincenzo Maffione # Marco Capitani # @@ -27,7 +28,7 @@ import time import rumba.ssh_support as ssh import rumba.model as mod -import rumba.multiprocessing_utils as m_processing +import rumba.multiprocess as m_processing import rumba.log as log diff --git a/rumba/prototypes/rlite.py b/rumba/prototypes/rlite.py index 36e2d47..b66bd69 100644 --- a/rumba/prototypes/rlite.py +++ b/rumba/prototypes/rlite.py @@ -5,6 +5,7 @@ # Copyright (C) 2017 imec # # Sander Vrijders +# Dimitri Staessens # Vincenzo Maffione # Marco Capitani # diff --git a/rumba/recpoisson.py b/rumba/recpoisson.py index d594811..67dfab6 100644 --- a/rumba/recpoisson.py +++ b/rumba/recpoisson.py @@ -5,6 +5,7 @@ # Copyright (C) 2017 imec # # Sander Vrijders +# Dimitri Staessens # Vincenzo Maffione # Marco Capitani # diff --git a/rumba/ssh_support.py b/rumba/ssh_support.py index 2bc761b..9990fc9 100644 --- a/rumba/ssh_support.py +++ b/rumba/ssh_support.py @@ -5,6 +5,7 @@ # Copyright (C) 2017 imec # # Sander Vrijders +# Dimitri Staessens # Vincenzo Maffione # Marco Capitani # diff --git a/rumba/testbeds/__init__.py b/rumba/testbeds/__init__.py index 16b18f5..6f6b5b2 100644 --- a/rumba/testbeds/__init__.py +++ b/rumba/testbeds/__init__.py @@ -5,6 +5,7 @@ # Copyright (C) 2017 imec # # Sander Vrijders +# Dimitri Staessens # Vincenzo Maffione # Marco Capitani # diff --git a/rumba/testbeds/emulab.py b/rumba/testbeds/emulab.py index 7568815..3ba625d 100644 --- a/rumba/testbeds/emulab.py +++ b/rumba/testbeds/emulab.py @@ -5,6 +5,7 @@ # Copyright (C) 2017 imec # # Sander Vrijders +# Dimitri Staessens # Vincenzo Maffione # Marco Capitani # diff --git a/rumba/testbeds/faketestbed.py b/rumba/testbeds/faketestbed.py index 6512850..44994e3 100644 --- a/rumba/testbeds/faketestbed.py +++ b/rumba/testbeds/faketestbed.py @@ -5,6 +5,7 @@ # Copyright (C) 2017 imec # # Sander Vrijders +# Dimitri Staessens # Vincenzo Maffione # Marco Capitani # diff --git a/rumba/testbeds/jfed.py b/rumba/testbeds/jfed.py index d6eb458..ac54042 100644 --- a/rumba/testbeds/jfed.py +++ b/rumba/testbeds/jfed.py @@ -5,6 +5,7 @@ # Copyright (C) 2017 imec # # Sander Vrijders +# Dimitri Staessens # Vincenzo Maffione # Marco Capitani # diff --git a/rumba/testbeds/qemu.py b/rumba/testbeds/qemu.py index 75a564c..530b4ac 100644 --- a/rumba/testbeds/qemu.py +++ b/rumba/testbeds/qemu.py @@ -5,6 +5,7 @@ # Copyright (C) 2017 imec # # Sander Vrijders +# Dimitri Staessens # Vincenzo Maffione # Marco Capitani # @@ -32,8 +33,8 @@ import time import rumba.model as mod import rumba.log as log import rumba.ssh_support as ssh_support -import rumba.multiprocessing_utils as m_processing -from rumba.multiprocessing_utils import ProcessContextManager +import rumba.multiprocess as m_processing +from rumba.multiprocess import ProcessContextManager if sys.version_info[0] >= 3: from urllib.request import urlretrieve diff --git a/rumba/utils.py b/rumba/utils.py index 2fc623c..97e3646 100644 --- a/rumba/utils.py +++ b/rumba/utils.py @@ -1,3 +1,29 @@ +# +# A library to manage ARCFIRE experiments +# +# Copyright (C) 2017 Nextworks S.r.l. +# Copyright (C) 2017 imec +# +# Sander Vrijders +# Dimitri Staessens +# Vincenzo Maffione +# Marco Capitani +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., http://www.fsf.org/about/contact/. +# + import time import rumba.log as log -- cgit v1.2.3