Sending large amounts of data between Qt threads

Posted 2019-01-16 12:53

Question:

I have a QThread which generates a fairly large amount of data regularly (couple of megabytes per second), and it needs to transmit it to the parent (GUI) thread.

I'm afraid I'm not that familiar with the inner workings of QThread, so I would like to ask for a best practice.

Obviously, the most direct way to transmit data is to just emit an array. However, how efficient is this? Does Qt know where the array is used and avoid deep-copying it when sending and receiving it?

If not, I can gladly just allocate the memory in the main thread and give a pointer to the child thread, where it will write the data (and only emit short messages about the progress). This does not seem to be the most elegant solution to me, which is why I'm asking.

If Qt avoids copying the data into multiple buffers when emitting and receiving, is that guaranteed on all systems? I don't have the resources to benchmark it under various operating systems.

Answer 1:

QThread's inner workings are irrelevant: they play no role in how the event loops work. When you emit a signal in a QObject that lives in a thread different from the slot's object, the signal will be posted as a QMetaCallEvent to the event queue of the receiving thread. The event loop running in the receiving thread will then act on this event and execute the call into the slot that was connected to the emitted signal.

So, no matter what happens, whatever data you send through the signal will eventually end up as a payload in an instance of a QEvent-derived class.
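The mechanism described above can be modelled roughly in plain standard C++. This is only an analogy under stated assumptions, not Qt's implementation: `EventQueue` and `sumInReceiverThread` are hypothetical names, a `std::function` stands in for the QMetaCallEvent, and the lambda capture stands in for the event's payload.

```cpp
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// Hypothetical stand-in for a per-thread event queue (not Qt's implementation).
class EventQueue {
public:
    // Called from any thread: store the call and its payload as a queued "event".
    void post(std::function<void()> event) {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_events.push(std::move(event));
        }
        m_ready.notify_one();
    }
    // Called from the receiving thread: run events until the quit marker arrives.
    void exec() {
        for (;;) {
            std::function<void()> event;
            {
                std::unique_lock<std::mutex> lock(m_mutex);
                m_ready.wait(lock, [this] { return !m_events.empty(); });
                event = std::move(m_events.front());
                m_events.pop();
            }
            if (!event) return;  // an empty function is the quit marker
            event();             // the "slot" runs here, in the receiving thread
        }
    }
    void quit() { post(std::function<void()>()); }
private:
    std::mutex m_mutex;
    std::condition_variable m_ready;
    std::queue<std::function<void()>> m_events;
};

// Runs a receiver loop in a worker thread, posts two "signals" to it and
// returns the sum that the "slot" computed in that thread.
inline long sumInReceiverThread(const std::vector<int> &a, const std::vector<int> &b) {
    EventQueue queue;
    long total = 0;
    auto slot = [&total](const std::vector<int> &data) {
        for (int x : data) total += x;
    };
    std::thread receiver([&queue] { queue.exec(); });
    queue.post([slot, a] { slot(a); });  // the payload travels inside the event
    queue.post([slot, b] { slot(b); });
    queue.quit();
    receiver.join();  // after join() it is safe to read total here
    return total;
}
```

Calling `sumInReceiverThread({1, 2, 3}, {10, 20})` posts both vectors to the worker and returns their combined sum. Note that this sketch deep-copies each `std::vector` into its event, which is exactly the cost that implicitly shared containers avoid.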

The meat of the issue is when the QMetaCallEvent reaches the event loop and the container gets passed into the slot as an argument. Of course, the copy constructors could be called plenty of times along the way. Below is some simple code that demonstrates how many times the copy constructor and default constructor are in fact called:

  • on the elements of an implicitly shared copy-on-write container (QVector),

  • on a custom class that stands in for a container.

You'll be pleasantly surprised :)

Since Qt containers are implicitly shared copy-on-write, their copy construction has negligible cost: all that happens is that a reference counter is incremented atomically on construction. None of the elements are copied.
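To make the copy-on-write idea concrete, here is a minimal sketch in standard C++. `CowVector` is a hypothetical class written for illustration; QVector's real implementation differs, and this sketch is not hardened for concurrent writers.

```cpp
#include <cstddef>
#include <memory>
#include <vector>

// Hypothetical copy-on-write wrapper; QVector's real implementation differs.
template <typename T>
class CowVector {
public:
    explicit CowVector(std::size_t n)
        : m_data(std::make_shared<std::vector<T>>(n)) {}

    // Copying a CowVector copies only the shared_ptr, i.e. it bumps an atomic
    // reference count. Read access never copies the elements.
    const T &at(std::size_t i) const { return (*m_data)[i]; }

    // Write access "detaches": if the storage is shared, deep-copy it first.
    T &operator[](std::size_t i) {
        if (m_data.use_count() > 1)
            m_data = std::make_shared<std::vector<T>>(*m_data);
        return (*m_data)[i];
    }

    // True while both objects still point at the same storage.
    bool sharesStorageWith(const CowVector &other) const {
        return m_data == other.m_data;
    }

private:
    std::shared_ptr<std::vector<T>> m_data;
};
```

With `CowVector<int> a(100); CowVector<int> b = a;` both objects share one block of storage until `b[99] = 7;` is executed, at which point `b` gets its own copy and `a` is left unchanged.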

Alas, pre-C++11 C++ shows its ugly side: if the slot code modifies the container in any way, there is no way to pass a reference to the slot that lets the compiler know that the original container is not needed anymore. Thus: if the slot receives a const reference to the container, you're guaranteed that no copies will be made. If the slot receives a writable copy of the container and you modify it, a completely unnecessary copy will be made, since the instance alive at the call site is no longer needed. In C++11 you'd pass an rvalue reference as a parameter. Passing an rvalue reference in a function call tells the callee that the caller no longer needs the object's contents, so they can be moved from instead of copied.
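The difference between copying and moving into a by-value parameter can be seen in a small standard C++11 sketch. `Payload` and `consume` are hypothetical names, and this shows plain function calls only; it makes no claim about how Qt's signal/slot machinery forwards arguments.

```cpp
#include <cstddef>
#include <utility>
#include <vector>

// Hypothetical payload type that counts deep copies and moves.
struct Payload {
    static int copies;
    static int moves;
    std::vector<char> bytes;
    explicit Payload(std::size_t n) : bytes(n) {}
    Payload(const Payload &o) : bytes(o.bytes) { ++copies; }            // deep copy
    Payload(Payload &&o) noexcept : bytes(std::move(o.bytes)) { ++moves; }  // steals storage
};
int Payload::copies = 0;
int Payload::moves = 0;

// A consumer that takes its argument by value and modifies it.
inline std::size_t consume(Payload p) {
    p.bytes.push_back('x');
    return p.bytes.size();
}
```

Calling `consume(a)` on an lvalue deep-copies the bytes, whereas `consume(std::move(b))` only transfers ownership of the storage, so the counters end up at one copy and one move after one call of each kind.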

Sample code output:

"Started" copies: 0 assignments: 0 default instances: 0 
"Created Foo" copies: 0 assignments: 0 default instances: 100 
"Created Bar" copies: 0 assignments: 0 default instances: 100 
"Received signal w/const container" copies: 0 assignments: 0 default instances: 100 
"Received signal w/copy of the container" copies: 0 assignments: 0 default instances: 100 
"Made a copy" copies: 100 assignments: 1 default instances: 101 
"Reset" copies: 0 assignments: 0 default instances: 0 
"Received signal w/const class" copies: 2 assignments: 0 default instances: 1 
"Received signal w/copy of the class" copies: 3 assignments: 0 default instances: 1 
//main.cpp
#include <QtCore>

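// Instrumented element type: counts default constructions, copy constructions
// and copy assignments, so the output shows when elements are really copied.
class Class {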
    static QAtomicInt m_copies;
    static QAtomicInt m_assignments;
    static QAtomicInt m_instances;
public:
    Class() { m_instances.fetchAndAddOrdered(1); }
    Class(const Class &) { m_copies.fetchAndAddOrdered(1); }
    Class & operator=(const Class &) { m_assignments.fetchAndAddOrdered(1); return *this; }
    static void dump(const QString & s = QString()) {
        qDebug() << s << "copies:" << m_copies << "assignments:" << m_assignments << "default instances:" << m_instances;
    }
    static void reset() {
        m_copies = 0;
        m_assignments = 0;
        m_instances = 0;
    }
};

QAtomicInt Class::m_instances;
QAtomicInt Class::m_copies;
QAtomicInt Class::m_assignments;

typedef QVector<Class> Vector;

Q_DECLARE_METATYPE(Vector)

class Foo : public QObject
{
    Q_OBJECT
    Vector v;
public:
    Foo() : v(100) {}
signals:
    void containerSignal(const Vector &);
    void classSignal(const Class &);
public slots:
    void sendContainer() { emit containerSignal(v); }
    void sendClass() { emit classSignal(Class()); }
};

class Bar : public QObject
{
    Q_OBJECT
public:
    Bar() {}
signals:
    void containerDone();
    void classDone();
public slots:
    void containerSlotConst(const Vector &) {
        Class::dump("Received signal w/const container");
    }
    void containerSlot(Vector v) {
        Class::dump("Received signal w/copy of the container");
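        // Non-const access detaches the shared vector: all 100 elements are
        // copied, then element 99 is assigned from a temporary.
        v[99] = Class();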
        Class::dump("Made a copy");
        Class::reset();
        Class::dump("Reset");
        emit containerDone();
    }
    void classSlotConst(const Class &) {
        Class::dump("Received signal w/const class");
    }
    void classSlot(Class) {
        Class::dump("Received signal w/copy of the class");
        emit classDone();
        //QThread::currentThread()->quit();
    }
};

int main(int argc, char ** argv)
{
    QCoreApplication a(argc, argv);
    qRegisterMetaType<Vector>("Vector");
    qRegisterMetaType<Class>("Class");

    Class::dump("Started");
    QThread thread;
    Foo foo;
    Bar bar;
    Class::dump("Created Foo");
    bar.moveToThread(&thread);
    Class::dump("Created Bar");
    QObject::connect(&thread, SIGNAL(started()), &foo, SLOT(sendContainer()));
    QObject::connect(&foo, SIGNAL(containerSignal(Vector)), &bar, SLOT(containerSlotConst(Vector)));
    QObject::connect(&foo, SIGNAL(containerSignal(Vector)), &bar, SLOT(containerSlot(Vector)));
    QObject::connect(&bar, SIGNAL(containerDone()), &foo, SLOT(sendClass()));
    QObject::connect(&foo, SIGNAL(classSignal(Class)), &bar, SLOT(classSlotConst(Class)));
    QObject::connect(&foo, SIGNAL(classSignal(Class)), &bar, SLOT(classSlot(Class)));
    QObject::connect(&bar, SIGNAL(classDone()), &thread, SLOT(quit()));
    QObject::connect(&thread, SIGNAL(finished()), &a, SLOT(quit()));
    thread.start();
    a.exec();
    thread.wait();
}

#include "main.moc"


Answer 2:

When communicating large buffers, it's 'traditional' to new() buffer objects in the producer thread and, once a buffer is loaded up, queue/emit/whatever the *buffer to the consumer thread and immediately new() another one (into the same *buffer variable) for the next load of data.

Issue: if your GUI thread cannot keep up, memory use will run away unless you take some flow-control measure (e.g. pre-allocating a pool of *buffers and 'circulating' them).

What I usually do is pre-allocate some buffer instances in a loop (up to thousands in a big server) and push them onto a producer-consumer 'pool queue'. If a child thread wants to load data from some network connection into a buffer, it must pop one from the pool and load it up. It can then queue/emit/whatever the buffer to a consumer thread and pop another buffer for any more data that may come in. The consumer thread gets the buffer, processes the data and pushes the 'used' buffer back onto the pool queue for re-use. This provides flow control: if the child thread loads buffers faster than the consumer thread can process them, it will find the pool empty and block on it until the consumer thread returns some used buffers. This caps buffer/memory use, and also avoids continual new/dispose (or GC in those languages that support it).
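A minimal sketch of this pool scheme in standard C++ might look like the following. All names (`BufferQueue`, `runPipeline`) are hypothetical, the consumer runs in the calling thread instead of a GUI thread, and a null pointer is used as an end-of-stream marker; the pool size is assumed to be greater than zero.

```cpp
#include <condition_variable>
#include <cstddef>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

using Buffer = std::vector<char>;

// Blocking FIFO, used both as the free-buffer pool and as the data queue.
class BufferQueue {
public:
    void push(std::unique_ptr<Buffer> b) {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_items.push(std::move(b));
        }
        m_nonEmpty.notify_one();
    }
    // Blocks until an item is available.
    std::unique_ptr<Buffer> pop() {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_nonEmpty.wait(lock, [this] { return !m_items.empty(); });
        std::unique_ptr<Buffer> b = std::move(m_items.front());
        m_items.pop();
        return b;
    }
    // Current item count, e.g. for showing pool usage in a status display.
    std::size_t size() {
        std::lock_guard<std::mutex> lock(m_mutex);
        return m_items.size();
    }
private:
    std::mutex m_mutex;
    std::condition_variable m_nonEmpty;
    std::queue<std::unique_ptr<Buffer>> m_items;
};

// The producer fills `rounds` buffers drawn from a pool of `poolSize`; the
// consumer counts the bytes and recycles each buffer. Returns bytes consumed.
inline std::size_t runPipeline(std::size_t poolSize, std::size_t rounds,
                               std::size_t bytesPerBuffer) {
    BufferQueue pool, filled;
    for (std::size_t i = 0; i < poolSize; ++i)
        pool.push(std::unique_ptr<Buffer>(new Buffer()));

    std::thread producer([&] {
        for (std::size_t i = 0; i < rounds; ++i) {
            std::unique_ptr<Buffer> b = pool.pop();  // blocks if the consumer lags
            b->assign(bytesPerBuffer, 'd');          // stand-in for loading real data
            filled.push(std::move(b));
        }
        filled.push(nullptr);  // end-of-stream marker
    });

    std::size_t consumed = 0;
    for (;;) {
        std::unique_ptr<Buffer> b = filled.pop();
        if (!b) break;
        consumed += b->size();
        pool.push(std::move(b));  // return the buffer for re-use
    }
    producer.join();
    return consumed;
}
```

For example, `runPipeline(4, 100, 1024)` moves 100 KiB through the pipeline while never holding more than four buffers in memory, because the producer stalls in `pool.pop()` whenever all four are in flight.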

I like to dump the pool queue count to a GUI status bar on a 1-second timer. This allows me to watch buffer use (and quickly spot any leaks :).