QThreadPool - How to interrupt / How to use wisely

2019-08-06 18:52发布

Background :

I have a script that allows me to make spatial queries on a PostgreSQL database via an API coming from a private editor (I can't directly query the database). This API is working with python 3.2. To summarize very quickly, this script is used to download elements of this database in the desired geographical footprint. Depending of the zone, you can obtain between 1 to over 100 elements, each of them having very different sizes (from Ko to Go).

The main window let you to set all options and then start the global process. When launched a console window appears letting you see what’s going on. Once an item has been downloaded a short “report” is displayed on the console. Currently everything is done sequentially one element at a time. As you can imagine, if this element is quite large, the console freezes while waiting for the end of the download process.

Code:

I'm not going to post the full script here, but through a very simple script I will try to show the main problem I'm trying to solve (i.e. avoid locking the user interface / have some sort of real-time output on what's going on).

So, in order to avoid these freezing problems, the use of threads seemed to me to be the best solution. To simulate the download process (see previous chapter) I used the url.request urlretrieve method with multiple urls (pointing to files of different sizes).

import os
import sys
import time
import urllib.request
from PyQt4 import QtCore, QtGui

url_1m = 'http://ipv4.sbg.proof.ovh.net/files/1Mio.dat'
url_10m = 'http://ipv4.sbg.proof.ovh.net/files/10Mio.dat'
url_100m = 'http://ipv4.sbg.proof.ovh.net/files/100Mio.dat'
url_1g = 'http://ipv4.sbg.proof.ovh.net/files/1Gio.dat'
url_10g = 'http://ipv4.sbg.proof.ovh.net/files/10Gio.dat'

urls = (url_1m, url_10m, url_100m, url_1g, url_10g)


# ---------------------------------------------------------------------------------
class DownloadWorkerSignals(QtCore.QObject):
    """
    Defines the signals available from a running download worker thread.
    """
    finished = QtCore.pyqtSignal(str)


# ---------------------------------------------------------------------------------
class DownloadWorker(QtCore.QRunnable):
    """
    Worker thread
    """

    def __init__(self, url, filepath, filename, index):
        super(DownloadWorker, self).__init__()

        self.url = url
        self.file_path = filepath
        self.filename = filename
        self.index = index

        self.signals = DownloadWorkerSignals()

    @QtCore.pyqtSlot(str)
    def run(self):
        t = time.time()
        message = 'Thread %d started\n' % self.index
        try:
            # The urlretrieve method will copy a network object to a local file
            urllib.request.urlretrieve(url=self.url,
                                       filename=os.path.join(self.file_path,
                                                             self.filename))
        except IOError as error:
            message += str(error) + '\n'
        finally:
            message += 'Thread %d ended %.2f s\n' % (self.index, time.time() - t)
            self.signals.finished.emit(message)  # Done


# ---------------------------------------------------------------------------------
class Main(QtGui.QMainWindow):
    """
    Main window
    """

    def __init__(self):
        super(self.__class__, self).__init__()

        self.resize(400, 200)
        self.setWindowTitle("Main")
        self.setWindowModality(QtCore.Qt.ApplicationModal)

        self.centralwidget = QtGui.QWidget(self)
        self.setCentralWidget(self.centralwidget)

        # Ok / Close
        # -------------------------------------------------------------------------
        self.buttonBox = QtGui.QDialogButtonBox(self.centralwidget)
        self.buttonBox.setStandardButtons(QtGui.QDialogButtonBox.Cancel | 
                                          QtGui.QDialogButtonBox.Ok)
        self.buttonBox.setGeometry(QtCore.QRect(10, 160, 380, 20))

        # Connect definition
        # -------------------------------------------------------------------------
        self.connect(self.buttonBox, 
                     QtCore.SIGNAL('accepted()'), 
                     self.button_ok_clicked)
        self.connect(self.buttonBox, 
                     QtCore.SIGNAL('rejected()'), 
                     self.button_cancel_clicked)

    # Connect functions
    # -----------------------------------------------------------------------------
    def button_cancel_clicked(self):
        self.close()

    def button_ok_clicked(self):
        # Launch console
        console = Console(parent=self)
        console.exec_()


# ---------------------------------------------------------------------------------------------------------------
class Console(QtGui.QDialog):
    """
    Console window
    """

    def __init__(self, parent):
        super(self.__class__, self).__init__()

        self.parent = parent

        self.resize(400, 200)
        self.setWindowTitle("Console")
        self.setModal(True)

        self.verticalLayout = QtGui.QVBoxLayout(self)

        # Text edit
        # -------------------------------------------------------------------------
        self.text_edit = QtGui.QPlainTextEdit(self)
        self.text_edit.setReadOnly(True)
        self.text_edit_cursor = QtGui.QTextCursor(self.text_edit.document())
        self.verticalLayout.addWidget(self.text_edit)

        # Ok / Close
        # -------------------------------------------------------------------------
        self.button_box = QtGui.QDialogButtonBox(self)
        self.button_box.setStandardButtons(QtGui.QDialogButtonBox.Close)
        self.verticalLayout.addWidget(self.button_box)

        # Connect definition
        # -------------------------------------------------------------------------
        self.connect(self.button_box.button(QtGui.QDialogButtonBox.Close), 
                     QtCore.SIGNAL('clicked()'),
                     self.button_cancel_clicked)

        # Post initialization
        # -------------------------------------------------------------------------
        self.threadpool = QtCore.QThreadPool()
        self.threadpool.setMaxThreadCount(2)

        for index, url in enumerate(urls):
            worker = DownloadWorker(url=url,
                                    filepath='C:\\Users\\philippe\\Downloads',
                                    filename='url_%d.txt' % index,
                                    index=index)
            worker.signals.finished.connect(self.write_message)
            self.threadpool.start(worker)

        '''
        I have to wait for the end of the thread pool to make a post-processing.
        If I use the waitForDone I don't see my console until the all work is done 
        '''
        # self.threadpool.waitForDone()
        # self.write_stram('Thread pool finished')

    # Connect functions
    # -----------------------------------------------------------------------------
    def button_cancel_clicked(self):
        if self.threadpool.activeThreadCount() != 0:
            pass  # How to interrupt the threadpool ?
        self.close()

    @QtCore.pyqtSlot(str)
    def write_message(self, text):
        self.text_edit.insertPlainText(text)
        cursor = self.text_edit.textCursor()
        self.text_edit.setTextCursor(cursor)


# ---------------------------------------------------------------------------------
if __name__ == '__main__':
    app = QtGui.QApplication(sys.argv)
    window = Main()
    window.show()
    app.exec_()

Questions:

Everything seems to work as expected but I encounter two difficulties:

  1. At the end of the thread pool process I have to make some post-processing. If I use the waitForDone method I don't see my console until the all work is done and it’s not the type of behavior wanted.
  2. If the Cancel Button in the Console is clicked, I need to interrupt the threadpool and I don’t know how to manage that.

1条回答
一纸荒年 Trace。
2楼-- · 2019-08-06 19:31

I had another look at this problem (based largely on this : how-do-i-maintain-a-resposive-gui-using-qthread-with-pyqgis).

So I replaced the previous tandem QThreadPool/QRunnable, by Queue/QThread. The code below gives an overview.

import os
import sys
import time
import urllib.request
import queue
from PyQt4 import QtCore, QtGui

url_1m = 'http://ipv4.sbg.proof.ovh.net/files/1Mio.dat'
url_10m = 'http://ipv4.sbg.proof.ovh.net/files/10Mio.dat'
url_100m = 'http://ipv4.sbg.proof.ovh.net/files/100Mio.dat'
url_1g = 'http://ipv4.sbg.proof.ovh.net/files/1Gio.dat'
url_10g = 'http://ipv4.sbg.proof.ovh.net/files/10Gio.dat'

urls = (url_1m, url_10m, url_100m, url_1g, url_10g)


# ---------------------------------------------------------------------------------
class WorkerThread(QtCore.QThread):
    """
    Worker thread
    """

    def __init__(self, parent_thread):
        QtCore.QThread.__init__(self, parent_thread)

    def run(self):
        self.running = True
        success = self.do_work()
        self.emit(QtCore.SIGNAL('jobFinished(PyQt_PyObject)'), success)

    def stop(self):
        self.running = False
        pass

    def do_work(self):
        return True

    def clean_up(self):
        pass


# ---------------------------------------------------------------------------------
class LongRunningTask(WorkerThread):
    def __init__(self, parent_thread, url, filepath, filename, index):
        WorkerThread.__init__(self, parent_thread)

        self.url = url
        self.filepath = filepath
        self.filename = filename
        self.index = index

    def do_work(self):
        t = time.time()
        self.emit(QtCore.SIGNAL('threadText(PyQt_PyObject)'), 'Thread %d started\n' % self.index)

        try:
            # The urlretrieve method will copy a network object to a local file
            urllib.request.urlretrieve(url=self.url,
                                       filename=os.path.join(self.filepath,
                                                             self.filename))
        except IOError as error:
            self.emit(QtCore.SIGNAL('threadText(PyQt_PyObject)'),
                      'Thread %d error - ' % self.index + str(error) + '\n')
        finally:
            self.emit(QtCore.SIGNAL('threadText(PyQt_PyObject)'),
                      'Thread %d ended %.2f s\n' % (self.index, time.time() - t))
            return True


# ---------------------------------------------------------------------------------
class Console(QtGui.QDialog):
    """
    Console window
    """

    def __init__(self):
        super(self.__class__, self).__init__()

        self.resize(400, 200)
        self.setWindowTitle("Console")
        self.setModal(True)

        self.setLayout(QtGui.QVBoxLayout())

        # Text edit
        # -------------------------------------------------------------------------
        self.textEdit = QtGui.QPlainTextEdit(self)
        self.textEdit.setReadOnly(True)
        self.textEdit_cursor = QtGui.QTextCursor(self.textEdit.document())
        self.layout().addWidget(self.textEdit)

        # Ok / Close
        # -------------------------------------------------------------------------
        self.button_box = QtGui.QDialogButtonBox(self)
        self.button_box.setStandardButtons(QtGui.QDialogButtonBox.Close)
        self.button_box.button(QtGui.QDialogButtonBox.Close).setEnabled(False)
        self.layout().addWidget(self.button_box)

        # Connect definition
        # -------------------------------------------------------------------------
        self.connect(self.button_box.button(QtGui.QDialogButtonBox.Close),
                     QtCore.SIGNAL('clicked()'),
                     self.reject)

        # Post-Initialization
        # -------------------------------------------------------------------------
        self.queue = queue.Queue()
        # self.queue = queue.Queue(maxsize=2)
        self.run_thread()

    # Connect functions
    # -----------------------------------------------------------------------------
    def cancel_thread(self):
        self.workerThread.stop()

    def job_finished_from_thread(self, success):
        self.workerThread.stop()
        self.queue.get()

        # Stop the pulsation
        if self.queue.empty():
            self.button_box.button(QtGui.QDialogButtonBox.Close).setEnabled(True)

        self.emit(QtCore.SIGNAL('jobFinished(PyQt_PyObject)'), success)

    def text_from_thread(self, value):
        self.textEdit.insertPlainText(value)
        cursor = self.textEdit.textCursor()
        self.textEdit.setTextCursor(cursor)

    def run_thread(self):
        for index, url in enumerate(urls):
            self.workerThread = LongRunningTask(parent_thread=self,
                                                url=url,
                                                filepath='C:\\Users\\philippe\\Downloads',
                                                filename='url_%d.txt' % index,
                                                index=index)
            self.connect(self.workerThread,
                         QtCore.SIGNAL('jobFinished(PyQt_PyObject)'),
                         self.job_finished_from_thread)
            self.connect(self.workerThread,
                         QtCore.SIGNAL('threadText(PyQt_PyObject)'),
                         self.text_from_thread)

            self.queue.put(self.workerThread)
            self.workerThread.start()

            # If I set the queue to maxsize=2, how to manage it here
            '''
            while not self.queue.full():
                self.queue.put(self.workerThread)
                self.workerThread.start()
            '''

# ---------------------------------------------------------------------------------
if __name__ == '__main__':
    app = QtGui.QApplication(sys.argv)
    window = Console()
    window.show()
    app.exec_()

Question: Unfortunately, I encounter other types of difficulties. In reality, the queue can contain a large amount of threads (over 100). 1. How can I, like the QthreadPool and its setMaxThreadCount method, manage the number of threads running in parallel in order to prevent the system from collapsing completely ?

查看更多
登录 后发表回答