python multiprocessing - sending child process log

2019-07-13 14:19发布

问题:

I'm building an interface on top of some analysis code I've written that executes some SQL and processes the query results. There's logging surrounding a number of the events in this analysis code that I would like to expose to the user. Because the analysis code is rather long-running, and because I don't want the UI to block, so far I've done this by putting the analysis function into its own thread.

Simplified example of what I have now (complete script):

import sys
import time
import logging
from PySide2 import QtCore, QtWidgets

def long_task():
    """Stand-in for the long-running analysis: log, sleep three seconds, log again."""
    logging.info('Starting long task')
    time.sleep(3) # this would be replaced with a real task
    logging.info('Long task complete')

class LogEmitter(QtCore.QObject):
    """QObject whose only job is to own the signal used to publish log text."""
    # Emitted with one log message as a string.
    sigLog = QtCore.Signal(str)

class LogHandler(logging.Handler):
    """Logging handler that republishes each formatted record as a Qt signal.

    The signal lives on ``self.emitter`` (a ``LogEmitter``); connect to
    ``handler.emitter.sigLog`` to receive the formatted message strings.
    """

    def __init__(self):
        super().__init__()
        self.emitter = LogEmitter()

    def emit(self, record):
        """Format ``record`` and emit the resulting string on ``sigLog``.

        Any failure while formatting or emitting is passed to
        ``handleError`` (the convention used by the standard-library
        handlers) so that a malformed log call cannot raise inside the code
        that merely tried to log something.
        """
        try:
            msg = self.format(record)
            self.emitter.sigLog.emit(msg)
        except Exception:
            self.handleError(record)

class LogDialog(QtWidgets.QDialog):
    """'Event Log' dialog: a read-only text box fed by a handler on the root logger."""

    def __init__(self, parent=None):
        super().__init__(parent)
        self.setWindowTitle('Event Log')

        # Read-only text area that log lines get appended to.
        text_box = QtWidgets.QPlainTextEdit(self)
        text_box.setReadOnly(True)
        box_layout = QtWidgets.QHBoxLayout(self)
        box_layout.addWidget(text_box)

        # Attach a signal-emitting handler to the root logger and append
        # every message it publishes to the text area.
        qt_handler = LogHandler()
        qt_handler.emitter.sigLog.connect(text_box.appendPlainText)
        root = logging.getLogger()
        root.setLevel(logging.INFO)
        root.addHandler(qt_handler)

class Worker(QtCore.QThread):
    """Thread wrapper that calls a function and publishes its return value."""

    # Emitted with whatever the wrapped callable returned.
    results = QtCore.Signal(object)

    def __init__(self, func, *args, **kwargs):
        super().__init__()
        self.func, self.args, self.kwargs = func, args, kwargs

    def run(self):
        """Call ``func(*args, **kwargs)`` and emit the outcome on ``results``."""
        self.results.emit(self.func(*self.args, **self.kwargs))

class MainWindow(QtWidgets.QMainWindow):
    """Main window with a Start button that runs ``long_task`` in a ``Worker`` thread."""

    def __init__(self):
        super().__init__()
        widget = QtWidgets.QWidget()
        layout = QtWidgets.QHBoxLayout(widget)
        start_btn = QtWidgets.QPushButton('Start')
        start_btn.clicked.connect(self.start)
        layout.addWidget(start_btn)
        self.setCentralWidget(widget)

        self.log_dialog = LogDialog()
        # The currently running Worker, or None when idle.
        self.worker = None

    def start(self):
        """Show the log dialog and launch the task unless one is already running."""
        if not self.worker:
            self.log_dialog.show()
            logging.info('Run Starting')
            self.worker = Worker(long_task)
            self.worker.results.connect(self.handle_result)
            self.worker.start()

    def handle_result(self, result=None):
        """Log that the task finished and release the worker thread."""
        logging.info('Result received')
        if self.worker is not None:
            # Worker.run() emits `results` just before it returns, so the
            # thread may still be winding down when this slot runs. Qt
            # documents that destroying a QThread that is still running
            # crashes the program, so wait for it before dropping the last
            # reference.
            self.worker.wait()
        self.worker = None

if __name__ == '__main__':
    # Create the application and main window, then run the Qt event loop
    # and use its return value as the process exit status.
    app = QtWidgets.QApplication()
    win = MainWindow()
    win.show()
    exit_code = app.exec_()
    sys.exit(exit_code)

This works fine, except that I need to be able to allow the user to stop the execution of the analysis code. Everything I've read indicates that there is no way to interrupt threads nicely, so using the multiprocessing library seems to be the way to go (there's no way to re-write the analysis code to allow for periodic polling, since the majority of time is spent just waiting for the queries to return results). It's easy enough to get the same functionality in terms of executing the analysis code in a way that doesn't block the UI by using multiprocessing.Pool and apply_async.

E.g. replacing MainWindow from above with:

class MainWindow(QtWidgets.QMainWindow):
    """Main window with a Start button that runs ``long_task`` in a process pool."""

    def __init__(self):
        super().__init__()
        widget = QtWidgets.QWidget()
        layout = QtWidgets.QHBoxLayout(widget)
        start_btn = QtWidgets.QPushButton('Start')
        start_btn.clicked.connect(self.start)
        layout.addWidget(start_btn)
        self.setCentralWidget(widget)

        self.log_dialog = LogDialog()
        self.pool = multiprocessing.Pool()
        # True while a task has been submitted and has not yet reported back.
        self.running = False

    def start(self):
        """Show the log dialog and submit the task unless one is already running."""
        if self.running:
            return
        # Mark the run as active *before* submitting; previously the flag was
        # never set, so every click queued another task.
        self.running = True
        self.log_dialog.show()
        logging.info('Run Starting')
        try:
            self.pool.apply_async(
                long_task,
                callback=self.handle_result,
                error_callback=self.handle_error,
            )
        except Exception:
            # Submission itself failed; do not leave the window stuck "running".
            self.running = False
            raise

    def handle_result(self, result=None):
        """Pool callback for a successful task: log and allow a new run."""
        logging.info('Result received')
        self.running = False

    def handle_error(self, exc):
        """Pool callback for a task that raised: log it and allow a new run.

        Without this, a failing task would never invoke ``handle_result`` and
        ``running`` would stay True forever.
        """
        logging.error('Run failed: %s', exc)
        self.running = False

But I can't seem to figure out how I would go about retrieving the logging output from the child process and passing it to the parent to update the log dialog. I've read through just about every SO question on this as well as the cookbook examples of how to handle writing to a single log file from multiple processes, but I can't wrap my head around how to adapt those ideas to what I'm trying to do here.

Edit

So trying to figure out what might be going on for why I'm seeing different behavior than @eyllanesc I added:

# Debug aid: print the root logger (its repr shows the level) and its id().
logger = logging.getLogger()
print(f'In Func: {logger} at {id(logger)}')

and

# Debug aid: print the root logger (its repr shows the level) and its id().
logger = logging.getLogger()
print(f'In Main: {logger} at {id(logger)}')

to long_task and MainWindow.start, respectively. When I run main.py I get:

In Main: <RootLogger root (INFO)> at 2716746681984
In Func: <RootLogger root (WARNING)> at 1918342302352

which seems to be what was described in this SO question

This idea of using a Queue and QueueHandler though as a solution seems similar to @eyllanesc's original solution

回答1:

Signals do not transmit data between processes, so in this case a Pipe must be used to carry the message to the parent process, which then emits the signal:

# other imports
import threading
# ...

class LogHandler(logging.Handler):
    """Logging handler that sends formatted records through a pipe.

    ``emit`` writes each formatted message to one end of a
    ``multiprocessing.Pipe``; a daemon thread started in ``__init__`` reads
    the other end and re-emits every message on ``self.emitter.sigLog``.
    """

    def __init__(self):
        super().__init__()
        self.r, self.w = multiprocessing.Pipe()
        self.emitter = LogEmitter()
        # Daemon thread, so it does not keep the interpreter alive at exit.
        threading.Thread(target=self.listen, daemon=True).start()

    def emit(self, record):
        """Format ``record`` and send the resulting string through the pipe.

        Failures are passed to ``handleError`` (the convention used by the
        standard-library handlers) so that a malformed log call or a broken
        pipe cannot raise inside the code that merely tried to log something.
        """
        try:
            msg = self.format(record)
            self.w.send(msg)
        except Exception:
            self.handleError(record)

    def listen(self):
        """Forward messages from the pipe to ``sigLog`` until the pipe reports EOF."""
        while True:
            try:
                msg = self.r.recv()
                self.emitter.sigLog.emit(msg)
            except EOFError:
                break

# ...


回答2:

In case anyone wanders into this down the road, using QueueHandler and QueueListener leads to a solution that works on Windows as well. Borrowed heavily from this answer to a similar question:

import logging
import sys
import time
import multiprocessing
from logging.handlers import QueueHandler, QueueListener
from PySide2 import QtWidgets, QtCore

def long_task():
    """Stand-in for the long-running analysis: log, sleep three seconds, log again."""
    logging.info('Starting long task')
    time.sleep(3) # this would be replaced with a real task
    logging.info('Long task complete')

def worker_init(q):
    """Make the root logger forward records at INFO level into ``q``.

    Passed to ``multiprocessing.Pool`` as the initializer, so it is called
    with the shared queue as its argument.
    """
    root = logging.getLogger()
    root.addHandler(QueueHandler(q))
    root.setLevel(logging.INFO)

class LogEmitter(QtCore.QObject):
    """QObject whose only job is to own the signal used to publish log text."""
    # Emitted with one log message as a string.
    sigLog = QtCore.Signal(str)

class LogHandler(logging.Handler):
    """Logging handler that republishes each formatted record as a Qt signal.

    The signal lives on ``self.emitter`` (a ``LogEmitter``); connect to
    ``handler.emitter.sigLog`` to receive the formatted message strings.
    """

    def __init__(self):
        super().__init__()
        self.emitter = LogEmitter()

    def emit(self, record):
        """Format ``record`` and emit the resulting string on ``sigLog``.

        Any failure while formatting or emitting is passed to
        ``handleError`` (the convention used by the standard-library
        handlers) so that one bad record cannot raise out of the handler
        and into whatever is driving it.
        """
        try:
            msg = self.format(record)
            self.emitter.sigLog.emit(msg)
        except Exception:
            self.handleError(record)

class LogDialog(QtWidgets.QDialog):
    """'Event Log' dialog holding a read-only text box exposed as ``log_txt``."""

    def __init__(self, parent=None):
        super().__init__(parent)
        self.setWindowTitle('Event Log')

        # Kept as an attribute so that code outside the dialog can append to it.
        self.log_txt = QtWidgets.QPlainTextEdit(self)
        self.log_txt.setReadOnly(True)

        box_layout = QtWidgets.QHBoxLayout(self)
        box_layout.addWidget(self.log_txt)

class MainWindow(QtWidgets.QMainWindow):
    """Main window that runs ``long_task`` in a pool process and displays its log.

    Log records from both the main process and the pool worker are put on a
    shared ``multiprocessing.Queue``; a ``QueueListener`` hands them to a
    ``LogHandler`` whose signal appends the text to the log dialog.
    """

    def __init__(self):
        super().__init__()
        widget = QtWidgets.QWidget()
        layout = QtWidgets.QHBoxLayout(widget)
        start_btn = QtWidgets.QPushButton('Start')
        start_btn.clicked.connect(self.start)
        layout.addWidget(start_btn)
        self.setCentralWidget(widget)

        self.log_dialog = LogDialog()
        # True while a task has been submitted and has not yet reported back.
        self.running = False

        # sets up handler that will be used by QueueListener
        # which will update the LogDialog
        handler = LogHandler()
        handler.emitter.sigLog.connect(self.log_dialog.log_txt.appendPlainText)

        self.q = multiprocessing.Queue()
        self.ql = QueueListener(self.q, handler)
        self.ql.start()

        # main process should also log to a QueueHandler
        self.main_log = logging.getLogger('main')
        self.main_log.propagate = False
        self.main_log.setLevel(logging.INFO)
        self.main_log.addHandler(QueueHandler(self.q))

        # Single worker process; worker_init attaches a QueueHandler for
        # self.q to the worker's root logger.
        self.pool = multiprocessing.Pool(1, worker_init, [self.q])

    def start(self):
        """Show the log dialog and submit the task unless one is already running."""
        if self.running:
            return
        # Mark the run as active *before* submitting; previously the flag was
        # never set, so every click queued another task.
        self.running = True
        self.log_dialog.show()
        self.main_log.info('Run Starting')
        try:
            self.pool.apply_async(
                long_task,
                callback=self.handle_result,
                error_callback=self.handle_error,
            )
        except Exception:
            # Submission itself failed; do not leave the window stuck "running".
            self.running = False
            raise

    def handle_result(self, result=None):
        """Pool callback for a successful task: log and allow a new run."""
        # NOTE(review): this sleep delays the pool's result handling; the
        # multiprocessing docs say callbacks should complete immediately.
        # Kept because it looks like a deliberate demonstration delay - confirm.
        time.sleep(2)
        self.main_log.info('Result received')
        self.running = False

    def handle_error(self, exc):
        """Pool callback for a task that raised: log it and allow a new run.

        Without this, a failing task would never invoke ``handle_result`` and
        ``running`` would stay True forever.
        """
        self.main_log.error('Run failed: %s', exc)
        self.running = False

    def closeEvent(self, _):
        """Stop the log listener and shut the pool down when the window closes."""
        # Stop reading the queue first, so that killing a worker in the middle
        # of a write cannot leave the listener stuck on partial data.
        self.ql.stop()
        # Pools hold processes and threads that should be released explicitly
        # rather than left to interpreter finalization.
        self.pool.terminate()
        self.pool.join()

if __name__ == '__main__':
    # Create the application and main window, then run the Qt event loop
    # and use its return value as the process exit status.
    app = QtWidgets.QApplication()
    win = MainWindow()
    win.show()
    exit_code = app.exec_()
    sys.exit(exit_code)