aboutsummaryrefslogtreecommitdiff
path: root/rumba/log.py
diff options
context:
space:
mode:
authorMarco Capitani <m.capitani@nextworks.it>2017-11-07 11:40:54 +0100
committerSander Vrijders <sander.vrijders@ugent.be>2017-11-15 18:59:11 +0100
commit2da15caf24a8b2da70d755e065a5dc3d770c6454 (patch)
tree292456db4acc4ab8f4afffad50113cc7e219b6cd /rumba/log.py
parent2e91ca33f90f7c74887013e08c95bb00cdd4fc00 (diff)
downloadrumba-2da15caf24a8b2da70d755e065a5dc3d770c6454.tar.gz
rumba-2da15caf24a8b2da70d755e065a5dc3d770c6454.zip
prototypes: Add parallel installation for IRATI
This adds parallel installation for IRATI and adds support for multithreaded/multiprocess logging. Furthermore, prototype-agnostic utilities for multiprocessing have been added. Caching of clients has been re-enabled for the ssh connections.
Diffstat (limited to 'rumba/log.py')
-rw-r--r--rumba/log.py142
1 file changed, 135 insertions, 7 deletions
diff --git a/rumba/log.py b/rumba/log.py
index c7afbd4..6eb2137 100644
--- a/rumba/log.py
+++ b/rumba/log.py
@@ -24,10 +24,9 @@
#
import logging
-
-import sys
-
+import logging.handlers
import multiprocessing
+import sys
DEBUG = logging.DEBUG
@@ -40,6 +39,130 @@ CRITICAL = logging.CRITICAL
loggers_set = set()
+# Module-level queue shared by all processes: QueueHandler instances put
+# LogRecords here and a single QueueListener thread drains it, so only one
+# process/thread actually writes to the output stream.
+mq = multiprocessing.Queue()
+
+
+try:
+    from logging.handlers import QueueHandler
+except ImportError:
+    # We are in python2 code
+    class QueueHandler(logging.Handler):
+        """
+        This handler sends events to a queue. Typically, it would be used
+        together with a multiprocessing Queue to centralise logging to file
+        in one process (in a multi-process application), so as to avoid file
+        write contention between processes.
+
+        This code is new in Python 3.2, but this class can be copy pasted into
+        user code for use with earlier Python versions.
+        """
+
+        # Copy-pasted as per above docstring from logging
+
+        def __init__(self, queue):
+            logging.Handler.__init__(self)
+            self.queue = queue
+
+        def enqueue(self, record):
+            # Non-blocking put: if the queue is full this raises, and the
+            # error is routed through handleError() in emit() below.
+            self.queue.put_nowait(record)
+
+        def prepare(self, record):
+            # Render the message now, then drop the fields that were used
+            # to build it (args, exc_info) — they may not pickle cleanly
+            # when the record crosses the process boundary via the queue.
+            self.format(record)
+            record.msg = record.message
+            record.args = None
+            record.exc_info = None
+            return record
+
+        def emit(self, record):
+            try:
+                self.enqueue(self.prepare(record))
+            except Exception:
+                self.handleError(record)
+
+try:
+    from logging.handlers import QueueListener
+except ImportError:
+    # We are in python2 code
+    import threading
+    try:
+        import Queue
+    except ImportError:
+        # Make it pythonX with 3.0 <= X <3.2
+        import queue as Queue
+
+    class QueueListener(object):
+        """
+        This class implements an internal threaded listener which watches for
+        LogRecords being added to a queue, removes them and passes them to a
+        list of handlers for processing.
+        """
+
+        # Adapted from logging.handlers in Python >= 3.5
+        _sentinel = None
+
+        def __init__(self, queue, *handlers, **kwargs):
+            # Match the stdlib interface: QueueListener(queue, *handlers,
+            # respect_handler_level=False). The earlier back-port signature
+            # (queue, respect_handler_level=False, *handlers) made the call
+            # QueueListener(mq, handler) bind the handler to the boolean
+            # flag and leave self.handlers empty, so no records were ever
+            # emitted under Python 2. **kwargs emulates the keyword-only
+            # argument, which Python 2 cannot express directly.
+            self.queue = queue
+            self.handlers = handlers
+            self._stop = threading.Event()
+            self._thread = None
+            self.respect_handler_level = kwargs.get(
+                'respect_handler_level', False)
+
+        def dequeue(self, block):
+            # Blocking or non-blocking get; may raise Queue.Empty.
+            return self.queue.get(block)
+
+        def start(self):
+            # Daemon thread so a listener that is never stop()ped does not
+            # prevent interpreter exit.
+            self._thread = t = threading.Thread(target=self._monitor)
+            t.setDaemon(True)
+            t.start()
+
+        def prepare(self, record):
+            # Hook for subclasses; by default the record passes through.
+            return record
+
+        def handle(self, record):
+            record = self.prepare(record)
+            for handler in self.handlers:
+                if not self.respect_handler_level:
+                    process = True
+                else:
+                    process = record.levelno >= handler.level
+                if process:
+                    handler.handle(record)
+
+        def _monitor(self):
+            q = self.queue
+            has_task_done = hasattr(q, 'task_done')
+            while not self._stop.isSet():
+                try:
+                    record = self.dequeue(True)
+                    if record is self._sentinel:
+                        break
+                    self.handle(record)
+                    if has_task_done:
+                        q.task_done()
+                except Queue.Empty:
+                    pass
+            # There might still be records in the queue.
+            while True:
+                try:
+                    record = self.dequeue(False)
+                    if record is self._sentinel:
+                        break
+                    self.handle(record)
+                    if has_task_done:
+                        q.task_done()
+                except Queue.Empty:
+                    break
+
+        def enqueue_sentinel(self):
+            self.queue.put_nowait(self._sentinel)
+
+        def stop(self):
+            # Signal the monitor loop, wake it with the sentinel, then wait
+            # for it to drain remaining records and exit.
+            self._stop.set()
+            self.enqueue_sentinel()
+            self._thread.join()
+            self._thread = None
+
+
class RumbaFormatter(logging.Formatter):
"""The logging.Formatter subclass used by Rumba"""
@@ -65,14 +188,19 @@ class RumbaFormatter(logging.Formatter):
def setup():
    """Configures the logging framework with default values."""
+    global mq
+    queue_handler = QueueHandler(mq)
+    queue_handler.setLevel(logging.DEBUG)
+    # logging.basicConfig() only gained its 'handlers' argument in Python
+    # 3.2; Python 2's implementation silently ignores it and installs a
+    # default stderr StreamHandler instead, so the queue handler would
+    # never be attached. Attach it to the root logger explicitly.
+    root_logger = logging.getLogger('')
+    root_logger.addHandler(queue_handler)
+    root_logger.setLevel(logging.ERROR)
+    logging.getLogger('rumba').setLevel(logging.INFO)
+
    handler = logging.StreamHandler(sys.stdout)
-    handler.lock = multiprocessing.RLock()
    handler.setLevel(logging.DEBUG)
    formatter = RumbaFormatter()
    handler.setFormatter(formatter)
+    # Single listener thread drains the shared queue and forwards records
+    # to the stdout handler, serialising output across processes.
+    listener = QueueListener(mq, handler)
+    listener.start()
# Used for the first call, in order to configure logging