
I have a Python multiprocessing application to which I would like to add some logging functionality. The Python logging cookbook recommends using a Queue: every process puts log records into it via a QueueHandler, and a listener process handles the records via a predefined handler.

Here is the example provided by the Python logging cookbook:

# You'll need these imports in your own code
import logging
import logging.handlers
import multiprocessing

# Next two import lines for this demo only
from random import choice, random
import time

#
# Because you'll want to define the logging configurations for listener and workers, the
# listener and worker process functions take a configurer parameter which is a callable
# for configuring logging for that process. These functions are also passed the queue,
# which they use for communication.
#
# In practice, you can configure the listener however you want, but note that in this
# simple example, the listener does not apply level or filter logic to received records.
# In practice, you would probably want to do this logic in the worker processes, to avoid
# sending events which would be filtered out between processes.
#
# The size of the rotated files is made small so you can see the results easily.
def listener_configurer():
    root = logging.getLogger()
    h = logging.handlers.RotatingFileHandler('mptest.log', 'a', 300, 10)
    f = logging.Formatter('%(asctime)s %(processName)-10s %(name)s %(levelname)-8s %(message)s')
    h.setFormatter(f)
    root.addHandler(h)

# This is the listener process top-level loop: wait for logging events
# (LogRecords) on the queue and handle them, quit when you get a None for a
# LogRecord.
def listener_process(queue, configurer):
    configurer()
    while True:
        try:
            record = queue.get()
            if record is None:  # We send this as a sentinel to tell the listener to quit.
                break
            logger = logging.getLogger(record.name)
            logger.handle(record)  # No level or filter logic applied - just do it!
        except Exception:
            import sys, traceback
            print('Whoops! Problem:', file=sys.stderr)
            traceback.print_exc(file=sys.stderr)

# Arrays used for random selections in this demo

LEVELS = [logging.DEBUG, logging.INFO, logging.WARNING,
          logging.ERROR, logging.CRITICAL]

LOGGERS = ['a.b.c', 'd.e.f']

MESSAGES = [
    'Random message #1',
    'Random message #2',
    'Random message #3',
]

# The worker configuration is done at the start of the worker process run.
# Note that on Windows you can't rely on fork semantics, so each process
# will run the logging configuration code when it starts.
def worker_configurer(queue):
    h = logging.handlers.QueueHandler(queue)  # Just the one handler needed
    root = logging.getLogger()
    root.addHandler(h)
    # send all messages, for demo; no other level or filter logic applied.
    root.setLevel(logging.DEBUG)

# This is the worker process top-level loop, which just logs ten events with
# random intervening delays before terminating.
# The print messages are just so you know it's doing something!
def worker_process(queue, configurer):
    configurer(queue)
    name = multiprocessing.current_process().name
    print('Worker started: %s' % name)
    for i in range(10):
        time.sleep(random())
        logger = logging.getLogger(choice(LOGGERS))
        level = choice(LEVELS)
        message = choice(MESSAGES)
        logger.log(level, message)
    print('Worker finished: %s' % name)

# Here's where the demo gets orchestrated. Create the queue, create and start
# the listener, create ten workers and start them, wait for them to finish,
# then send a None to the queue to tell the listener to finish.
def main():
    queue = multiprocessing.Queue(-1)
    listener = multiprocessing.Process(target=listener_process,
                                       args=(queue, listener_configurer))
    listener.start()
    workers = []
    for i in range(10):
        worker = multiprocessing.Process(target=worker_process,
                                         args=(queue, worker_configurer))
        workers.append(worker)
        worker.start()
    for w in workers:
        w.join()
    queue.put_nowait(None)
    listener.join()

if __name__ == '__main__':
    main()

My question: The example from the Python logging cookbook implies that the queue has to be passed to every function that will be executed in a separate process. This works fine for a small application, but it gets ugly in a bigger program. Is there a way to use something like a singleton queue that is created once via logging.config.dictConfig and then shared by all processes, without having to pass it to every function?
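To illustrate what I have in mind, here is a rough sketch (a hypothetical log_setup module, so the names are made up): a module-level queue that dictConfig references through a custom handler factory. As far as I can tell this only behaves like a real singleton with fork-based start methods; under spawn (the default on Windows) each child re-imports the module and ends up with its own queue.

# log_setup.py (hypothetical) -- a module-level "singleton" queue that
# dictConfig references through a custom handler factory ('()' key).
import logging
import logging.config
import logging.handlers
import multiprocessing

log_queue = multiprocessing.Queue(-1)  # created once, at import time


def queue_handler_factory():
    # dictConfig calls this to build the handler, so application code
    # never has to pass the queue around explicitly.
    return logging.handlers.QueueHandler(log_queue)


WORKER_LOGGING_CONFIG = {
    'version': 1,
    'disable_existing_loggers': False,
    'handlers': {
        'queue': {'()': queue_handler_factory},
    },
    'root': {'level': 'DEBUG', 'handlers': ['queue']},
}


def configure_worker_logging():
    logging.config.dictConfig(WORKER_LOGGING_CONFIG)

Worker processes would then just call log_setup.configure_worker_logging() at startup and log normally.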

Comments

  • The multiprocessing module is very flexible. How do you call your multiprocessing function? Do you use a pool or your own processes? Can you add a minimal code example? Commented Mar 24, 2020 at 12:26
  • I added the example from the Python logging cookbook. Commented Mar 24, 2020 at 12:49
1 Answer


In your case a few simple classes will do the trick.

Have a look and let me know if you need some further explanations or want something different.

import logging
import logging.handlers
import multiprocessing
import multiprocessing.pool

from random import choice, random
import time


class ProcessLogger(multiprocessing.Process):
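    """Listener process that owns the logging queue and writes records to a
    rotating log file.

    create_global_logger() / get_global_logger() let the rest of the program
    treat one instance as a process-wide singleton, so the queue never has to
    be passed through application code.
    """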
    _global_process_logger = None

    def __init__(self):
        super().__init__()
        self.queue = multiprocessing.Queue(-1)

    @classmethod
    def get_global_logger(cls):
        if cls._global_process_logger is not None:
            return cls._global_process_logger
        raise Exception("No global process logger exists.")

    @classmethod
    def create_global_logger(cls):
        cls._global_process_logger = ProcessLogger()
        return cls._global_process_logger

    @staticmethod
    def configure():
        root = logging.getLogger()
        h = logging.handlers.RotatingFileHandler('mptest.log', 'a', 1024**2, 10)
        f = logging.Formatter('%(asctime)s %(processName)-10s %(name)s %(levelname)-8s %(message)s')
        h.setFormatter(f)
        root.addHandler(h)

    def stop(self):
        self.queue.put_nowait(None)

    def run(self):
        self.configure()
        while True:
            try:
                record = self.queue.get()
                if record is None:
                    break
                logger = logging.getLogger(record.name)
                logger.handle(record)
            except Exception:
                import sys, traceback
                print('Whoops! Problem:', file=sys.stderr)
                traceback.print_exc(file=sys.stderr)

    def new_process(self, target, args=[], kwargs={}):
        return ProcessWithLogging(target, args, kwargs, log_process=self)


def configure_new_process(log_process_queue):
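    """Route all logging in the current process to the listener's queue."""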
    h = logging.handlers.QueueHandler(log_process_queue)
    root = logging.getLogger()
    root.addHandler(h)
    root.setLevel(logging.DEBUG)


class ProcessWithLogging(multiprocessing.Process):
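    """Process that installs queue-based logging before running its target,
    so the target function itself never sees the queue."""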
    def __init__(self, target, args=[], kwargs={}, log_process=None):
        super().__init__()
        self.target = target
        self.args = args
        self.kwargs = kwargs
        if log_process is None:
            log_process = ProcessLogger.get_global_logger()
        self.log_process_queue = log_process.queue

    def run(self):
        configure_new_process(self.log_process_queue)
        self.target(*self.args, **self.kwargs)


class PoolWithLogging(multiprocessing.pool.Pool):
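    """Pool whose worker processes are initialised with queue-based logging
    via configure_new_process()."""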
    def __init__(self, processes=None, context=None, log_process=None):
        if log_process is None:
            log_process = ProcessLogger.get_global_logger()
        super().__init__(processes=processes, initializer=configure_new_process,
                         initargs=(log_process.queue,), context=context)


LEVELS = [logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR, logging.CRITICAL]
LOGGERS = ['a.b.c', 'd.e.f']
MESSAGES = [
    'Random message #1',
    'Random message #2',
    'Random message #3',
]


def worker_process(param=None):
    name = multiprocessing.current_process().name
    print('Worker started: %s' % name)
    for i in range(10):
        time.sleep(random())
        logger = logging.getLogger(choice(LOGGERS))
        level = choice(LEVELS)
        message = choice(MESSAGES)
        logger.log(level, message)
    print('Worker finished: {}, param: {}'.format(name, param))
    return param


def main():
    process_logger = ProcessLogger.create_global_logger()
    process_logger.start()

    workers = []
    for i in range(10):
        worker = ProcessWithLogging(worker_process)
        workers.append(worker)
        worker.start()
    for w in workers:
        w.join()

    with PoolWithLogging(processes=4) as pool:
        print(pool.map(worker_process, range(8)))


    process_logger.stop()
    process_logger.join()


if __name__ == '__main__':
    main()
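The point for a larger program: once a process has been started through ProcessWithLogging or PoolWithLogging, any module can use ordinary loggers and the queue never shows up in application code. A minimal sketch, assuming a hypothetical tasks module:

# tasks.py (hypothetical worker module somewhere in a larger application).
# No queue is imported or passed in; the QueueHandler installed on the root
# logger by configure_new_process() forwards these records to the listener.
import logging

logger = logging.getLogger(__name__)


def process_item(item):
    logger.info('processing %r', item)
    return item * 2

This can then be run with ProcessWithLogging(process_item, args=(42,)) or pool.map(process_item, range(8)) without any logging plumbing in its signature.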

Comments

  • Thanks a lot! This looks great! Do you have any suggestions for the scenario of starting the worker processes via a pool?
  • See my edited answer. Let me know if you need anything else.
  • This is great, thank you! Is there a way to make it work under Windows as well? Right now there is a pickle problem on Windows: TypeError: can't pickle weakref objects.
  • Thanks for letting me know, Annamaria. I have edited my answer; it should now run on Windows. Let me know if it works for you.
  • Thank you! This is perfect!
