How to Compress Slot Calls When Using Queued Conne

2019-01-13 19:00发布

问题:

After reading some articles like this about Qt Signal-Slot communications I still have a question concerning the queued connection.

If I have some threads sending signals all the time to each other and lets say one thread_slowis running a slow method in it's event loop and another thread_fast is running a fast one that sends multiple signals while the other thread is still running it's slow method.....when the slow method from thread_slow returns to the event loop, will it process all the signals that were sent before by thread_fastor just the last one (all the signals are the same type)?

If it will process all the signals, is it there a way to make the thread_slow only process the last one? (Considering "the last one" in a multithread application might be vague, let's consider the last signal before the thread asked for the last signal, for the sake of simplicity, so new ones being sent while the thread looks for the last might be lost).

(I am asking this because I have multiple threads receiving data from multiple threads, and I dont want them to process old data, just the last one that was sent)

I have run some tests, and it appears that Qt will process all the signals. I made one thread do:

while(true)
{
    QThread::msleep(500);
    emit testQueue(test);
    test++;
}

and a slot in another will do:

void test::testQueue(int test)
{
    test.store(private_test.load() + test);
    emit testText(QString("Test Queue: ") + QString::number(private_test.load()));
}

and the thread will run:

while(true)
{
    QThread::msleep(3000);
    QCoreApplication::processEvents();
    private_test.store(private_test.load() + 1000);
}

I am sending a signal from one thread to the other every 500 milliseconds, and the other thread sleeps for 3000 milliseconds (3 seconds) and then wakes up and increment an internal variable by 100. every time the slot is executed it emits a text with the value received + the internal variable. The result I am having is that every time QCoreApplication::processEvents(); is called, all signals are executed....(I edited this part because I found a bug in my previous code)

回答1:

I am trying to form my comment into an answer. I agree with you about that the documentation is lacking this information, or at least it is not clear for me, and apparently for you either.

There would be two options to get more information:

1) Trial

Put a qDebug() or printf()/fprintf() statement into your slot in the "slow" thread and see what it prints out. Run this a few times and draw the conclusion.

2) Making sure

You would need to read the source code for this how the meta object compiler, aka. moc gets this through from the source file. This is a bit more involved investigation, but this could lead to certainity.

As far as I know, every signal emission posting a corresponding event. Then, the event will be queued for the separate thread within the thread class. Here you can find the relevant two source code files:

void QCoreApplication::postEvent(QObject *receiver, QEvent *event, int priority)

and

class QPostEventList : public QVector

There are two approaches with their trade-offs:

Queue a busy slot operation from the data mutator slot

The main advantage is that signals could not be lost during the busy operation. However, this could be inherently slower as it can potentially process a lot more operation than needed.

The idea is that the data is re-set for each event handled, but the real busy operation is queued for execution only once. It does not necessarily have to be the for the first event if there are more, but that is the simplest implementation.

Foo::Foo(QObject *parent) : QObject(parent)
{
    ...
    connect(this, SIGNAL(dataUpdateSignal(const QByteArray&)), SLOT(dataUpdateSlot(const QByteArray&)));
    connect(this, SIGNAL(queueBusyOperationSignal()), SLOT(busyOperation()));
    ...
}

void Foo::dataUpdateSlot(const QByteArray &data)
{
    m_data = data;

    if (busyOperationQueued);
        emit queueBusyOperationSignal();
        m_busyOperationQueued = true;
    }
}

void MyClass::busyOperationSlot()
{

    // Do the busy work with m_data here

    m_busyOperationQueued = false;    
}

Connect/Disconnect

The idea is to disconnect the slot from the corresponding signal when starting the processing. This will ensure that new signal emission would not be caught, and connect the slot to the signal again once the thread is free to process the next events.

This would have some idle time in the thread though between the connection and the next even handled, but at least this would be a simple way of implmeneting it. It may actually be even negligible a performance difference depending on more context not really provided here.

The main drawback is that this would lose the signals during the busy operation.

Foo::Foo(QObject *parent) : QObject(parent)
{
    ...
    connect(this, SIGNAL(dataUpdateSignal(const QByteArray&)), SLOT(busyOperationSlot(const QByteArray&)));
    ...
}

void MyClass::busyOperationSlot(const QByteArray &data)
{
    disconnect(this, SIGNAL(dataUpdateSignal(const QByteArray&)), this, SLOT(dataUpdateSlot(const QByteArray&)));

    // Do the busy work with data here

    connect(this, SIGNAL(dataUpdateSignal(const QByteArray&)), SLOT(dataUpdateSlot(const QByteArray&)));
}

Future thoughts

I was thinking if there was a convenient API - e.g. a processEvents() alike method, but with an argument to process only the last event posted - for actually telling the event system explicitly to process the last one rather than circumventing the issue itself. It does appear to be such an API, however, it is private.

Perhaps, someone will submit a feature request to have something like that in public.

/*!
\internal
Returns \c true if \a event was compressed away (possibly deleted) and should not be added to the list.
*/
bool QCoreApplication::compressEvent(QEvent *event, QObject *receiver, QPostEventList *postedEvents)

The relevant source code can be found here.

It also seems to have an overriden version in QGuiApplication and QApplication.

As for completeness, there is also such a method like this:

void QCoreApplication::removePostedEvents(QObject * receiver, int eventType = 0) [static]

Removes all events of the given eventType that were posted using postEvent() for receiver.

The events are not dispatched, instead they are removed from the queue. You should never need to call this function. If you do call it, be aware that killing events may cause receiver to break one or more invariants.

If receiver is null, the events of eventType are removed for all objects. If eventType is 0, all the events are removed for receiver. You should never call this function with eventType of 0. If you do call it in this way, be aware that killing events may cause receiver to break one or more invariants.

But this is not quite what you would like to have here as per documentation.



回答2:

QCoreApplication QMetaCallEvent Compression

Every queued slot call ends up in the posting of a QMetaCallEvent to the target object. The event contains the sender object, the signal id, the slot index, and packaged call parameters. On Qt 5, the signal id generally doesn't equal the value returned by QMetaObject::signalIndex(): it is an index computed as if the object only had signal methods and no other methods.

The objective is to compress such calls so that only one unique call exists in the event queue for a given tuple of (sender object, sender signal, receiver object, receiver slot).

This is the only sane way to do it, without having to make changes to source or target objects, and while maintaining minimal overhead. The event-loop-recursing methods in my other answers have serious stack overhead per each event, on the order of 1kbyte when Qt is built for 64-bit-pointer architectures.

The event queue can be accessed when new events are posted to an object that has one or more events already posted to it. In such case, QCoreApplication::postEvent calls QCoreApplication::compressEvent. compressEvent is not called when the first event is posted to an object. In a reimplementation of this method, the contents of a QMetaCallEvent posted to the target object can be checked for a call to your slot, and the obsolete duplicate has to be deleted. Private Qt headers have to be included to obtain the definitions of QMetaCallEvent, QPostEvent and QPostEventList.

Pros: Neither the sender nor the receiver objects have to be aware of anything. Signals and slots work as-is, including the method-pointer calls in Qt 5. Qt itself uses this way of compressing events.

Cons: Requires inclusion of private Qt headers and forcible clearing of QEvent::posted flag.

Instead of hacking the QEvent::posted flag, the events to be deleted can be queued in a separate list and deleted outside of the compressEvent call, when a zero-duration timer is fired. This has the overhead of an extra list of events, and each event deletion iterating through the posted event list.

Other Approaches

The point of doing it some other way is not to use Qt's internals.

L1 The first limitation is not having access to the contents of privately defined QMetaCallEvent. It can be dealt with as follows:

  1. A proxy object with signals and slots of same signatures as those of the target can be connected between the source and target objects.

  2. Running the QMetaCallEvent on a proxy object allows extraction of the call type, the called slot id, and the arguments.

  3. In lieu of signal-slot connections, events can be explicitly posted to the target object. The target object, or an event filter, must explicitly re-synthesize the slot call from event's data.

  4. A custom compressedConnect implementation can be used in lieu of QObject::connect. This fully exposes the details of the signal and slot. A proxy object can be used to perform a compression-friendly equivalent of queued_activate on the side of the sender object.

L2 The second limitation is not being able to completely reimplement QCoreApplication::compressEvent, since the event list is defined privately. We still have access to the event being compressed, and we can still decide whether to delete it or not, but there's no way to iterate the event list. Thus:

  1. The event queue can be accessed implicitly by recursively calling sendPostedEvents from within notify (thus also from eventFilter(), event() or from the slots). This doesn't cause a deadlock, since QCoreApplication::sendPostedEvents can't (and doesn't) hold an event loop mutex while the event is delivered via sendEvent. Events can be filtered as follows:

    • globally in a reimplemented QCoreApplication::notify,
    • globally by registering a QInternal::EventNotifyCallback,
    • locally by attaching an event filter to the objects,
    • explicitly locally by reimplementing QObject::event() in the target class.

    The duplicate events are still posted to the event queue. The recursive calls to notify from within sendPostedEvents consume quite a bit of stack space (budget 1kb on 64-bit-pointer architectures).

  2. The events already present can be removed by calling QCoreApplication::removePostedEvents before posting a new event to an object. Unfortunately, doing this within QCoreApplication::compressEvent causes a deadlock as the event queue mutex is already held.

    A custom event class that includes the pointer to the receiver object can automatically call removePostedEvents in the constructor.

  3. Existing compressed events, such as QEvent::Exit, can be reappropriated.

    The set of those events is an implementation detail and could change. Qt doesn't discriminate among those events other than by the receiver QObject pointer. An implementation requires the overhead of a proxy QObject per each (event type, receiver object) tuple.

Implementation

The code below works on both Qt 4 and Qt 5. On the latter, make sure to add QT += core-private to your qmake project file, so that private Qt headers get included.

The implementations not using Qt internal headers are given in other answers:

  • Using L1.1 and L2.1.
  • Using L1.2 and L2.1.

There are two event-removing code paths, selected by if (true). The enabled code path retains the most recent event and makes most sense, typically. Alternatively, you could want to retain the oldest event - that's what the disabled code path does.

#include <QApplication>
#include <QMap>
#include <QSet>
#include <QMetaMethod>
#include <QMetaObject>
#include <private/qcoreapplication_p.h>
#include <private/qthread_p.h>
#include <private/qobject_p.h>

#include <QWidget>
#include <QPushButton>
#include <QPlainTextEdit>
#include <QSpinBox>
#include <QFormLayout>

// Works on both Qt 4 and Qt 5.

//
// Common Code

/*! Keeps a list of singal indices for one or more meatobject classes.
 * The indices are signal indices as given by QMetaCallEvent.signalId.
 * On Qt 5, those do *not* match QMetaObject::methodIndex since they
 * exclude non-signal methods. */
class SignalList {
    Q_DISABLE_COPY(SignalList)
    typedef QMap<const QMetaObject *, QSet<int> > T;
    T m_data;
    /*! Returns a signal index that is can be compared to QMetaCallEvent.signalId. */
    static int signalIndex(const QMetaMethod & method) {
        Q_ASSERT(method.methodType() == QMetaMethod::Signal);
#if QT_VERSION >= QT_VERSION_CHECK(5,0,0)
        int index = -1;
        const QMetaObject * mobj = method.enclosingMetaObject();
        for (int i = 0; i <= method.methodIndex(); ++i) {
            if (mobj->method(i).methodType() != QMetaMethod::Signal) continue;
            ++ index;
        }
        return index;
#else
        return method.methodIndex();
#endif
    }
public:
    SignalList() {}
    void add(const QMetaMethod & method) {
        m_data[method.enclosingMetaObject()].insert(signalIndex(method));
    }
    void remove(const QMetaMethod & method) {
        T::iterator it = m_data.find(method.enclosingMetaObject());
        if (it != m_data.end()) {
            it->remove(signalIndex(method));
            if (it->empty()) m_data.erase(it);
        }
    }
    bool contains(const QMetaObject * metaObject, int signalId) {
        T::const_iterator it = m_data.find(metaObject);
        return it != m_data.end() && it.value().contains(signalId);
    }
};

//
// Implementation Using Event Compression With Access to Private Qt Headers

struct EventHelper : private QEvent {
    static void clearPostedFlag(QEvent * ev) {
        (&static_cast<EventHelper*>(ev)->t)[1] &= ~0x8001; // Hack to clear QEvent::posted
    }
};

template <class Base> class CompressorApplication : public Base {
    SignalList m_compressedSignals;
public:
    CompressorApplication(int & argc, char ** argv) : Base(argc, argv) {}
    void addCompressedSignal(const QMetaMethod & method) { m_compressedSignals.add(method); }
    void removeCompressedSignal(const QMetaMethod & method) { m_compressedSignals.remove(method); }
protected:
    bool compressEvent(QEvent *event, QObject *receiver, QPostEventList *postedEvents) {
        if (event->type() != QEvent::MetaCall)
            return Base::compressEvent(event, receiver, postedEvents);

        QMetaCallEvent *mce = static_cast<QMetaCallEvent*>(event);
        if (! m_compressedSignals.contains(mce->sender()->metaObject(), mce->signalId())) return false;
        for (QPostEventList::iterator it = postedEvents->begin(); it != postedEvents->end(); ++it) {
            QPostEvent &cur = *it;
            if (cur.receiver != receiver || cur.event == 0 || cur.event->type() != event->type())
                continue;
            QMetaCallEvent *cur_mce = static_cast<QMetaCallEvent*>(cur.event);
            if (cur_mce->sender() != mce->sender() || cur_mce->signalId() != mce->signalId() ||
                    cur_mce->id() != mce->id())
                continue;
            if (true) {
              /* Keep The Newest Call */              
              // We can't merely qSwap the existing posted event with the new one, since QEvent
              // keeps track of whether it has been posted. Deletion of a formerly posted event
              // takes the posted event list mutex and does a useless search of the posted event
              // list upon deletion. We thus clear the QEvent::posted flag before deletion.
              EventHelper::clearPostedFlag(cur.event);
              delete cur.event;
              cur.event = event;
            } else {
              /* Keep the Oldest Call */
              delete event;
            }
            return true;
        }
        return false;
    }
};

//
// Demo GUI

class Signaller : public QObject {
    Q_OBJECT
public:
    Q_SIGNAL void emptySignal();
    Q_SIGNAL void dataSignal(int);
};

class Widget : public QWidget {
    Q_OBJECT
    QPlainTextEdit * m_edit;
    QSpinBox * m_count;
    Signaller m_signaller;
    Q_SLOT void emptySlot() {
        m_edit->appendPlainText("emptySlot invoked");
    }
    Q_SLOT void dataSlot(int n) {
        m_edit->appendPlainText(QString("dataSlot(%1) invoked").arg(n));
    }
    Q_SLOT void sendSignals() {
        m_edit->appendPlainText(QString("\nEmitting %1 signals").arg(m_count->value()));
        for (int i = 0; i < m_count->value(); ++ i) {
            emit m_signaller.emptySignal();
            emit m_signaller.dataSignal(i + 1);
        }
    }
public:
    Widget(QWidget * parent = 0) : QWidget(parent),
        m_edit(new QPlainTextEdit), m_count(new QSpinBox)
    {
        QFormLayout * l = new QFormLayout(this);
        QPushButton * invoke = new QPushButton("Invoke");
        m_edit->setReadOnly(true);
        m_count->setRange(1, 1000);
        l->addRow("Number of slot invocations", m_count);
        l->addRow(invoke);
        l->addRow(m_edit);
#if QT_VERSION >= QT_VERSION_CHECK(5,0,0)
        connect(invoke, &QPushButton::clicked, this, &Widget::sendSignals);
        connect(&m_signaller, &Signaller::emptySignal, this, &Widget::emptySlot, Qt::QueuedConnection);
        connect(&m_signaller, &Signaller::dataSignal, this, &Widget::dataSlot, Qt::QueuedConnection);
#else
        connect(invoke, SIGNAL(clicked()), SLOT(sendSignals()));
        connect(&m_signaller, SIGNAL(emptySignal()), SLOT(emptySlot()), Qt::QueuedConnection);
        connect(&m_signaller, SIGNAL(dataSignal(int)), SLOT(dataSlot(int)), Qt::QueuedConnection);
#endif
    }
};

int main(int argc, char *argv[])
{
    CompressorApplication<QApplication> a(argc, argv);
#if QT_VERSION >= QT_VERSION_CHECK(5,0,0)
    a.addCompressedSignal(QMetaMethod::fromSignal(&Signaller::emptySignal));
    a.addCompressedSignal(QMetaMethod::fromSignal(&Signaller::dataSignal));
#else
    a.addCompressedSignal(Signaller::staticMetaObject.method(Signaller::staticMetaObject.indexOfSignal("emptySignal()")));
    a.addCompressedSignal(Signaller::staticMetaObject.method(Signaller::staticMetaObject.indexOfSignal("dataSignal(int)")));
#endif
    Widget w;
    w.show();
    return a.exec();
}

#include "main.moc"


回答3:

This is another approach. It requires no changes to the sender nor receiver objects, but requires a custom CompressorProxy object. This is portable to both Qt 4 and Qt 5, and requires no access to Qt's internals.

The compressor object must be a child of the target object -- the one with the slots. That way it tracks the thread of the target object. Since the compressor's signals are attached to the target's slots, when they are in the same thread there is no overhead of queued connections for the target slot calls.

The magic happens in the emitCheck method: it calls itself recursively.

  1. A slot call ends up in emitCheck.
  2. Further posted events are sent by calling sendPostedEvents.
  3. If there are any duplicate slot calls in the event queue, they'll end up in emitCheck again.
  4. Once the last event in the queue is picked up, and sendPostedEvents doesn't recurse anymore, a flag is reset for the given slot, so that its proxy signal won't be emitted more than once. That's the desired compression behavior.

For any given set of queued slot calls to an instance of CompressorProxy, emitCheck will return true only exactly once for a slot that was called multiple times in a pass through the posted event list.

Note that the stack use per recursive call in release mode is around 600 bytes on 32 bit architectures, and twice that on 64 bit architectures. In debug mode on OS X, using a 64 bit build, the stack use per recursion is ~4kb.

#include <QApplication>
#include <QWidget>
#include <QPushButton>
#include <QPlainTextEdit>
#include <QSpinBox>
#include <QFormLayout>

class CompressorProxy : public QObject {
    Q_OBJECT
    bool emitCheck(bool & flag) {
        flag = true;
        QCoreApplication::sendPostedEvents(this, QEvent::MetaCall); // recurse
        bool result = flag;
        flag = false;
        return result;
    }

    bool m_slot;
    Q_SLOT void slot() {
        if (emitCheck(m_slot)) emit signal();
    }
    Q_SIGNAL void signal();

    bool m_slot_int;
    Q_SLOT void slot_int(int arg1) {
        if (emitCheck(m_slot_int)) emit signal_int(arg1);
    }
    Q_SIGNAL void signal_int(int);
public:
    // No default constructor, since the proxy must be a child of the
    // target object.
    explicit CompressorProxy(QObject * parent) : QObject(parent) {}
};

//
// Demo GUI

class Signaller : public QObject {
    Q_OBJECT
public:
    Q_SIGNAL void emptySignal();
    Q_SIGNAL void dataSignal(int);
};

class Widget : public QWidget {
    Q_OBJECT
    QPlainTextEdit * m_edit;
    QSpinBox * m_count;
    Signaller m_signaller;
    Q_SLOT void emptySlot() {
        m_edit->appendPlainText("emptySlot invoked");
    }
    Q_SLOT void dataSlot(int n) {
        m_edit->appendPlainText(QString("dataSlot(%1) invoked").arg(n));
    }
    Q_SLOT void sendSignals() {
        m_edit->appendPlainText(QString("\nEmitting %1 signals").arg(m_count->value()));
        for (int i = 0; i < m_count->value(); ++ i) {
            emit m_signaller.emptySignal();
            emit m_signaller.dataSignal(i + 1);
        }
    }
public:
    Widget(QWidget * parent = 0) : QWidget(parent),
        m_edit(new QPlainTextEdit), m_count(new QSpinBox)
    {
        QFormLayout * l = new QFormLayout(this);
        QPushButton * invoke = new QPushButton("Invoke");
        m_edit->setReadOnly(true);
        m_count->setRange(1, 1000);
        l->addRow("Number of slot invocations", m_count);
        l->addRow(invoke);
        l->addRow(m_edit);
        connect(invoke, SIGNAL(clicked()), SLOT(sendSignals()));
        m_edit->appendPlainText(QString("Qt %1").arg(qVersion()));
        CompressorProxy * proxy = new CompressorProxy(this);
        connect(&m_signaller, SIGNAL(emptySignal()), proxy, SLOT(slot()), Qt::QueuedConnection);
        connect(&m_signaller, SIGNAL(dataSignal(int)), proxy, SLOT(slot_int(int)), Qt::QueuedConnection);
        connect(proxy, SIGNAL(signal()), this, SLOT(emptySlot()));
        connect(proxy, SIGNAL(signal_int(int)), this, SLOT(dataSlot(int)));
    }
};

int main(int argc, char *argv[])
{
    QApplication a(argc, argv);
    Widget w;
    w.show();
    return a.exec();
}

#include "main.moc"


回答4:

This is a yet another approach portable to both Qt 4 and Qt 5, and requiring no access to Qt's internals (other than what's available via public headers). On Qt 5, only the Qt 4-style connections are supported. The compressed entities are (receiver object, slot) pairs. This is different than the (sender, receiver, signal, slot) tuple used when one has full access to QMetaCallEvent.

It utilizes the QObject::qt_metacall to snoop the call's details from the black box QMetaCallEvent. Recursion into sendPostedEvents is used, just like in my other no-internals answer.

It's worth noting that QObject::qt_metacall's API has remained unchanged since at least Qt 4.0.

#include <QApplication>
#include <QWidget>
#include <QPushButton>
#include <QPlainTextEdit>
#include <QSpinBox>
#include <QFormLayout>
#include <QSet>
#include <QMetaMethod>

// Common Code

/*! Keeps a list of method indices for one or more meatobject classes. */
class MethodList {
    Q_DISABLE_COPY(MethodList)
    typedef QMap<const QMetaObject *, QSet<int> > T;
    T m_data;
public:
    MethodList() {}
    template <class T> void add(const char * slot) {
        add(T::staticMetaObject.method(T::staticMetaObject.indexOfSlot(slot)));
    }
    void add(const QMetaMethod & method) {
        Q_ASSERT(method.methodIndex() >= 0);
        m_data[method.enclosingMetaObject()].insert(method.methodIndex());
    }
    void remove(const QMetaMethod & method) {
        T::iterator it = m_data.find(method.enclosingMetaObject());
        if (it != m_data.end()) {
            it->remove(method.methodIndex());
            if (it->empty()) m_data.erase(it);
        }
    }
    bool contains(const QMetaObject * metaObject, int methodId) {
        T::const_iterator it = m_data.find(metaObject);
        return it != m_data.end() && it.value().contains(methodId);
    }
};
Q_GLOBAL_STATIC(MethodList, compressedSlots)

// Compressor

class Compressor : public QObject {
    enum { Idle, Armed, Valid } m_state;
    QMetaObject::Call m_call;
    int m_methodIndex;
    QSet<int> m_armed; // armed method IDs

    int qt_metacall(QMetaObject::Call call, int id, void ** args) {
        if (m_state != Armed) return QObject::qt_metacall(call, id, args);
        m_state = Valid;
        m_call = call;
        m_methodIndex = id;
        return 0;
    }
    bool eventFilter(QObject * target, QEvent * ev) {
        Q_ASSERT(target == parent());
        if (ev->type() == QEvent::MetaCall) {
            m_state = Armed;
            if (QT_VERSION < QT_VERSION_CHECK(5,0,0) || ! *(void**)(ev+1)) {
                // On Qt5, we ensure null QMetaCallEvent::slotObj_ since we can't handle Qt5-style member pointer calls
                Compressor::event(ev); // Use QObject::event() and qt_metacall to extract metacall data
            }
            if (m_state == Armed) m_state = Idle;
            // Only intercept compressed slot calls
            if (m_state != Valid || m_call != QMetaObject::InvokeMetaMethod ||
                    ! compressedSlots()->contains(target->metaObject(), m_methodIndex)) return false;
            int methodIndex = m_methodIndex;
            m_armed.insert(methodIndex);
            QCoreApplication::sendPostedEvents(target, QEvent::MetaCall); // recurse
            if (! m_armed.contains(methodIndex)) return true; // Compress the call
            m_armed.remove(methodIndex);
        }
        return false;
    }
public:
    Compressor(QObject * parent) : QObject(parent), m_state(Idle) {
        parent->installEventFilter(this);
    }
};

//
// Demo GUI

class Signaller : public QObject {
    Q_OBJECT
public:
    Q_SIGNAL void emptySignal();
    Q_SIGNAL void dataSignal(int);
};

class Widget : public QWidget {
    Q_OBJECT
    QPlainTextEdit * m_edit;
    QSpinBox * m_count;
    Signaller m_signaller;
    Q_SLOT void emptySlot() {
        m_edit->appendPlainText("emptySlot invoked");
    }
    Q_SLOT void dataSlot(int n) {
        m_edit->appendPlainText(QString("dataSlot(%1) invoked").arg(n));
    }
    Q_SLOT void sendSignals() {
        m_edit->appendPlainText(QString("\nEmitting %1 signals").arg(m_count->value()));
        for (int i = 0; i < m_count->value(); ++ i) {
            emit m_signaller.emptySignal();
            emit m_signaller.dataSignal(i + 1);
        }
    }
public:
    Widget(QWidget * parent = 0) : QWidget(parent),
        m_edit(new QPlainTextEdit), m_count(new QSpinBox)
    {
        QFormLayout * l = new QFormLayout(this);
        QPushButton * invoke = new QPushButton("Invoke");
        m_edit->setReadOnly(true);
        m_count->setRange(1, 1000);
        l->addRow("Number of slot invocations", m_count);
        l->addRow(invoke);
        l->addRow(m_edit);
        connect(invoke, SIGNAL(clicked()), SLOT(sendSignals()));
        m_edit->appendPlainText(QString("Qt %1").arg(qVersion()));
        connect(&m_signaller, SIGNAL(emptySignal()), SLOT(emptySlot()), Qt::QueuedConnection);
        connect(&m_signaller, SIGNAL(dataSignal(int)), SLOT(dataSlot(int)), Qt::QueuedConnection);
    }
};

int main(int argc, char *argv[])
{
    QApplication a(argc, argv);
    compressedSlots()->add<Widget>("emptySlot()");
    compressedSlots()->add<Widget>("dataSlot(int)");
    Widget w;
    new Compressor(&w);
    w.show();
    return a.exec();
}

#include "main.moc"


回答5:

thread_slow 

will process all the signals sent in its event loop, if you used queue connection or postEvent

Source:

Queued Connection The slot is invoked when control returns to the event loop of the receiver's thread. The slot is executed in the receiver's thread.

QtDoc

If you want more details on how the event are processed you can look here:

https://qt.gitorious.org/qt/qtbase/source/631c3dbc800bb9b2e3b227c0a09523f0f7eef0b7:src/corelib/thread/qthread_p.h#L127

As you can see, event are sorted in priority order, so if all your events have the same priority, it is first in first out.

It is not a trivial task, here an rough attempt, tell me if it works.

What I suggest is to basically store the events yourself and to process only the last one.

thread_slow.h

int current_val;
bool m_isRunning;

thread_slow.cpp

void enqueue_slot( int val /*or whatever you value is*/ ) {
     // You'll enventually need a a QMutex here if your slot is not call in the thread
     m_current_val = val;
     if( !m_isRunning )
         slowRun();
}

void checkHasReceivedEventSlot() {
    if( m_current_val != -1 ) // Invalid value or a test condition
        slowRun();
}

void slowRun() {
    m_isRunning = true;
    int v = m_current_val;
    m_current_val = -1; // Invalid value

   // Do stuff with v

   // Let the queue fill itself with enqueue_slot calls
   QTimer::singleShot(kTIMEOUT, this, SLOT(checkHasReceivedEventSlot()));
}

The first time enqueue_slot is called, slow run will start

EDIT:

To ensure that it is the last event, you could maybe do the following:

void checkHasReceivedEventSlot() {
    // Runs enqueue_slot until no more events are in the loop
    while( m_thread->eventDispatcher()->hasPendingEvents() )
         m_thread->eventDispatcher()->processEvents(QEventLoop::AllEvents);

    // m_current_val should hold the last event
    if( m_current_val != -1 ) // Invalid value or a test condition
        slowRun();
}


回答6:

From question: "If it will process all the signals, is it there a way to make the thread_slow only process the last one?"

If you just want to always get the last signal processed, and don't mind if few extra signals get processed as long as it does not make things slow, then you could try a very simple approach like this, using the regular QThread::exec() event loop. Put these slot methods into a QObject subclass, which you then move to a thread:

//slot
void MyClass::publicReceiverSlotForQueuedSignals(QString data)
{
    // Update data every time
    mReceivedData = data;

    // Allow worker method to be queued just once
    if (!mWorkerSlotInvoked) {
        mWorkerSlotInvoked = true;
        QMetaObject::invokeMethod(this, "workerSlot", Qt::QueuedConnection);
        qDebug() << "publicReceiverSlotForQueuedSignals: invoked workerSlot!"
                 << "New data:" << mReceivedData;
    } else {
        qDebug() << "publicReceiverSlotForQueuedSignals: workerSlot already invoked."
                 << "New data:" << mReceivedData;
    }
}

//slot
void MyClass::privateWorkerSlot()
{
    mWorkerSlotInvoked = false;
    qDebug() << "workerSlot for data:" << mReceivedData;
    QThread::msleep(3000);
    qDebug() << "workerSlot returning.";
}

The publicReceiverSlotForQueuedSignals goes through very fast (qDebug in else is probably the most time consuming part for rapid calls), so it doesn't really matter how many signals are queued. And then privateWorkerSlot will get invoked just one per event loop rotation of that thread, no matter how slowly it goes.

Also it would be trivial to add a mutex to protect mReceivedData and mWorkerSlotInvoked in both slot methods (and everywhere else you might use them). Then you could make a direct connection to the slot, because invokeMethod is thread safe, and mutex would make handling the private data members of MyClass thread safe as well. Just make sure you copy the contents of mReceivedData to a local variable and unlock the mutex, before doing the time consuming processing of it.

Note: untested code, probably has a few mistakes.



回答7:

You can use the combination of DirectConnection and QueueConnection:

  1. On your Worker side (thread_slow):

    • A public slot, that is intended to be called by your task provider (thread_fast)

      void Working::process()
      {
         if (working)
         {
           printf("Drop a task %p\n", QThread::currentThread()); 
           return;
         }
      
        working = true;        
        emit realWork();
      }
      
    • A processing function (that is slow) : realProcess()

      void Working::realProcess()
      {
          printf("      **** Working ... %p\n",QThread::currentThread()); fflush(stdout);
      
          // Emulate a big processing ...
          usleep(3000*1000);
      
          printf("      **** Done. %p\n",QThread::currentThread());fflush(stdout);
          working = false;
          emit done();
      }
      
    • A QueueConnection from realWork to realProcess

      Working::Working()
      {
          working = false;
          connect(this,SIGNAL(realWork()),this,SLOT(realProcess()),Qt::QueuedConnection);
      }
      
  2. On your task provider side (thread_fast)

    • A startWork() signal

      void TaskProv::orderWork()
      {
          emit startWork();
      }
      
    • A DirectConnection to the worker process slot

      QObject::connect(&taskProvider,SIGNAL(startWork()),&worker,SLOT(process()),Qt::DirectConnection);
      

Some notes:

  • The function Working::process() will be ran in the thread_fast (even it is a worker member function) but it just check for a flag so it should not impact the processing time

  • If you mind a potential extra task drop, you can protect the Worker's working flag with mutex for tighter management.

  • This is very similar to lpapp's "Queue a busy slot operation from the data mutator slot" except that the connection type need to be a correct combination of Direct and Queue.



回答8:

As a note for @kuba-ober 's answer - I had to update their compressEvent(...) handler to check that mce->sender() != nullptr before the call to m_compressedSignals.contains(...) otherwise my code would segfault. I'm not sure why this was happening, but I'm also not trying to compress all events, only a few in my system.

updated code looks like

// Added code:
if (mce->sender() == nullptr) {
  return Base::compressEvent(event, receiver, postedEvents);
}
// end added code
if (! m_compressedSignals.contains(mce->sender()->metaObject(), mce->signalId())) return false;