I'm trying to use Python's standard logging module in a multiprocessing scenario. I've read:
and multiple other posts about multiprocessing, logging, Python classes and such. After all this reading I've come up with the piece of code below, which uses the QueueHandler from the logutils package, but I cannot make it run properly:
import sys
import logging
from logging import INFO
from multiprocessing import Process, Queue as mpQueue
import threading
import time

from logutils.queue import QueueListener, QueueHandler


class Worker(Process):

    def __init__(self, n, q):
        super(Worker, self).__init__()
        self.n = n
        self.queue = q

        self.qh = QueueHandler(self.queue)
        self.root = logging.getLogger()
        self.root.addHandler(self.qh)
        self.root.setLevel(logging.DEBUG)
        self.logger = logging.getLogger("W%i"%self.n)

    def run(self):
        self.logger.info("Worker %i Starting"%self.n)

        for i in xrange(10):
            self.logger.log(INFO, "testing %i"%i)

        self.logger.log(INFO, "Completed %i"%self.n)


def listener_process(queue):
    while True:
        try:
            record = queue.get()
            if record is None:
                break
            logger = logging.getLogger(record.name)
            logger.handle(record)
        except (KeyboardInterrupt, SystemExit):
            raise
        except:
            import sys, traceback
            print >> sys.stderr, 'Whoops! Problem:'
            traceback.print_exc(file=sys.stderr)


if __name__ == "__main__":

    mpq = mpQueue(-1)

    root = logging.getLogger()
    h = logging.StreamHandler()
    f = logging.Formatter('%(asctime)s %(processName)-10s %(name)s %(levelname)-8s %(message)s')
    h.setFormatter(f)
    root.addHandler(h)

    l = logging.getLogger("Test")
    l.setLevel(logging.DEBUG)

    listener = Process(target=listener_process,
                       args=(mpq,))
    listener.start()

    workers=[]
    for i in xrange(1):
        worker = Worker(i, mpq)
        worker.daemon = True
        worker.start()
        workers.append(worker)

    for worker in workers:
        worker.join()

    mpq.put_nowait(None)
    listener.join()

    for i in xrange(10):
        l.info("testing %i"%i)

    print "Finish"
If the code is executed, the output somehow repeats lines like:
2013-12-02 16:44:46,002 Worker-2 W0 INFO Worker 0 Starting
2013-12-02 16:44:46,003 Worker-2 W0 INFO testing 0
2013-12-02 16:44:46,003 Worker-2 W0 INFO testing 1
2013-12-02 16:44:46,003 Worker-2 W0 INFO testing 2
2013-12-02 16:44:46,002 Worker-2 W0 INFO Worker 0 Starting
2013-12-02 16:44:46,003 Worker-2 W0 INFO testing 3
2013-12-02 16:44:46,003 Worker-2 W0 INFO testing 0
2013-12-02 16:44:46,003 Worker-2 W0 INFO testing 1
2013-12-02 16:44:46,003 Worker-2 W0 INFO testing 4
2013-12-02 16:44:46,003 Worker-2 W0 INFO testing 2
2013-12-02 16:44:46,003 Worker-2 W0 INFO testing 3
2013-12-02 16:44:46,003 Worker-2 W0 INFO testing 5
2013-12-02 16:44:46,003 Worker-2 W0 INFO testing 4
2013-12-02 16:44:46,003 Worker-2 W0 INFO testing 6
2013-12-02 16:44:46,003 Worker-2 W0 INFO testing 5
2013-12-02 16:44:46,004 Worker-2 W0 INFO testing 7
2013-12-02 16:44:46,003 Worker-2 W0 INFO testing 6
2013-12-02 16:44:46,004 Worker-2 W0 INFO testing 8
2013-12-02 16:44:46,004 Worker-2 W0 INFO testing 7
2013-12-02 16:44:46,004 Worker-2 W0 INFO testing 9
2013-12-02 16:44:46,004 Worker-2 W0 INFO testing 8
2013-12-02 16:44:46,004 Worker-2 W0 INFO Completed 0
2013-12-02 16:44:46,004 Worker-2 W0 INFO testing 9
2013-12-02 16:44:46,004 Worker-2 W0 INFO Completed 0
2013-12-02 16:44:46,005 MainProcess Test INFO testing 0
2013-12-02 16:44:46,005 MainProcess Test INFO testing 1
2013-12-02 16:44:46,005 MainProcess Test INFO testing 2
2013-12-02 16:44:46,005 MainProcess Test INFO testing 3
2013-12-02 16:44:46,005 MainProcess Test INFO testing 4
2013-12-02 16:44:46,005 MainProcess Test INFO testing 5
2013-12-02 16:44:46,006 MainProcess Test INFO testing 6
2013-12-02 16:44:46,006 MainProcess Test INFO testing 7
2013-12-02 16:44:46,006 MainProcess Test INFO testing 8
2013-12-02 16:44:46,006 MainProcess Test INFO testing 9
Finish
In other questions it's suggested that the handler gets added more than once but, as you can see, I only add the StreamHandler once, in the main block. I've already tested embedding the main block into a class, with the same result.
EDIT: as @max suggested (or what I believe he said) I've modified the code of the worker class as:
class Worker(Process):

    root = logging.getLogger()
    qh = None

    def __init__(self, n, q):
        super(Worker, self).__init__()
        self.n = n
        self.queue = q

        if not self.qh:
            Worker.qh = QueueHandler(self.queue)
            Worker.root.addHandler(self.qh)
            Worker.root.setLevel(logging.DEBUG)

        self.logger = logging.getLogger("W%i"%self.n)

        print self.root.handlers

    def run(self):
        self.logger.info("Worker %i Starting"%self.n)

        for i in xrange(10):
            self.logger.log(INFO, "testing %i"%i)

        self.logger.log(INFO, "Completed %i"%self.n)
The results are the same: now the queue handler is not added again and again, but there are still duplicate log entries, even with just one worker.
EDIT2: I've changed the code a little bit. I changed the listener process and now use a QueueListener (that's what I intended in the beginning anyway), and moved the main code to a class.
import sys
import logging
from logging import INFO
from multiprocessing import Process, Queue as mpQueue
import threading
import time

from logutils.queue import QueueListener, QueueHandler

root = logging.getLogger()
added_qh = False


class Worker(Process):

    def __init__(self, logconf, n, qh):
        super(Worker, self).__init__()
        self.n = n
        self.logconf = logconf

        # global root
        global added_qh

        if not added_qh:
            added_qh = True
            root.addHandler(qh)
            root.setLevel(logging.DEBUG)

        self.logger = logging.getLogger("W%i"%self.n)

        #print root.handlers

    def run(self):
        self.logger.info("Worker %i Starting"%self.n)

        for i in xrange(10):
            self.logger.log(INFO, "testing %i"%i)

        self.logger.log(INFO, "Completed %i"%self.n)


class Main(object):

    def __init__(self):
        pass

    def start(self):

        mpq = mpQueue(-1)
        qh = QueueHandler(mpq)

        h = logging.StreamHandler()

        ql = QueueListener(mpq, h)

        #h.setFormatter(f)
        root.addHandler(qh)

        l = logging.getLogger("Test")
        l.setLevel(logging.DEBUG)

        workers=[]

        for i in xrange(15):
            worker = Worker(logconf, i, qh)
            worker.daemon = True
            worker.start()
            workers.append(worker)

        for worker in workers:
            print "joining worker: {}".format(worker)
            worker.join()

        mpq.put_nowait(None)

        ql.start()

        # listener.join()

        for i in xrange(10):
            l.info("testing %i"%i)


if __name__ == "__main__":

    x = Main()
    x.start()

    time.sleep(10)

    print "Finish"
Now it mostly works until I reach a certain number of workers (~15), when for some reason the Main class gets blocked in the join and the rest of the workers do nothing.
I figured out a pretty simple workaround using monkeypatching. It probably isn't robust and I am not an expert with the logging module, but it seemed like the best solution for my situation. After trying some code changes (to enable passing in an existing logger, from multiprocess.get_logger()) I didn't like how much the code was changing and came up with a quick (well, it would have been, had I done this in the first place), easy-to-read hack/workaround (working example, complete with multiprocessing pool):
Of course this is probably not going to cover the whole gamut of logging usage, but I think the concept is simple enough here to get working quickly and relatively painlessly. And it should be easy to modify (for example, the lambda func discards a possible prefix that can be passed into getLogger).
All your Workers share the same root logger object (obtained in Worker.__init__; the getLogger call always returns the same logger). However, every time you create a Worker, you add a handler (QueueHandler) to that logger.
So if you create 10 Workers, you will have 10 (identical) handlers on your root logger, which means output gets repeated 10 times.
Instead, you should make the logger a module attribute rather than an instance attribute, and configure it once at the module level -- not at the class level.
(actually, loggers should be configured once at the program level)
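The accumulation described above can be reproduced without any processes. The following is a small Python 3 sketch, not code from the question: ListHandler and the logger name demo.accumulate are made up for the illustration, and a plain list stands in for real output.

```python
import logging


class ListHandler(logging.Handler):
    """Appends each message to a shared list instead of printing it."""

    def __init__(self, sink):
        super().__init__()
        self.sink = sink

    def emit(self, record):
        self.sink.append(record.getMessage())


def log_once_with_handlers(count):
    """Attach `count` separate handlers to one logger, then log a single message."""
    sink = []
    logger = logging.getLogger("demo.accumulate")  # hypothetical logger name
    logger.propagate = False       # keep the demo away from the root logger
    logger.setLevel(logging.DEBUG)
    logger.handlers = []           # start from a clean logger
    for _ in range(count):
        # Comparable to each new Worker adding its own QueueHandler.
        logger.addHandler(ListHandler(sink))
    logger.info("Worker 0 Starting")
    logger.handlers = []           # clean up after the demo
    return sink


# getLogger returns the same object for the same name on every call.
same_object = logging.getLogger("demo.accumulate") is logging.getLogger("demo.accumulate")
repeated = log_once_with_handlers(10)
single = log_once_with_handlers(1)
print(same_object, len(repeated), len(single))
```

One log call is delivered once per attached handler, so configuring the logger a single time keeps each message to a single delivery.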
I'm coming late, so you probably don't need the answer anymore. The problem comes from the fact that you already have a handler set in your main process, and in your worker you are adding another one. This means that in your worker process two handlers are in fact handling your records: the one pushing the log to the queue, and the one writing to the stream.
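The two delivery paths can be made visible in a single process. This is a hedged Python 3 sketch that uses the standard library's logging.handlers.QueueHandler and QueueListener (the classes logutils backports) and a queue.Queue in place of the multiprocessing queue; ListHandler and the logger name demo.inherited are assumptions for the illustration.

```python
import logging
import logging.handlers
import queue


class ListHandler(logging.Handler):
    """Stands in for the StreamHandler: stores messages instead of printing."""

    def __init__(self, sink):
        super().__init__()
        self.sink = sink

    def emit(self, record):
        self.sink.append(record.getMessage())


def simulate_worker_logging():
    sink = []
    stream_like = ListHandler(sink)  # the handler configured in the main process
    q = queue.Queue()                # stands in for the multiprocessing queue

    # Listener side: forwards queued records to the output handler.
    listener = logging.handlers.QueueListener(q, stream_like)

    # Worker side: a logger that still carries the inherited output handler
    # and then gets a QueueHandler added on top of it.
    worker_logger = logging.getLogger("demo.inherited")  # hypothetical name
    worker_logger.propagate = False
    worker_logger.setLevel(logging.DEBUG)
    worker_logger.handlers = [stream_like]
    worker_logger.addHandler(logging.handlers.QueueHandler(q))

    listener.start()
    worker_logger.info("testing %i", 0)
    listener.stop()                  # drains the queue and joins the listener thread
    worker_logger.handlers = []      # clean up after the demo
    return sink


delivered = simulate_worker_logging()
print(delivered)
```

The single call reaches the output twice: once directly through the inherited handler and once through the queue and the listener.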
You can fix this simply by adding an extra line, self.root.handlers = [], to your code. From your original code, the __init__ method of the worker would look like this:
The output now looks like this:
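As a sketch of the handler reset in a Python 3 setting, the variation below uses the standard library's logging.handlers.QueueHandler instead of logutils and moves the logging setup from __init__ into run(). That move is a deliberate change from the description above: run() executes in the child process, so only the child's root logger is touched.

```python
import logging
import logging.handlers
from multiprocessing import Process


class Worker(Process):

    def __init__(self, n, q):
        super().__init__()
        self.n = n
        self.queue = q   # queue shared with the listener

    def run(self):
        # Configure logging inside the child process only.
        root = logging.getLogger()
        root.handlers = []   # drop any handlers inherited from the parent
        root.addHandler(logging.handlers.QueueHandler(self.queue))
        root.setLevel(logging.DEBUG)

        logger = logging.getLogger("W%i" % self.n)
        logger.info("Worker %i Starting", self.n)
        for i in range(10):
            logger.info("testing %i", i)
        logger.info("Completed %i", self.n)
```

In the main process the class would be used as in the question: create the queue, start a listener that owns the StreamHandler, then start and join the workers.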