From ed00e77fee22535169f151cac728a44d4cb6c28d Mon Sep 17 00:00:00 2001 From: Sander Vrijders Date: Wed, 15 Nov 2017 22:56:32 +0100 Subject: rumba: Use saner names and add Dimitri as author This uses a saner name for multiprocessing_utils and adds some licenses where they were missing and adds Dimitri as author. --- rumba/multiprocess.py | 132 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 132 insertions(+) create mode 100644 rumba/multiprocess.py (limited to 'rumba/multiprocess.py') diff --git a/rumba/multiprocess.py b/rumba/multiprocess.py new file mode 100644 index 0000000..d7ca18d --- /dev/null +++ b/rumba/multiprocess.py @@ -0,0 +1,132 @@ +# +# A library to manage ARCFIRE experiments +# +# Copyright (C) 2017 Nextworks S.r.l. +# Copyright (C) 2017 imec +# +# Sander Vrijders +# Dimitri Staessens +# Vincenzo Maffione +# Marco Capitani +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; either +# version 2.1 of the License, or (at your option) any later version. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., http://www.fsf.org/about/contact/. +# + +import multiprocessing.dummy as multiprocessing +import sys + +import rumba.log as log + +if sys.version_info[0] >= 3: + import contextlib +else: + import contextlib2 as contextlib + + +logger = log.get_logger(__name__) + + +def call_in_parallel(name_list, argument_list, executor_list): + """ + Calls each executor in executor_list with the corresponding + argument in argument_list + + Assumes that the three lists are the same length, will fail otherwise. + Is equivalent to + for i, e in enumerate(executor_list): + e(argument_list[i]) + but all calls will be executed in parallel. + + If successful, no output will be given. Otherwise, this will raise + the exception raised by one failed call at random. + + :param name_list: list of names of the executors (used for logging) + :param argument_list: list of arguments to the executors + :param executor_list: list of executors (as functions) + """ + if len(executor_list) != len(name_list) \ + or len(executor_list) != len(argument_list): + raise ValueError("Names, arguments and executors lists " + "must have the same length") + + def job(executor, name, m_queue, argument): + try: + # m_queue.cancel_join_thread() + logger.debug('Starting process "%s".' + % (name,)) + executor(argument) + m_queue.put("DONE") + except BaseException as e: + logger.error('Setup failed. %s: %s', + type(e).__name__, + str(e)) + m_queue.put(e) + + logger.debug('About to start spawning processes.') + queue = multiprocessing.Queue() + with contextlib.ExitStack() as stack: + # This is a composite context manager. + # After exiting the 'with' block, the __exit__ method of all + # processes that have been registered will be called. + msg_to_be_read = 0 + for i, e in enumerate(executor_list): + stack.enter_context(ProcessContextManager( + target=job, + args=(e, name_list[i], queue, argument_list[i]) + )) + msg_to_be_read += 1 + results = [] + for _ in range(len(executor_list)): + result = queue.get() # This blocks until results are available + msg_to_be_read -= 1 + results.append(result) + for result in results: + if result != "DONE": + raise result + # If we get here, we got a success for every node, hence we are done. + + +class ProcessContextManager(object): + + def __init__(self, target, args=None, kwargs=None): + if args is None: + args = () + if kwargs is None: + kwargs = {} + self.process = multiprocessing.Process( + target=target, + args=tuple(args), + kwargs=kwargs + ) + + def __enter__(self): + self.process.start() + return self.process + + def __exit__(self, exc_type, exc_val, exc_tb): + if exc_tb is not None or exc_val is not None or exc_tb is not None: + logger.error('Subprocess error: %s.' % (type(exc_val).__name__,)) + try: + self.process.terminate() + self.process.join() + except AttributeError: + # We are using multiprocessing.dummy, so no termination. + # We trust the threads will die with the application + # (since we are shutting down anyway) + pass + return False + else: + self.process.join() + return True -- cgit v1.2.3