Logging with multiprocessing madness

Posted 2019-04-30 09:53

I'm trying to use Python's standard logging module in a multiprocessing scenario. I've read:

  1. Python MultiProcess, Logging, Various Classes
  2. Logging using multiprocessing

and several other posts about multiprocessing, logging, Python classes and so on. After all that reading I came up with the following piece of code, which uses the QueueHandler from Python's logutils package, and which I cannot get to run properly:

import sys
import logging
from logging import INFO
from multiprocessing import Process, Queue as mpQueue
import threading
import time

from logutils.queue import QueueListener, QueueHandler


class Worker(Process):

    def __init__(self, n, q):
        super(Worker, self).__init__()
        self.n = n
        self.queue = q

        self.qh = QueueHandler(self.queue)
        self.root = logging.getLogger()
        self.root.addHandler(self.qh)
        self.root.setLevel(logging.DEBUG)        
        self.logger = logging.getLogger("W%i"%self.n)


    def run(self):
        self.logger.info("Worker %i Starting"%self.n)

        for i in xrange(10):
            self.logger.log(INFO, "testing %i"%i)

        self.logger.log(INFO, "Completed %i"%self.n)


def listener_process(queue):
    while True:
        try:
            record = queue.get()
            if record is None:
                break
            logger = logging.getLogger(record.name)
            logger.handle(record)
        except (KeyboardInterrupt, SystemExit):
            raise
        except:
            import sys, traceback
            print >> sys.stderr, 'Whoops! Problem:'
            traceback.print_exc(file=sys.stderr)

if __name__ == "__main__":

    mpq = mpQueue(-1)

    root = logging.getLogger()
    h = logging.StreamHandler()    
    f = logging.Formatter('%(asctime)s %(processName)-10s %(name)s %(levelname)-8s     %(message)s')
    h.setFormatter(f)
    root.addHandler(h)

    l = logging.getLogger("Test")
    l.setLevel(logging.DEBUG)

    listener = Process(target=listener_process,
                       args=(mpq,))
    listener.start()
    workers=[]
    for i in xrange(1):
        worker = Worker(i, mpq)
        worker.daemon = True
        worker.start()
        workers.append(worker)

    for worker in workers:
        worker.join()

    mpq.put_nowait(None)
    listener.join()

    for i in xrange(10):
        l.info("testing %i"%i)

    print "Finish"

When the code is executed, some lines in the output are somehow repeated:

2013-12-02 16:44:46,002 Worker-2   W0 INFO         Worker 0 Starting
2013-12-02 16:44:46,003 Worker-2   W0 INFO         testing 0
2013-12-02 16:44:46,003 Worker-2   W0 INFO         testing 1
2013-12-02 16:44:46,003 Worker-2   W0 INFO         testing 2
2013-12-02 16:44:46,002 Worker-2   W0 INFO         Worker 0 Starting
2013-12-02 16:44:46,003 Worker-2   W0 INFO         testing 3
2013-12-02 16:44:46,003 Worker-2   W0 INFO         testing 0
2013-12-02 16:44:46,003 Worker-2   W0 INFO         testing 1
2013-12-02 16:44:46,003 Worker-2   W0 INFO         testing 4
2013-12-02 16:44:46,003 Worker-2   W0 INFO         testing 2
2013-12-02 16:44:46,003 Worker-2   W0 INFO         testing 3
2013-12-02 16:44:46,003 Worker-2   W0 INFO         testing 5
2013-12-02 16:44:46,003 Worker-2   W0 INFO         testing 4
2013-12-02 16:44:46,003 Worker-2   W0 INFO         testing 6
2013-12-02 16:44:46,003 Worker-2   W0 INFO         testing 5
2013-12-02 16:44:46,004 Worker-2   W0 INFO         testing 7
2013-12-02 16:44:46,003 Worker-2   W0 INFO         testing 6
2013-12-02 16:44:46,004 Worker-2   W0 INFO         testing 8
2013-12-02 16:44:46,004 Worker-2   W0 INFO         testing 7
2013-12-02 16:44:46,004 Worker-2   W0 INFO         testing 9
2013-12-02 16:44:46,004 Worker-2   W0 INFO         testing 8
2013-12-02 16:44:46,004 Worker-2   W0 INFO         Completed 0
2013-12-02 16:44:46,004 Worker-2   W0 INFO         testing 9
2013-12-02 16:44:46,004 Worker-2   W0 INFO         Completed 0
2013-12-02 16:44:46,005 MainProcess Test INFO         testing 0
2013-12-02 16:44:46,005 MainProcess Test INFO         testing 1
2013-12-02 16:44:46,005 MainProcess Test INFO         testing 2
2013-12-02 16:44:46,005 MainProcess Test INFO         testing 3
2013-12-02 16:44:46,005 MainProcess Test INFO         testing 4
2013-12-02 16:44:46,005 MainProcess Test INFO         testing 5
2013-12-02 16:44:46,006 MainProcess Test INFO         testing 6
2013-12-02 16:44:46,006 MainProcess Test INFO         testing 7
2013-12-02 16:44:46,006 MainProcess Test INFO         testing 8
2013-12-02 16:44:46,006 MainProcess Test INFO         testing 9
Finish

In other questions it's suggested that this happens when the handler gets added more than once, but, as you can see, I only add the StreamHandler once in the main block. I've already tested embedding the main block in a class, with the same result.

EDIT: as @max suggested (or at least what I believe he suggested), I've modified the code of the Worker class as follows:

class Worker(Process):

    root = logging.getLogger()
    qh = None

    def __init__(self, n, q):
        super(Worker, self).__init__()
        self.n = n
        self.queue = q

        if not self.qh:
            Worker.qh = QueueHandler(self.queue)            
            Worker.root.addHandler(self.qh)
            Worker.root.setLevel(logging.DEBUG)

        self.logger = logging.getLogger("W%i"%self.n)

        print self.root.handlers

    def run(self):
        self.logger.info("Worker %i Starting"%self.n)

        for i in xrange(10):
            self.logger.log(INFO, "testing %i"%i)

        self.logger.log(INFO, "Completed %i"%self.n)

The results are the same. Now the QueueHandler is not added again and again, but there are still duplicate log entries, even with just one worker.
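To see what is actually attached where, a quick check (a diagnostic sketch, not part of the scripts above) is to print the root logger's handlers from the main process and from inside the worker's run():

import logging
import multiprocessing

def dump_root_handlers(where):
    # show which handlers this process's root logger currently carries
    proc = multiprocessing.current_process().name
    print "%s / %s: %r" % (where, proc, logging.getLogger().handlers)

Calling dump_root_handlers("main") after adding the StreamHandler, and dump_root_handlers("run") at the top of Worker.run(), should show exactly which handlers each process ends up with.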

EDIT 2: I've changed the code a little. I replaced the listener process with a QueueListener (which is what I intended from the beginning anyway) and moved the main code into a class.

import sys

import logging
from logging import INFO
from multiprocessing import Process, Queue as mpQueue
import threading
import time

from logutils.queue import QueueListener, QueueHandler

root = logging.getLogger()
added_qh = False

class Worker(Process):

    def __init__(self, logconf, n, qh):
        super(Worker, self).__init__()
        self.n = n
        self.logconf = logconf

#        global root
        global added_qh

        if not added_qh:
            added_qh = True
            root.addHandler(qh)
            root.setLevel(logging.DEBUG)            

        self.logger = logging.getLogger("W%i"%self.n)

        #print root.handlers

    def run(self):
        self.logger.info("Worker %i Starting"%self.n)

        for i in xrange(10):
            self.logger.log(INFO, "testing %i"%i)

        self.logger.log(INFO, "Completed %i"%self.n)


class Main(object):

    def __init__(self):
        pass

    def start(self):

        mpq = mpQueue(-1)
        qh = QueueHandler(mpq)

        h = logging.StreamHandler()

        ql = QueueListener(mpq, h)

        #h.setFormatter(f)
        root.addHandler(qh)

        l = logging.getLogger("Test")
        l.setLevel(logging.DEBUG)

        workers=[]

        for i in xrange(15):
            worker = Worker(logconf, i, qh)
            worker.daemon = True
            worker.start()
            workers.append(worker)

        for worker in workers:
            print "joining worker: {}".format(worker)
            worker.join()

        mpq.put_nowait(None)

        ql.start()

        # listener.join()

        for i in xrange(10):
            l.info("testing %i"%i)

if __name__ == "__main__":


    x = Main()
    x.start()

    time.sleep(10)

    print "Finish"

Now it mostly works, until I reach a certain number of workers (~15), at which point the Main class for some reason gets blocked in the join and the remaining workers do nothing.

3 Answers
够拽才男人 · 2019-04-30 10:20

I figured out a pretty simple workaround using monkey-patching. It probably isn't robust, and I'm not an expert with the logging module, but it seemed like the best solution for my situation. After trying some code changes (to allow passing in an existing logger from multiprocessing.get_logger()), I didn't like how much the code was changing, and I came up with a quick (well, it would have been quick had I done this in the first place), easy-to-read hack/workaround:

(working example, complete with multiprocessing pool)

import logging
import multiprocessing

class FakeLogger(object):
    def __init__(self, q):
        self.q = q
    def info(self, item):
        self.q.put('INFO - {}'.format(item))
    def debug(self, item):
        self.q.put('DEBUG - {}'.format(item))
    def critical(self, item):
        self.q.put('CRITICAL - {}'.format(item))
    def warning(self, item):
        self.q.put('WARNING - {}'.format(item))

def some_other_func_that_gets_logger_and_logs(num):
    # notice the name gets discarded
    # of course you can easily add this to your FakeLogger class
    local_logger = logging.getLogger('local')
    local_logger.info('Hey I am logging this: {} and working on it to make this {}!'.format(num, num*2))
    local_logger.debug('hmm, something may need debugging here')
    return num*2

def func_to_parallelize(data_chunk):
    # unpack our args
    the_num, logger_q = data_chunk
    # since we're now in a new process, let's monkeypatch the logging module
    logging.getLogger = lambda name=None: FakeLogger(logger_q)
    # now do the actual work that happens to log stuff too
    new_num = some_other_func_that_gets_logger_and_logs(the_num)
    return (the_num, new_num)

if __name__ == '__main__':
    multiprocessing.freeze_support()
    m = multiprocessing.Manager()
    logger_q = m.Queue()
    # we have to pass our data to be parallel-processed
    # we also need to pass the Queue object so we can retrieve the logs
    parallelable_data = [(1, logger_q), (2, logger_q)]
    # set up a pool of processes so we can take advantage of multiple CPU cores
    pool_size = multiprocessing.cpu_count() * 2
    pool = multiprocessing.Pool(processes=pool_size, maxtasksperchild=4)
    worker_output = pool.map(func_to_parallelize, parallelable_data)
    pool.close() # no more tasks
    pool.join()  # wrap up current tasks
    # get the contents of our FakeLogger object
    while not logger_q.empty():
        print logger_q.get()
    print 'worker output contained: {}'.format(worker_output)

Of course this probably won't cover the whole gamut of logging usage, but I think the concept is simple enough to get working quickly and relatively painlessly. And it should be easy to modify (for example, the lambda currently discards the name that can be passed into getLogger).
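For instance, a small variation along these lines would keep the name that callers pass to getLogger instead of throwing it away (just a sketch of the idea, not something I used in the example above):

import logging

class NamedFakeLogger(object):
    """Like FakeLogger, but remembers the name passed to getLogger()."""
    def __init__(self, q, name=None):
        self.q = q
        self.name = name or 'root'
    def _put(self, level, item):
        self.q.put('{} - {} - {}'.format(level, self.name, item))
    def info(self, item):
        self._put('INFO', item)
    def debug(self, item):
        self._put('DEBUG', item)
    def warning(self, item):
        self._put('WARNING', item)
    def critical(self, item):
        self._put('CRITICAL', item)

# inside func_to_parallelize, forward the requested name instead of discarding it
logging.getLogger = lambda name=None: NamedFakeLogger(logger_q, name)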

forever°为你锁心 · 2019-04-30 10:23

All your Workers share the same root logger object (obtained in Worker.__init__ -- the getLogger call always returns the same logger). However, every time you create a Worker, you add a handler (QueueHandler) to that logger.

So if you create 10 Workers, you will have 10 (identical) handlers on your root logger, which means output gets repeated 10 times.

Instead, you should make the logger a module attribute rather than an instance attribute, and configure it once at the module level -- not at the class level.

(actually, loggers should be configured once at the program level)
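For example, roughly like this (just a sketch with names of my own, not a drop-in replacement for your script; it assumes a fork start method, as in your setup, so the single handler configured in the parent process is inherited by the workers):

import logging
from multiprocessing import Process, Queue as mpQueue

from logutils.queue import QueueHandler, QueueListener


class Worker(Process):
    def __init__(self, n):
        super(Worker, self).__init__()
        self.n = n

    def run(self):
        # no handler juggling here: just fetch a named logger and log
        logger = logging.getLogger("W%i" % self.n)
        for i in range(10):
            logger.info("testing %i", i)


if __name__ == "__main__":
    queue = mpQueue(-1)

    # configure logging exactly once, at program level: the QueueListener
    # (a thread in the main process) writes records to the stream, and the
    # root logger gets a single QueueHandler that the forked workers inherit
    stream_handler = logging.StreamHandler()
    stream_handler.setFormatter(logging.Formatter(
        '%(asctime)s %(processName)-10s %(name)s %(levelname)-8s %(message)s'))
    listener = QueueListener(queue, stream_handler)
    listener.start()

    root = logging.getLogger()
    root.addHandler(QueueHandler(queue))
    root.setLevel(logging.DEBUG)

    workers = [Worker(i) for i in range(3)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()

    listener.stop()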

一纸荒年 Trace。 · 2019-04-30 10:26

I'm coming in late, so you probably don't need the answer anymore. The problem comes from the fact that you already have a handler set in your main process, and in your worker you are adding another one. This means that in your worker process two handlers are in fact handling each record: one pushing the log record to the queue, and one writing directly to the stream.

You can fix this simply by adding one extra line, self.root.handlers = [], to your code. Starting from your original code, the __init__ method of the Worker would look like this:

def __init__(self, n, q):
    super(Worker, self).__init__()
    self.n = n
    self.queue = q

    self.qh = QueueHandler(self.queue)
    self.root = logging.getLogger()
    self.root.handlers = []
    self.root.addHandler(self.qh)
    self.root.setLevel(logging.DEBUG)
    self.logger = logging.getLogger("W%i"%self.n)

The output now looks like this:

python workers.py 
2016-05-12 10:07:02,971 Worker-2   W0 INFO         Worker 0 Starting
2016-05-12 10:07:02,972 Worker-2   W0 INFO         testing 0
2016-05-12 10:07:02,973 Worker-2   W0 INFO         testing 1
2016-05-12 10:07:02,973 Worker-2   W0 INFO         testing 2
2016-05-12 10:07:02,973 Worker-2   W0 INFO         testing 3
2016-05-12 10:07:02,973 Worker-2   W0 INFO         testing 4
2016-05-12 10:07:02,973 Worker-2   W0 INFO         testing 5
2016-05-12 10:07:02,973 Worker-2   W0 INFO         testing 6
2016-05-12 10:07:02,973 Worker-2   W0 INFO         testing 7
2016-05-12 10:07:02,973 Worker-2   W0 INFO         testing 8
2016-05-12 10:07:02,973 Worker-2   W0 INFO         testing 9
2016-05-12 10:07:02,973 Worker-2   W0 INFO         Completed 0
Finish
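
As a side note (my addition, not part of the fix above): on Python 3.2+ the queue-based helpers are part of the standard library, so the logutils dependency can be dropped:

# Python 3.2+ ships QueueHandler/QueueListener in logging.handlers
from logging.handlers import QueueHandler, QueueListener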