diff options
author | Marco Capitani <m.capitani@nextworks.it> | 2017-11-07 11:40:54 +0100 |
---|---|---|
committer | Sander Vrijders <sander.vrijders@ugent.be> | 2017-11-15 18:59:11 +0100 |
commit | 2da15caf24a8b2da70d755e065a5dc3d770c6454 (patch) | |
tree | 292456db4acc4ab8f4afffad50113cc7e219b6cd /rumba/log.py | |
parent | 2e91ca33f90f7c74887013e08c95bb00cdd4fc00 (diff) | |
download | rumba-2da15caf24a8b2da70d755e065a5dc3d770c6454.tar.gz rumba-2da15caf24a8b2da70d755e065a5dc3d770c6454.zip |
prototypes: Add parallel installation for IRATI
This adds parallel installation for IRATI, it also adds support for
multithread/multiprocess logging. Furthermore prototype-agnostic
utilities for multiprocessing have been added. Caching of clients has
been re-enabled for the ssh connections.
Diffstat (limited to 'rumba/log.py')
-rw-r--r-- | rumba/log.py | 142 |
1 files changed, 135 insertions, 7 deletions
diff --git a/rumba/log.py b/rumba/log.py index c7afbd4..6eb2137 100644 --- a/rumba/log.py +++ b/rumba/log.py @@ -24,10 +24,9 @@ # import logging - -import sys - +import logging.handlers import multiprocessing +import sys DEBUG = logging.DEBUG @@ -40,6 +39,130 @@ CRITICAL = logging.CRITICAL loggers_set = set() +mq = multiprocessing.Queue() + + +try: + from logging.handlers import QueueHandler +except ImportError: + # We are in python2 code + class QueueHandler(logging.Handler): + """ + This handler sends events to a queue. Typically, it would be used + together with a multiprocessing Queue to centralise logging to file + in one process (in a multi-process application), so as to avoid file + write contention between processes. + + This code is new in Python 3.2, but this class can be copy pasted into + user code for use with earlier Python versions. + """ + + # Copy-pasted as per above docstring from logging + + def __init__(self, queue): + logging.Handler.__init__(self) + self.queue = queue + + def enqueue(self, record): + self.queue.put_nowait(record) + + def prepare(self, record): + self.format(record) + record.msg = record.message + record.args = None + record.exc_info = None + return record + + def emit(self, record): + try: + self.enqueue(self.prepare(record)) + except Exception: + self.handleError(record) + +try: + from logging.handlers import QueueListener +except ImportError: + # We are in python2 code + import threading + try: + import Queue + except ImportError: + # Make it pythonX with 3.0 <= X <3.2 + import queue as Queue + + class QueueListener(object): + """ + This class implements an internal threaded listener which watches for + LogRecords being added to a queue, removes them and passes them to a + list of handlers for processing. + """ + + # Also copy-pasted + _sentinel = None + + def __init__(self, queue, respect_handler_level=False, *handlers): + self.queue = queue + self.handlers = handlers + self._stop = threading.Event() + self._thread = None + self.respect_handler_level = respect_handler_level + + def dequeue(self, block): + return self.queue.get(block) + + def start(self): + self._thread = t = threading.Thread(target=self._monitor) + t.setDaemon(True) + t.start() + + def prepare(self , record): + return record + + def handle(self, record): + record = self.prepare(record) + for handler in self.handlers: + if not self.respect_handler_level: + process = True + else: + process = record.levelno >= handler.level + if process: + handler.handle(record) + + def _monitor(self): + q = self.queue + has_task_done = hasattr(q, 'task_done') + while not self._stop.isSet(): + try: + record = self.dequeue(True) + if record is self._sentinel: + break + self.handle(record) + if has_task_done: + q.task_done() + except Queue.Empty: + pass + # There might still be records in the queue. + while True: + try: + record = self.dequeue(False) + if record is self._sentinel: + break + self.handle(record) + if has_task_done: + q.task_done() + except Queue.Empty: + break + + def enqueue_sentinel(self): + self.queue.put_nowait(self._sentinel) + + def stop(self): + self._stop.set() + self.enqueue_sentinel() + self._thread.join() + self._thread = None + + class RumbaFormatter(logging.Formatter): """The logging.Formatter subclass used by Rumba""" @@ -65,14 +188,19 @@ class RumbaFormatter(logging.Formatter): def setup(): """Configures the logging framework with default values.""" + global mq + queue_handler = QueueHandler(mq) + queue_handler.setLevel(logging.DEBUG) + logging.basicConfig(handlers=[queue_handler], level=logging.DEBUG) + logging.getLogger('').setLevel(logging.ERROR) + logging.getLogger('rumba').setLevel(logging.INFO) + handler = logging.StreamHandler(sys.stdout) - handler.lock = multiprocessing.RLock() handler.setLevel(logging.DEBUG) formatter = RumbaFormatter() handler.setFormatter(formatter) - logging.basicConfig(handlers=[handler], level=logging.DEBUG) - logging.getLogger('').setLevel(logging.ERROR) - logging.getLogger('rumba').setLevel(logging.INFO) + listener = QueueListener(mq, handler) + listener.start() # Used for the first call, in order to configure logging |