How to Compress Slot Calls When Using Queued Connection in Qt?
J

8

26

After reading some articles like this about Qt Signal-Slot communications I still have a question concerning the queued connection.

If I have some threads sending signals to each other all the time, and let's say one thread, thread_slow, is running a slow method in its event loop while another, thread_fast, is running a fast one that sends multiple signals while the other thread is still running its slow method: when the slow method from thread_slow returns to the event loop, will it process all the signals that were sent before by thread_fast, or just the last one (all the signals are of the same type)?

If it will process all the signals, is there a way to make thread_slow process only the last one? (Considering that "the last one" in a multithreaded application might be vague, let's define it as the last signal sent before the thread asked for the last signal, for the sake of simplicity, so new ones sent while the thread looks for the last one might be lost.)

(I am asking this because I have multiple threads receiving data from multiple threads, and I don't want them to process old data, just the latest data that was sent.)

I have run some tests, and it appears that Qt will process all the signals. I made one thread do:

while(true)
{
    QThread::msleep(500);
    emit testQueue(test);
    test++;
}

and a slot in another will do:

void test::testQueue(int test)
{
    private_test.store(private_test.load() + test);
    emit testText(QString("Test Queue: ") + QString::number(private_test.load()));
}

and the thread will run:

while(true)
{
    QThread::msleep(3000);
    QCoreApplication::processEvents();
    private_test.store(private_test.load() + 1000);
}

I am sending a signal from one thread to the other every 500 milliseconds, while the other thread sleeps for 3000 milliseconds (3 seconds), then wakes up and increments an internal variable by 1000. Every time the slot is executed, it emits a text with the value received plus the internal variable. The result is that every time QCoreApplication::processEvents() is called, all the queued signals are executed. (I edited this part because I found a bug in my previous code.)

Jakoba answered 1/1, 2014 at 8:52 Comment(13)
It is really very simple to test. Just put a qDebug() into the relevant slot, and see how many times it is printed... Do it a few times, and draw the conclusion, or check the relevant code yourself if you do not trust the trial. Not to mention, you could always write a shared variable in the "slow" thread, and the "fast" would pick up the data from there.Hunch
Check my answer, I have brought proof concerning the eventloopOlomouc
@Kikohs: your link seems to be pointing to the wrong place of the source code. I am not sure we need two identical answers though now that you significantly made it similar in principles to the other. At least, it does not hurt, I guess. :)Hunch
I have just given a +1 to this question (even though the content was edited without referring to the hints given in my answer) because I think it may be food for public API consideration to have this feature. Perhaps, it would be rejected in Qt, but at least it is nice to have a thread about discussing it.Hunch
WOW I got a +1 from LaszloPapp?! Man someone had a good new year party! hahahaJakoba
@mFeinstein Related to the discussion under my answer: could you clarify the question a bit? That is, what exactly are you asking, what solution do you want?Jennet
My asked solution was pretty clear when I said "only process the last one"Jakoba
@mFeinstein With multiple threads, "the last one" alone (assuming you mean the latest one) is not well defined. Unless you want to lock the sending side, there's always a time window, where you have not yet actually processed the data, but new data has already been sent.Jennet
@Jennet good point, i will update the question with this information. :) thanks!Jakoba
@KubaOber, whats the problem? Each thread will process the last event in it's own event queue...Jakoba
I meant that hyde can't really say that there's anything not well defined. Event queues are accessed while a mutex is held, so everything is done synchronously and things are certainly well defined. It wouldn't work otherwise.Melanymelaphyre
I think he meant of what I meant of "last one"...not that Qt is not definedJakoba
@KubaOber Yeah, I meant what mFeinstein says above. Synchronization operations (when events are queued and dequeued) of a Qt event queue is indeed the most natural way to define points in time and what "last" means.Jennet
K
7

I am turning my comment into an answer. I agree that the documentation lacks this information, or at least it is not clear to me, and apparently not to you either.

There are two ways to get more information:

1) Trial

Put a qDebug() or printf()/fprintf() statement into your slot in the "slow" thread and see what it prints out. Run this a few times and draw the conclusion.

2) Making sure

You would need to read the source code to see how the meta object compiler (moc) gets this through from the source file. This is a more involved investigation, but it could lead to certainty.

As far as I know, every signal emission posts a corresponding event. The event is then queued for the receiving thread within the thread class. Here are the two relevant pieces of source code:

void QCoreApplication::postEvent(QObject *receiver, QEvent *event, int priority)

and

class QPostEventList : public QVector<QPostEvent>

There are two approaches with their trade-offs:

Queue a busy slot operation from the data mutator slot

The main advantage is that signals cannot be lost during the busy operation. However, this could be inherently slower, as it can potentially process many more operations than needed.

The idea is that the data is re-set for each event handled, but the real busy operation is queued for execution only once. It does not necessarily have to be queued for the first event if there are more, but that is the simplest implementation.

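Foo::Foo(QObject *parent) : QObject(parent), m_busyOperationQueued(false)
{
    ...
    connect(this, SIGNAL(dataUpdateSignal(const QByteArray&)), SLOT(dataUpdateSlot(const QByteArray&)));
    // Queued explicitly: with the default connection type, a same-thread
    // emission would call the slot directly instead of posting an event.
    connect(this, SIGNAL(queueBusyOperationSignal()), SLOT(busyOperationSlot()), Qt::QueuedConnection);
    ...
}

void Foo::dataUpdateSlot(const QByteArray &data)
{
    // Always keep the most recent data.
    m_data = data;

    // Queue the busy operation only if one is not already pending.
    if (!m_busyOperationQueued) {
        emit queueBusyOperationSignal();
        m_busyOperationQueued = true;
    }
}

void Foo::busyOperationSlot()
{
    // Do the busy work with m_data here

    m_busyOperationQueued = false;
}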
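The effect of the "queued once" flag can be checked without Qt. The following is only a rough model in standard C++: ToyEventLoop, Receiver and runBurst are hypothetical names made up for this sketch, and the loop is a plain FIFO rather than Qt's event queue. It assumes that a burst of updates is already queued before the receiver gets to run.

```cpp
#include <deque>
#include <functional>
#include <initializer_list>
#include <string>
#include <utility>
#include <vector>

// Minimal stand-in for an event loop: a FIFO of pending calls.
// This is a plain C++ model, not Qt's event queue.
class ToyEventLoop {
public:
    void post(std::function<void()> call) { m_queue.push_back(std::move(call)); }

    // Runs everything that is queued, including calls posted while draining.
    void processEvents() {
        while (!m_queue.empty()) {
            std::function<void()> call = std::move(m_queue.front());
            m_queue.pop_front();
            call();
        }
    }

private:
    std::deque<std::function<void()>> m_queue;
};

// Hypothetical receiver: cheap data updates, one expensive pass per burst.
class Receiver {
public:
    explicit Receiver(ToyEventLoop &loop) : m_loop(loop) {}

    // Models the data update slot: store the data, schedule the work once.
    void dataUpdate(const std::string &data) {
        m_data = data;
        if (!m_busyOperationQueued) {
            m_busyOperationQueued = true;
            m_loop.post([this] { busyOperation(); });
        }
    }

    const std::vector<std::string> &processed() const { return m_processed; }

private:
    // Models the busy slot: it only ever sees the latest stored data.
    void busyOperation() {
        m_busyOperationQueued = false;
        m_processed.push_back(m_data);
    }

    ToyEventLoop &m_loop;
    std::string m_data;
    bool m_busyOperationQueued = false;
    std::vector<std::string> m_processed;
};

// Queues three updates before the loop runs, as a fast sender would.
inline std::vector<std::string> runBurst() {
    ToyEventLoop loop;
    Receiver receiver(loop);
    for (const char *value : {"a", "b", "c"})
        loop.post([&receiver, value] { receiver.dataUpdate(value); });
    loop.processEvents();
    return receiver.processed();
}
```

In this model all three updates are still delivered, but the expensive step runs once and works on the last value stored, which is the trade-off described above: nothing is lost, yet every cheap update is still handled.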

Connect/Disconnect

The idea is to disconnect the slot from the corresponding signal when starting the processing. This ensures that new signal emissions are not caught. The slot is connected to the signal again once the thread is free to process the next events.

This leaves some idle time in the thread between the reconnection and the next event handled, but at least it is simple to implement. The performance difference may even be negligible, depending on context not provided here.

The main drawback is that this would lose the signals during the busy operation.

Foo::Foo(QObject *parent) : QObject(parent)
{
    ...
    connect(this, SIGNAL(dataUpdateSignal(const QByteArray&)), SLOT(busyOperationSlot(const QByteArray&)));
    ...
}

void Foo::busyOperationSlot(const QByteArray &data)
{
    // Stop receiving updates while the busy work runs.
    disconnect(this, SIGNAL(dataUpdateSignal(const QByteArray&)), this, SLOT(busyOperationSlot(const QByteArray&)));

    // Do the busy work with data here

    // Start receiving updates again.
    connect(this, SIGNAL(dataUpdateSignal(const QByteArray&)), SLOT(busyOperationSlot(const QByteArray&)));
}
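To make the drawback concrete, here is a small model in standard C++ of what "losing the signals during the busy operation" means. ToyConnection and runScenario are hypothetical names for this sketch and none of it is Qt API. The model only drops emissions made while disconnected; it makes no claim about what Qt does with calls that were already queued before the disconnect.

```cpp
#include <deque>
#include <functional>
#include <vector>

// Plain C++ model of "disconnect while busy"; none of this is Qt API.
class ToyConnection {
public:
    // Models a queued emission: the value is enqueued only while connected.
    void emitValue(int value) {
        if (m_connected)
            m_pending.push_back(value);
        else
            ++m_dropped;
    }

    // Models the receiver's event loop delivering queued calls to the slot.
    // duringBusyWork stands in for a sender that keeps emitting meanwhile.
    void processEvents(const std::function<void()> &duringBusyWork) {
        while (!m_pending.empty()) {
            int value = m_pending.front();
            m_pending.pop_front();
            m_connected = false;           // disconnect at slot entry
            duringBusyWork();              // emissions made here are lost
            m_processed.push_back(value);  // the busy work itself
            m_connected = true;            // connect again at slot exit
        }
    }

    const std::vector<int> &processed() const { return m_processed; }
    int dropped() const { return m_dropped; }

private:
    bool m_connected = true;
    int m_dropped = 0;
    std::deque<int> m_pending;
    std::vector<int> m_processed;
};

// One emission before the busy work, two during it, one after it.
inline ToyConnection runScenario() {
    ToyConnection c;
    c.emitValue(1);
    c.processEvents([&c] { c.emitValue(2); c.emitValue(3); });
    c.emitValue(4);
    c.processEvents([] {});
    return c;
}
```

In this scenario values 2 and 3 never reach the slot, so if 3 was the most recent data when the busy work finished, the receiver stays on stale data until the next emission arrives.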

Future thoughts

I was wondering whether there is a convenient API, e.g. a processEvents()-like method with an argument to process only the last event posted, for telling the event system explicitly to process the last one rather than working around the issue. There does appear to be such an API; however, it is private.

Perhaps, someone will submit a feature request to have something like that in public.

/*!
\internal
Returns \c true if \a event was compressed away (possibly deleted) and should not be added to the list.
*/
bool QCoreApplication::compressEvent(QEvent *event, QObject *receiver, QPostEventList *postedEvents)

The relevant source code can be found here.

It also seems to have overridden versions in QGuiApplication and QApplication.

For completeness, there is also this method:

void QCoreApplication::removePostedEvents(QObject * receiver, int eventType = 0) [static]

Removes all events of the given eventType that were posted using postEvent() for receiver.

The events are not dispatched, instead they are removed from the queue. You should never need to call this function. If you do call it, be aware that killing events may cause receiver to break one or more invariants.

If receiver is null, the events of eventType are removed for all objects. If eventType is 0, all the events are removed for receiver. You should never call this function with eventType of 0. If you do call it in this way, be aware that killing events may cause receiver to break one or more invariants.

But, as per the documentation, this is not quite what you would want here.

Kitsch answered 1/1, 2014 at 11:19 Comment(17)
But is it possible to disconnect and reconnect dynamically at run time? get a list with all conections?Jakoba
@mFeinstein: #2756194Hunch
@mFeinstein: But also see my answer. There's no simple way of getting a list of connections.Melanymelaphyre
@KubaOber I just saw it, yes it appears that things are not that easy :/ I hope people at Qt sees this thread....Jakoba
@mFeinstein: several people did see. I am also a Qt developer myself if that matters, although I do not contribute a silly amount, especially lately. However, I did discuss it with a few other people. Anyway, it is not like a very common case that someone has ever worried much about, apparently.Hunch
@LaszloPapp good to know it...yes it's not something most programs will use, but I can see some people will benefit from this, specially people on the embedded side of things that has to communicate with electronics devices sending information all the time (my case :P)Jakoba
@mFeinstein: yes, I agree. That is also why I began the discussion with others, but I do not have much time at this point to get it further than that.Hunch
If you don't care about including Qt's internal headers, getting the list of connections is quite simple (you have access to requisite sync primitives), so if you need to do it, you can do it. Alas, if you think embedded requires connection list snooping, you're doing things wrong. Connection lists are a means of decoupling pieces of code. They are there to promote good design. You don't have access to them for a good reason. Every time you think you need such access, you're doing something wrong. That has been my experience many times, and all I do is embedded.Melanymelaphyre
@KubaOber I dont mean I need acess to the list of connections for embdded design, I mean I need to only process the last event for embedded design ;)...the list is just a way to get the last event in the queue, but if Qt had a way to flag that I just want the last one, always, I wouldnt need the list.Jakoba
for example, in my case I have lots of positional data being sent by the USB and everytime there is a new position, I emit a signal with this new position so other threads might use this data...I dont know when the other threads will want this data, so it might happen that there will be lots of outdated positional data in the queue, if I could just flag Qt to only run the event queue for the last signal of one connection, this will avoid me lots of trouble...Jakoba
@mFeinstein: My answer has the precisely solution you're a looking for, implemented using the mechanisms that Qt provides for it, and requires no changes to either the sender or the receiver object.Melanymelaphyre
I belive this modification is not a hard one to do for Qt, probably just change the code to make a stack instead of a queue when a thread asks for it.Jakoba
Where "precisely solution" means "Do not use it if you are not part of the Qt Project or otherwise you are completely on your own with the consequences" since it can be removed or significantly revamped at any point without any further notice. That is not a problem if you are part of the Qt Project since the author introducing such a change would help with finding a nice solution for the release since your and his software would be released together, no breakage can happen.Hunch
@KubaOber I really wanted to avoid using inner stuff from the Qt project...I am new to Qt and C++ so messing up with the inner workings of the API kinda frightens me :PJakoba
@mFeinstein: it is highly discouraged even for "pros".Hunch
@mFeinstein: There is no other sane way of doing it, unless you don't want to use the unadulterated signal-slot mechanism anymore. By "unadulterated" I mean that you don't modify the sender, the receiver and you don't concoct intermediary objects. It's a trivial matter to do it if you use an intermediary that compresses the calls, but such intermediary object needs coding done for each slot that is called. I'll post another answer showing how to use an intermediary object, but it's not a generic solution anymore. It might be a reasonable alternative, though.Melanymelaphyre
I think you didn't mean: if (busyOperationQueued); emit queueBusyOperationSignal(); You meant: if (!busyOperationQueued) { emit queueBusyOperationSignal();Inconsiderable
T
14

QCoreApplication QMetaCallEvent Compression

Every queued slot call ends up in the posting of a QMetaCallEvent to the target object. The event contains the sender object, the signal id, the slot index, and packaged call parameters. On Qt 5, the signal id generally doesn't equal the value returned by QMetaObject::signalIndex(): it is an index computed as if the object only had signal methods and no other methods.

The objective is to compress such calls so that only one unique call exists in the event queue for a given tuple of (sender object, sender signal, receiver object, receiver slot).

This is the only sane way to do it, without having to make changes to source or target objects, and while maintaining minimal overhead. The event-loop-recursing methods in my other answers have serious stack overhead per each event, on the order of 1kbyte when Qt is built for 64-bit-pointer architectures.

The event queue can be accessed when new events are posted to an object that has one or more events already posted to it. In such case, QCoreApplication::postEvent calls QCoreApplication::compressEvent. compressEvent is not called when the first event is posted to an object. In a reimplementation of this method, the contents of a QMetaCallEvent posted to the target object can be checked for a call to your slot, and the obsolete duplicate has to be deleted. Private Qt headers have to be included to obtain the definitions of QMetaCallEvent, QPostEvent and QPostEventList.

Pros: Neither the sender nor the receiver objects have to be aware of anything. Signals and slots work as-is, including the method-pointer calls in Qt 5. Qt itself uses this way of compressing events.

Cons: Requires inclusion of private Qt headers and forcible clearing of QEvent::posted flag.

Instead of hacking the QEvent::posted flag, the events to be deleted can be queued in a separate list and deleted outside of the compressEvent call, when a zero-duration timer is fired. This has the overhead of an extra list of events, and each event deletion iterating through the posted event list.
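The compression rule itself, one queued call per (sender object, sender signal, receiver object, receiver slot) tuple, can be modelled without Qt. The sketch below is standard C++ only: ToyCall, Keep, sameConnection and postCompressed are hypothetical names, integers stand in for object pointers and method ids, and nothing here touches QPostEventList or QMetaCallEvent.

```cpp
#include <tuple>
#include <vector>

// Hypothetical stand-in for a queued call; the fields mirror the tuple
// (sender, signal, receiver, slot) plus one packaged argument.
struct ToyCall {
    int sender;
    int signalId;
    int receiver;
    int slotId;
    int argument;
};

enum class Keep { Newest, Oldest };

// Returns true if both calls belong to the same connection tuple.
inline bool sameConnection(const ToyCall &a, const ToyCall &b) {
    return std::tie(a.sender, a.signalId, a.receiver, a.slotId) ==
           std::tie(b.sender, b.signalId, b.receiver, b.slotId);
}

// Posts a call, compressing it against an already queued duplicate.
// Returns true if the call was compressed rather than appended.
inline bool postCompressed(std::vector<ToyCall> &queue, const ToyCall &call, Keep policy) {
    for (ToyCall &queued : queue) {
        if (!sameConnection(queued, call))
            continue;
        if (policy == Keep::Newest)
            queued = call;  // overwrite in place, keeping the queue position
        return true;        // Keep::Oldest simply drops the new call
    }
    queue.push_back(call);
    return false;
}
```

With Keep::Newest the surviving entry keeps the queue position of the older call but carries the newest arguments; with Keep::Oldest the new call is discarded and the queued one is left untouched.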

Other Approaches

The point of doing it some other way is not to use Qt's internals.

L1 The first limitation is not having access to the contents of privately defined QMetaCallEvent. It can be dealt with as follows:

  1. A proxy object with signals and slots of same signatures as those of the target can be connected between the source and target objects.

  2. Running the QMetaCallEvent on a proxy object allows extraction of the call type, the called slot id, and the arguments.

  3. In lieu of signal-slot connections, events can be explicitly posted to the target object. The target object, or an event filter, must explicitly re-synthesize the slot call from event's data.

  4. A custom compressedConnect implementation can be used in lieu of QObject::connect. This fully exposes the details of the signal and slot. A proxy object can be used to perform a compression-friendly equivalent of queued_activate on the side of the sender object.

L2 The second limitation is not being able to completely reimplement QCoreApplication::compressEvent, since the event list is defined privately. We still have access to the event being compressed, and we can still decide whether to delete it or not, but there's no way to iterate the event list. Thus:

  1. The event queue can be accessed implicitly by recursively calling sendPostedEvents from within notify (thus also from eventFilter(), event() or from the slots). This doesn't cause a deadlock, since QCoreApplication::sendPostedEvents can't (and doesn't) hold an event loop mutex while the event is delivered via sendEvent. Events can be filtered as follows:

    • globally in a reimplemented QCoreApplication::notify,
    • globally by registering a QInternal::EventNotifyCallback,
    • locally by attaching an event filter to the objects,
    • explicitly locally by reimplementing QObject::event() in the target class.

    The duplicate events are still posted to the event queue. The recursive calls to notify from within sendPostedEvents consume quite a bit of stack space (budget 1kb on 64-bit-pointer architectures).

  2. The events already present can be removed by calling QCoreApplication::removePostedEvents before posting a new event to an object. Unfortunately, doing this within QCoreApplication::compressEvent causes a deadlock as the event queue mutex is already held.

    A custom event class that includes the pointer to the receiver object can automatically call removePostedEvents in the constructor.

  3. Existing compressed events, such as QEvent::Exit, can be reappropriated.

    The set of those events is an implementation detail and could change. Qt doesn't discriminate among those events other than by the receiver QObject pointer. An implementation requires the overhead of a proxy QObject per each (event type, receiver object) tuple.

Implementation

The code below works on both Qt 4 and Qt 5. On the latter, make sure to add QT += core-private to your qmake project file, so that private Qt headers get included.

The implementations not using Qt internal headers are given in other answers:

There are two event-removing code paths, selected by if (true). The enabled code path retains the most recent event and makes most sense, typically. Alternatively, you could want to retain the oldest event - that's what the disabled code path does.

#include <QApplication>
#include <QMap>
#include <QSet>
#include <QMetaMethod>
#include <QMetaObject>
#include <private/qcoreapplication_p.h>
#include <private/qthread_p.h>
#include <private/qobject_p.h>

#include <QWidget>
#include <QPushButton>
#include <QPlainTextEdit>
#include <QSpinBox>
#include <QFormLayout>

// Works on both Qt 4 and Qt 5.

//
// Common Code

/*! Keeps a list of signal indices for one or more metaobject classes.
 * The indices are signal indices as given by QMetaCallEvent.signalId.
 * On Qt 5, those do *not* match QMetaObject::methodIndex since they
 * exclude non-signal methods. */
class SignalList {
    Q_DISABLE_COPY(SignalList)
    typedef QMap<const QMetaObject *, QSet<int> > T;
    T m_data;
    /*! Returns a signal index that can be compared to QMetaCallEvent.signalId. */
    static int signalIndex(const QMetaMethod & method) {
        Q_ASSERT(method.methodType() == QMetaMethod::Signal);
#if QT_VERSION >= QT_VERSION_CHECK(5,0,0)
        int index = -1;
        const QMetaObject * mobj = method.enclosingMetaObject();
        for (int i = 0; i <= method.methodIndex(); ++i) {
            if (mobj->method(i).methodType() != QMetaMethod::Signal) continue;
            ++ index;
        }
        return index;
#else
        return method.methodIndex();
#endif
    }
public:
    SignalList() {}
    void add(const QMetaMethod & method) {
        m_data[method.enclosingMetaObject()].insert(signalIndex(method));
    }
    void remove(const QMetaMethod & method) {
        T::iterator it = m_data.find(method.enclosingMetaObject());
        if (it != m_data.end()) {
            it->remove(signalIndex(method));
            if (it->empty()) m_data.erase(it);
        }
    }
    bool contains(const QMetaObject * metaObject, int signalId) {
        T::const_iterator it = m_data.find(metaObject);
        return it != m_data.end() && it.value().contains(signalId);
    }
};

//
// Implementation Using Event Compression With Access to Private Qt Headers

struct EventHelper : private QEvent {
    static void clearPostedFlag(QEvent * ev) {
        (&static_cast<EventHelper*>(ev)->t)[1] &= ~0x8001; // Hack to clear QEvent::posted
    }
};

template <class Base> class CompressorApplication : public Base {
    SignalList m_compressedSignals;
public:
    CompressorApplication(int & argc, char ** argv) : Base(argc, argv) {}
    void addCompressedSignal(const QMetaMethod & method) { m_compressedSignals.add(method); }
    void removeCompressedSignal(const QMetaMethod & method) { m_compressedSignals.remove(method); }
protected:
    bool compressEvent(QEvent *event, QObject *receiver, QPostEventList *postedEvents) {
        if (event->type() != QEvent::MetaCall)
            return Base::compressEvent(event, receiver, postedEvents);

        QMetaCallEvent *mce = static_cast<QMetaCallEvent*>(event);
        if (! m_compressedSignals.contains(mce->sender()->metaObject(), mce->signalId())) return false;
        for (QPostEventList::iterator it = postedEvents->begin(); it != postedEvents->end(); ++it) {
            QPostEvent &cur = *it;
            if (cur.receiver != receiver || cur.event == 0 || cur.event->type() != event->type())
                continue;
            QMetaCallEvent *cur_mce = static_cast<QMetaCallEvent*>(cur.event);
            if (cur_mce->sender() != mce->sender() || cur_mce->signalId() != mce->signalId() ||
                    cur_mce->id() != mce->id())
                continue;
            if (true) {
              /* Keep The Newest Call */              
              // We can't merely qSwap the existing posted event with the new one, since QEvent
              // keeps track of whether it has been posted. Deletion of a formerly posted event
              // takes the posted event list mutex and does a useless search of the posted event
              // list upon deletion. We thus clear the QEvent::posted flag before deletion.
              EventHelper::clearPostedFlag(cur.event);
              delete cur.event;
              cur.event = event;
            } else {
              /* Keep the Oldest Call */
              delete event;
            }
            return true;
        }
        return false;
    }
};

//
// Demo GUI

class Signaller : public QObject {
    Q_OBJECT
public:
    Q_SIGNAL void emptySignal();
    Q_SIGNAL void dataSignal(int);
};

class Widget : public QWidget {
    Q_OBJECT
    QPlainTextEdit * m_edit;
    QSpinBox * m_count;
    Signaller m_signaller;
    Q_SLOT void emptySlot() {
        m_edit->appendPlainText("emptySlot invoked");
    }
    Q_SLOT void dataSlot(int n) {
        m_edit->appendPlainText(QString("dataSlot(%1) invoked").arg(n));
    }
    Q_SLOT void sendSignals() {
        m_edit->appendPlainText(QString("\nEmitting %1 signals").arg(m_count->value()));
        for (int i = 0; i < m_count->value(); ++ i) {
            emit m_signaller.emptySignal();
            emit m_signaller.dataSignal(i + 1);
        }
    }
public:
    Widget(QWidget * parent = 0) : QWidget(parent),
        m_edit(new QPlainTextEdit), m_count(new QSpinBox)
    {
        QFormLayout * l = new QFormLayout(this);
        QPushButton * invoke = new QPushButton("Invoke");
        m_edit->setReadOnly(true);
        m_count->setRange(1, 1000);
        l->addRow("Number of slot invocations", m_count);
        l->addRow(invoke);
        l->addRow(m_edit);
#if QT_VERSION >= QT_VERSION_CHECK(5,0,0)
        connect(invoke, &QPushButton::clicked, this, &Widget::sendSignals);
        connect(&m_signaller, &Signaller::emptySignal, this, &Widget::emptySlot, Qt::QueuedConnection);
        connect(&m_signaller, &Signaller::dataSignal, this, &Widget::dataSlot, Qt::QueuedConnection);
#else
        connect(invoke, SIGNAL(clicked()), SLOT(sendSignals()));
        connect(&m_signaller, SIGNAL(emptySignal()), SLOT(emptySlot()), Qt::QueuedConnection);
        connect(&m_signaller, SIGNAL(dataSignal(int)), SLOT(dataSlot(int)), Qt::QueuedConnection);
#endif
    }
};

int main(int argc, char *argv[])
{
    CompressorApplication<QApplication> a(argc, argv);
#if QT_VERSION >= QT_VERSION_CHECK(5,0,0)
    a.addCompressedSignal(QMetaMethod::fromSignal(&Signaller::emptySignal));
    a.addCompressedSignal(QMetaMethod::fromSignal(&Signaller::dataSignal));
#else
    a.addCompressedSignal(Signaller::staticMetaObject.method(Signaller::staticMetaObject.indexOfSignal("emptySignal()")));
    a.addCompressedSignal(Signaller::staticMetaObject.method(Signaller::staticMetaObject.indexOfSignal("dataSignal(int)")));
#endif
    Widget w;
    w.show();
    return a.exec();
}

#include "main.moc"
Tyra answered 1/1, 2014 at 8:52 Comment(4)
I see this can be a real solution just the way I wanted (i.e. working silently and the objects not even knowing its there), for only getting the last signal, but it's a shame it has to be made on Qt's inner workings...I hope they can provide this functionality in the future...Jakoba
> Every queued slot call. < May be signal?Accroach
@Accroach It's a generic term, really. You can queue calls of signals too, but what's queued is the receiving end, not the sending end. Thus queued slot call is more appropriate: the queued call has no connection to the sender, other than via the QObject::sender() mechanism.Melanymelaphyre
@KubaOber, after read some more articles and posts I would like to say signals generate and post events to queue, events received from queue and handled by corresponding slot. You are right.Accroach
K
7

I am trying to form my comment into an answer. I agree with you about that the documentation is lacking this information, or at least it is not clear for me, and apparently for you either.

There would be two options to get more information:

1) Trial

Put a qDebug() or printf()/fprintf() statement into your slot in the "slow" thread and see what it prints out. Run this a few times and draw the conclusion.

2) Making sure

You would need to read the source code for this how the meta object compiler, aka. moc gets this through from the source file. This is a bit more involved investigation, but this could lead to certainity.

As far as I know, every signal emission posting a corresponding event. Then, the event will be queued for the separate thread within the thread class. Here you can find the relevant two source code files:

void QCoreApplication::postEvent(QObject *receiver, QEvent *event, int priority)

and

class QPostEventList : public QVector

There are two approaches with their trade-offs:

Queue a busy slot operation from the data mutator slot

The main advantage is that signals could not be lost during the busy operation. However, this could be inherently slower as it can potentially process a lot more operation than needed.

The idea is that the data is re-set for each event handled, but the real busy operation is queued for execution only once. It does not necessarily have to be the for the first event if there are more, but that is the simplest implementation.

Foo::Foo(QObject *parent) : QObject(parent)
{
    ...
    connect(this, SIGNAL(dataUpdateSignal(const QByteArray&)), SLOT(dataUpdateSlot(const QByteArray&)));
    connect(this, SIGNAL(queueBusyOperationSignal()), SLOT(busyOperation()));
    ...
}

void Foo::dataUpdateSlot(const QByteArray &data)
{
    m_data = data;

    if (busyOperationQueued);
        emit queueBusyOperationSignal();
        m_busyOperationQueued = true;
    }
}

void MyClass::busyOperationSlot()
{

    // Do the busy work with m_data here

    m_busyOperationQueued = false;    
}

Connect/Disconnect

The idea is to disconnect the slot from the corresponding signal when starting the processing. This will ensure that new signal emission would not be caught, and connect the slot to the signal again once the thread is free to process the next events.

This would have some idle time in the thread though between the connection and the next even handled, but at least this would be a simple way of implmeneting it. It may actually be even negligible a performance difference depending on more context not really provided here.

The main drawback is that this would lose the signals during the busy operation.

Foo::Foo(QObject *parent) : QObject(parent)
{
    ...
    connect(this, SIGNAL(dataUpdateSignal(const QByteArray&)), SLOT(busyOperationSlot(const QByteArray&)));
    ...
}

void MyClass::busyOperationSlot(const QByteArray &data)
{
    disconnect(this, SIGNAL(dataUpdateSignal(const QByteArray&)), this, SLOT(dataUpdateSlot(const QByteArray&)));

    // Do the busy work with data here

    connect(this, SIGNAL(dataUpdateSignal(const QByteArray&)), SLOT(dataUpdateSlot(const QByteArray&)));
}

Future thoughts

I was thinking if there was a convenient API - e.g. a processEvents() alike method, but with an argument to process only the last event posted - for actually telling the event system explicitly to process the last one rather than circumventing the issue itself. It does appear to be such an API, however, it is private.

Perhaps, someone will submit a feature request to have something like that in public.

/*!
\internal
Returns \c true if \a event was compressed away (possibly deleted) and should not be added to the list.
*/
bool QCoreApplication::compressEvent(QEvent *event, QObject *receiver, QPostEventList *postedEvents)

The relevant source code can be found here.

It also seems to have an overriden version in QGuiApplication and QApplication.

As for completeness, there is also such a method like this:

void QCoreApplication::removePostedEvents(QObject * receiver, int eventType = 0) [static]

Removes all events of the given eventType that were posted using postEvent() for receiver.

The events are not dispatched, instead they are removed from the queue. You should never need to call this function. If you do call it, be aware that killing events may cause receiver to break one or more invariants.

If receiver is null, the events of eventType are removed for all objects. If eventType is 0, all the events are removed for receiver. You should never call this function with eventType of 0. If you do call it in this way, be aware that killing events may cause receiver to break one or more invariants.

But this is not quite what you would like to have here as per documentation.

Kitsch answered 1/1, 2014 at 11:19 Comment(17)
But is it possible to disconnect and reconnect dynamically at run time? get a list with all conections?Jakoba
@mFeinstein: #2756194Hunch
@mFeinstein: But also see my answer. There's no simple way of getting a list of connections.Melanymelaphyre
@KubaOber I just saw it, yes it appears that things are not that easy :/ I hope people at Qt sees this thread....Jakoba
@mFeinstein: several people did see. I am also a Qt developer myself if that matters, although I do not contribute a silly amount, especially lately. However, I did discuss it with a few other people. Anyway, it is not like a very common case that someone has ever worried much about, apparently.Hunch
@LaszloPapp good to know it...yes it's not something most programs will use, but I can see some people will benefit from this, specially people on the embedded side of things that has to communicate with electronics devices sending information all the time (my case :P)Jakoba
@mFeinstein: yes, I agree. That is also why I began the discussion with others, but I do not have much time at this point to get it further than that.Hunch
If you don't care about including Qt's internal headers, getting the list of connections is quite simple (you have access to requisite sync primitives), so if you need to do it, you can do it. Alas, if you think embedded requires connection list snooping, you're doing things wrong. Connection lists are a means of decoupling pieces of code. They are there to promote good design. You don't have access to them for a good reason. Every time you think you need such access, you're doing something wrong. That has been my experience many times, and all I do is embedded.Melanymelaphyre
@KubaOber I don't mean I need access to the list of connections for embedded design, I mean I need to only process the last event for embedded design ;)...the list is just a way to get the last event in the queue, but if Qt had a way to flag that I just want the last one, always, I wouldn't need the list.Jakoba
for example, in my case I have lots of positional data being sent by the USB and everytime there is a new position, I emit a signal with this new position so other threads might use this data...I dont know when the other threads will want this data, so it might happen that there will be lots of outdated positional data in the queue, if I could just flag Qt to only run the event queue for the last signal of one connection, this will avoid me lots of trouble...Jakoba
@mFeinstein: My answer has the precisely solution you're a looking for, implemented using the mechanisms that Qt provides for it, and requires no changes to either the sender or the receiver object.Melanymelaphyre
I belive this modification is not a hard one to do for Qt, probably just change the code to make a stack instead of a queue when a thread asks for it.Jakoba
Where "precisely solution" means "Do not use it if you are not part of the Qt Project or otherwise you are completely on your own with the consequences" since it can be removed or significantly revamped at any point without any further notice. That is not a problem if you are part of the Qt Project since the author introducing such a change would help with finding a nice solution for the release since your and his software would be released together, no breakage can happen.Hunch
@KubaOber I really wanted to avoid using inner stuff from the Qt project...I am new to Qt and C++ so messing up with the inner workings of the API kinda frightens me :PJakoba
@mFeinstein: it is highly discouraged even for "pros".Hunch
@mFeinstein: There is no other sane way of doing it, unless you don't want to use the unadulterated signal-slot mechanism anymore. By "unadulterated" I mean that you don't modify the sender, the receiver and you don't concoct intermediary objects. It's a trivial matter to do it if you use an intermediary that compresses the calls, but such intermediary object needs coding done for each slot that is called. I'll post another answer showing how to use an intermediary object, but it's not a generic solution anymore. It might be a reasonable alternative, though.Melanymelaphyre
I think you didn't mean: if (busyOperationQueued); emit queueBusyOperationSignal(); You meant: if (!busyOperationQueued) { emit queueBusyOperationSignal();Inconsiderable
G
4

This is another approach. It requires no changes to either the sender or the receiver objects, but it does require a custom CompressorProxy object. It is portable to both Qt 4 and Qt 5, and requires no access to Qt's internals.

The compressor object must be a child of the target object -- the one with the slots. That way it tracks the thread of the target object. Since the compressor's signals are attached to the target's slots, when they are in the same thread there is no overhead of queued connections for the target slot calls.

The magic happens in the emitCheck method: it calls itself recursively.

  1. A slot call ends up in emitCheck.
  2. Further posted events are sent by calling sendPostedEvents.
  3. If there are any duplicate slot calls in the event queue, they'll end up in emitCheck again.
  4. Once the last event in the queue is picked up, and sendPostedEvents doesn't recurse anymore, a flag is reset for the given slot, so that its proxy signal won't be emitted more than once. That's the desired compression behavior.

For any given set of queued slot calls to an instance of CompressorProxy, emitCheck will return true exactly once for a slot that was called multiple times in a pass through the posted event list.

Note that the stack use per recursive call in release mode is around 600 bytes on 32-bit architectures, and twice that on 64-bit architectures. In debug mode on OS X, using a 64-bit build, the stack use per recursion is ~4 kB.
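The recursion can be traced without Qt. The sketch below is a plain C++ model, not the proxy itself: ProxyModel, post, sendPosted and delivered are hypothetical names, and a std::deque stands in for Qt's posted event list.

```cpp
#include <cassert>
#include <deque>
#include <vector>

// Plain C++ model of the proxy; the deque stands in for the posted event list.
class ProxyModel {
public:
    // Models a queued signal emission aimed at the proxy's slot.
    void post(int arg) { m_pending.push_back(arg); }

    // Models sendPostedEvents(): delivers queued calls in order.
    // A delivery may re-enter this function through slot().
    void sendPosted() {
        while (!m_pending.empty()) {
            int arg = m_pending.front();
            m_pending.pop_front();
            slot(arg);
        }
    }

    // Arguments that actually reached the target slot.
    const std::vector<int>& delivered() const { return m_delivered; }

private:
    bool emitCheck(bool& flag) {
        flag = true;
        sendPosted();       // recurse: drain the calls queued behind this one
        bool result = flag; // still true only in the innermost call
        flag = false;       // tells every outer call that it was superseded
        return result;
    }

    void slot(int arg) {
        if (emitCheck(m_flag)) m_delivered.push_back(arg);
    }

    std::deque<int> m_pending;
    std::vector<int> m_delivered;
    bool m_flag = false;
};
```

Posting 1, 2 and 3 and then calling sendPosted() nests three slot calls. Only the innermost one (argument 3) still sees the flag set, so only that argument is delivered. The nesting depth equals the number of queued calls, which is why the stack use per recursion matters.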

#include <QApplication>
#include <QWidget>
#include <QPushButton>
#include <QPlainTextEdit>
#include <QSpinBox>
#include <QFormLayout>

class CompressorProxy : public QObject {
    Q_OBJECT
    bool emitCheck(bool & flag) {
        flag = true;
        QCoreApplication::sendPostedEvents(this, QEvent::MetaCall); // recurse
        bool result = flag;
        flag = false;
        return result;
    }

    bool m_slot;
    Q_SLOT void slot() {
        if (emitCheck(m_slot)) emit signal();
    }
    Q_SIGNAL void signal();

    bool m_slot_int;
    Q_SLOT void slot_int(int arg1) {
        if (emitCheck(m_slot_int)) emit signal_int(arg1);
    }
    Q_SIGNAL void signal_int(int);
public:
    // No default constructor, since the proxy must be a child of the
    // target object.
    explicit CompressorProxy(QObject * parent) : QObject(parent) {}
};

//
// Demo GUI

class Signaller : public QObject {
    Q_OBJECT
public:
    Q_SIGNAL void emptySignal();
    Q_SIGNAL void dataSignal(int);
};

class Widget : public QWidget {
    Q_OBJECT
    QPlainTextEdit * m_edit;
    QSpinBox * m_count;
    Signaller m_signaller;
    Q_SLOT void emptySlot() {
        m_edit->appendPlainText("emptySlot invoked");
    }
    Q_SLOT void dataSlot(int n) {
        m_edit->appendPlainText(QString("dataSlot(%1) invoked").arg(n));
    }
    Q_SLOT void sendSignals() {
        m_edit->appendPlainText(QString("\nEmitting %1 signals").arg(m_count->value()));
        for (int i = 0; i < m_count->value(); ++ i) {
            emit m_signaller.emptySignal();
            emit m_signaller.dataSignal(i + 1);
        }
    }
public:
    Widget(QWidget * parent = 0) : QWidget(parent),
        m_edit(new QPlainTextEdit), m_count(new QSpinBox)
    {
        QFormLayout * l = new QFormLayout(this);
        QPushButton * invoke = new QPushButton("Invoke");
        m_edit->setReadOnly(true);
        m_count->setRange(1, 1000);
        l->addRow("Number of slot invocations", m_count);
        l->addRow(invoke);
        l->addRow(m_edit);
        connect(invoke, SIGNAL(clicked()), SLOT(sendSignals()));
        m_edit->appendPlainText(QString("Qt %1").arg(qVersion()));
        CompressorProxy * proxy = new CompressorProxy(this);
        connect(&m_signaller, SIGNAL(emptySignal()), proxy, SLOT(slot()), Qt::QueuedConnection);
        connect(&m_signaller, SIGNAL(dataSignal(int)), proxy, SLOT(slot_int(int)), Qt::QueuedConnection);
        connect(proxy, SIGNAL(signal()), this, SLOT(emptySlot()));
        connect(proxy, SIGNAL(signal_int(int)), this, SLOT(dataSlot(int)));
    }
};

int main(int argc, char *argv[])
{
    QApplication a(argc, argv);
    Widget w;
    w.show();
    return a.exec();
}

#include "main.moc"
Grunter answered 8/1, 2014 at 20:49 Comment(0)
G
3

This is yet another approach portable to both Qt 4 and Qt 5, and requiring no access to Qt's internals (other than what's available via public headers). On Qt 5, only the Qt 4-style connections are supported. The compressed entities are (receiver object, slot) pairs. This is different from the (sender, receiver, signal, slot) tuple used when one has full access to QMetaCallEvent.

It utilizes the QObject::qt_metacall to snoop the call's details from the black box QMetaCallEvent. Recursion into sendPostedEvents is used, just like in my other no-internals answer.

It's worth noting that QObject::qt_metacall's API has remained unchanged since at least Qt 4.0.

#include <QApplication>
#include <QWidget>
#include <QPushButton>
#include <QPlainTextEdit>
#include <QSpinBox>
#include <QFormLayout>
#include <QSet>
#include <QMetaMethod>

// Common Code

/*! Keeps a list of method indices for one or more meatobject classes. */
class MethodList {
    Q_DISABLE_COPY(MethodList)
    typedef QMap<const QMetaObject *, QSet<int> > T;
    T m_data;
public:
    MethodList() {}
    template <class T> void add(const char * slot) {
        add(T::staticMetaObject.method(T::staticMetaObject.indexOfSlot(slot)));
    }
    void add(const QMetaMethod & method) {
        Q_ASSERT(method.methodIndex() >= 0);
        m_data[method.enclosingMetaObject()].insert(method.methodIndex());
    }
    void remove(const QMetaMethod & method) {
        T::iterator it = m_data.find(method.enclosingMetaObject());
        if (it != m_data.end()) {
            it->remove(method.methodIndex());
            if (it->empty()) m_data.erase(it);
        }
    }
    bool contains(const QMetaObject * metaObject, int methodId) {
        T::const_iterator it = m_data.find(metaObject);
        return it != m_data.end() && it.value().contains(methodId);
    }
};
Q_GLOBAL_STATIC(MethodList, compressedSlots)

// Compressor

class Compressor : public QObject {
    enum { Idle, Armed, Valid } m_state;
    QMetaObject::Call m_call;
    int m_methodIndex;
    QSet<int> m_armed; // armed method IDs

    int qt_metacall(QMetaObject::Call call, int id, void ** args) {
        if (m_state != Armed) return QObject::qt_metacall(call, id, args);
        m_state = Valid;
        m_call = call;
        m_methodIndex = id;
        return 0;
    }
    bool eventFilter(QObject * target, QEvent * ev) {
        Q_ASSERT(target == parent());
        if (ev->type() == QEvent::MetaCall) {
            m_state = Armed;
            if (QT_VERSION < QT_VERSION_CHECK(5,0,0) || ! *(void**)(ev+1)) {
                // On Qt5, we ensure null QMetaCallEvent::slotObj_ since we can't handle Qt5-style member pointer calls
                Compressor::event(ev); // Use QObject::event() and qt_metacall to extract metacall data
            }
            if (m_state == Armed) m_state = Idle;
            // Only intercept compressed slot calls
            if (m_state != Valid || m_call != QMetaObject::InvokeMetaMethod ||
                    ! compressedSlots()->contains(target->metaObject(), m_methodIndex)) return false;
            int methodIndex = m_methodIndex;
            m_armed.insert(methodIndex);
            QCoreApplication::sendPostedEvents(target, QEvent::MetaCall); // recurse
            if (! m_armed.contains(methodIndex)) return true; // Compress the call
            m_armed.remove(methodIndex);
        }
        return false;
    }
public:
    Compressor(QObject * parent) : QObject(parent), m_state(Idle) {
        parent->installEventFilter(this);
    }
};

//
// Demo GUI

class Signaller : public QObject {
    Q_OBJECT
public:
    Q_SIGNAL void emptySignal();
    Q_SIGNAL void dataSignal(int);
};

class Widget : public QWidget {
    Q_OBJECT
    QPlainTextEdit * m_edit;
    QSpinBox * m_count;
    Signaller m_signaller;
    Q_SLOT void emptySlot() {
        m_edit->appendPlainText("emptySlot invoked");
    }
    Q_SLOT void dataSlot(int n) {
        m_edit->appendPlainText(QString("dataSlot(%1) invoked").arg(n));
    }
    Q_SLOT void sendSignals() {
        m_edit->appendPlainText(QString("\nEmitting %1 signals").arg(m_count->value()));
        for (int i = 0; i < m_count->value(); ++ i) {
            emit m_signaller.emptySignal();
            emit m_signaller.dataSignal(i + 1);
        }
    }
public:
    Widget(QWidget * parent = 0) : QWidget(parent),
        m_edit(new QPlainTextEdit), m_count(new QSpinBox)
    {
        QFormLayout * l = new QFormLayout(this);
        QPushButton * invoke = new QPushButton("Invoke");
        m_edit->setReadOnly(true);
        m_count->setRange(1, 1000);
        l->addRow("Number of slot invocations", m_count);
        l->addRow(invoke);
        l->addRow(m_edit);
        connect(invoke, SIGNAL(clicked()), SLOT(sendSignals()));
        m_edit->appendPlainText(QString("Qt %1").arg(qVersion()));
        connect(&m_signaller, SIGNAL(emptySignal()), SLOT(emptySlot()), Qt::QueuedConnection);
        connect(&m_signaller, SIGNAL(dataSignal(int)), SLOT(dataSlot(int)), Qt::QueuedConnection);
    }
};

int main(int argc, char *argv[])
{
    QApplication a(argc, argv);
    compressedSlots()->add<Widget>("emptySlot()");
    compressedSlots()->add<Widget>("dataSlot(int)");
    Widget w;
    new Compressor(&w);
    w.show();
    return a.exec();
}

#include "main.moc"
Grunter answered 10/1, 2014 at 1:26 Comment(2)
Haha, meatobject :)Inconsiderable
@TimAngus That comment is now frozen. It shall outlast the code if need be.Melanymelaphyre
O
2
thread_slow will process all the signals sent to its event loop if you use a queued connection or postEvent.

Source:

Queued Connection: The slot is invoked when control returns to the event loop of the receiver's thread. The slot is executed in the receiver's thread.

QtDoc

If you want more details on how the events are processed, you can look here:

https://qt.gitorious.org/qt/qtbase/source/631c3dbc800bb9b2e3b227c0a09523f0f7eef0b7:src/corelib/thread/qthread_p.h#L127

As you can see, events are sorted in priority order, so if all your events have the same priority, they are processed first in, first out.
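The ordering rule can be sketched in plain C++. This is an illustration of priority-ordered insertion, not a copy of Qt's code: Posted and addEvent are hypothetical names, and a std::vector stands in for the posted event list.

```cpp
#include <algorithm>
#include <cassert>
#include <string>
#include <vector>

struct Posted {
    int priority;     // higher values are delivered first
    std::string name; // label used only for illustration
};

// Inserts ev behind every queued event whose priority is >= ev.priority,
// so events of equal priority keep their posting order (FIFO).
inline void addEvent(std::vector<Posted>& queue, const Posted& ev) {
    auto pos = std::upper_bound(
        queue.begin(), queue.end(), ev.priority,
        [](int prio, const Posted& queued) { return prio > queued.priority; });
    queue.insert(pos, ev);
}
```

Queued signal emissions all arrive with the same priority, so under this rule they are delivered strictly in the order they were emitted, and none of them is skipped.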

Processing only the last one is not a trivial task. Here is a rough attempt; tell me if it works.

What I suggest is to basically store the events yourself and to process only the last one.

thread_slow.h

int m_current_val;  // last value received; -1 means "nothing pending"
bool m_isRunning;   // true while slowRun() is scheduled or executing

thread_slow.cpp

void enqueue_slot( int val /* or whatever your value is */ ) {
     // You'll eventually need a QMutex here if your slot is not called in this thread
     m_current_val = val;
     if( !m_isRunning )
         slowRun();
}

void checkHasReceivedEventSlot() {
    if( m_current_val != -1 ) // Invalid value or a test condition
        slowRun();
    else
        m_isRunning = false; // Nothing pending: the next enqueue_slot call restarts slowRun
}

void slowRun() {
    m_isRunning = true;
    int v = m_current_val;
    m_current_val = -1; // Invalid value

    // Do stuff with v

    // Let the queue fill itself with enqueue_slot calls
    QTimer::singleShot(kTIMEOUT, this, SLOT(checkHasReceivedEventSlot()));
}

The first time enqueue_slot is called, slowRun will start.

EDIT:

To ensure that it is the last event, you could maybe do the following:

void checkHasReceivedEventSlot() {
    // Runs enqueue_slot until no more events are in the loop
    while( m_thread->eventDispatcher()->hasPendingEvents() )
         m_thread->eventDispatcher()->processEvents(QEventLoop::AllEvents);

    // m_current_val should hold the last event
    if( m_current_val != -1 ) // Invalid value or a test condition
        slowRun();
    else
        m_isRunning = false; // Nothing pending: the next enqueue_slot call restarts slowRun
}
Olomouc answered 1/1, 2014 at 10:40 Comment(18)
"will process all the signals sent in its event loop." -> Could you please incorporate some proof for that in your answer because if it is not true, the rest is irrelevant, so you first need to prove that one up front. :)Hunch
Furthermore, you should use .isEmpty() in a Qt program rather than not .empty(). Your code is also lacking important context, like 'v' is never used. It is not clear how checkHasRecivedEventSlot() would be called the first time, or slowRun() for that matter... etc.Hunch
Dude, I know you like to downvote, it is not the first time. I said it was an attempt. I have fixed the code.Olomouc
For the first question, it is required by Qt, test it yourself. You can actually blow the event loop stack if you queue to many events.Olomouc
"Test it yourself" is not a proof. Either a documentation or source code would be a proof. What if I claim tentatively I have a different experience, who will be right.... It is a guess at this point from your side without backing it up. Actually, you do not even include a simple example to back that up.Hunch
Why a stack if I am only using the last value sent? current_val = new_val might be sufficient, saving memory and processing time, right?Jakoba
@mFeinstein: correct, that is what I also wrote in the comment. IMHO, a QStack is an overkill for this.Hunch
Also I am not 100% secure this will work, my singleShot() execution will be slow, and m_isRunning = false; might take a while to be set generating lots of stuff in the queue, so this way when enqueue_slot() is called, slowRun() will be called with the first value in the queue, not the last oneJakoba
It is better, thanks. Unfortunately, I still think QStack is a bit heavy approach.Hunch
Yes, I removed it, with a local current_valOlomouc
I don't get the "you replicated the answer" thing. I was the first to give proof with doc and code (which I had to dig, because it is an internal header). I just took the comment on the QStack and changed it to a local variable because QStack is overkill.Olomouc
Your proof is pointing to the wrong part of the source code. More importantly, the main problem was not the proof, but the fundamental QStack principle, and you did not wish to modify it based on your above comment. Hence, I posted a better answer without QStack. Now that I did, you turned your answer into that idea.Hunch
I only read "Why a stack if I am only using the last value sent? current_val = new_val might be sufficient, saving memory and processing time, right? – mFeinstein 1 hour ago". But you may be right for the QStack fine, I have just looked at your answer. The all idea is more on the different slots and the QTimer than on the QStack though.Olomouc
@Kikohs I am still not sure your code works...what assures me I have the last one and I am not still looping the events queue?Jakoba
Updated answer, it is not tested though.Olomouc
@Kikohs: unfortunately, this code will not catch the last event.Hunch
@Kikohs: please explain how this addresses the OP's concern. If you think that now it does not, I would suggest the deletion, but perhaps I got something wrong, and you can shed some light here.Hunch
I removed my -1 because the answer speaks for itself that is wrong.Hunch
J
2

From question: "If it will process all the signals, is it there a way to make the thread_slow only process the last one?"

If you just want the last signal to always be processed, and don't mind if a few extra signals get processed as long as that does not slow things down, then you could try a very simple approach like this, using the regular QThread::exec() event loop. Put these slot methods into a QObject subclass, which you then move to a thread:

//slot
void MyClass::publicReceiverSlotForQueuedSignals(QString data)
{
    // Update data every time
    mReceivedData = data;

    // Allow worker method to be queued just once
    if (!mWorkerSlotInvoked) {
        mWorkerSlotInvoked = true;
        QMetaObject::invokeMethod(this, "privateWorkerSlot", Qt::QueuedConnection);
        qDebug() << "publicReceiverSlotForQueuedSignals: invoked workerSlot!"
                 << "New data:" << mReceivedData;
    } else {
        qDebug() << "publicReceiverSlotForQueuedSignals: workerSlot already invoked."
                 << "New data:" << mReceivedData;
    }
}

//slot
void MyClass::privateWorkerSlot()
{
    mWorkerSlotInvoked = false;
    qDebug() << "workerSlot for data:" << mReceivedData;
    QThread::msleep(3000);
    qDebug() << "workerSlot returning.";
}

The publicReceiverSlotForQueuedSignals slot runs very fast (the qDebug in the else branch is probably the most time-consuming part for rapid calls), so it doesn't really matter how many signals are queued. privateWorkerSlot will then get invoked just once per event loop rotation of that thread, no matter how slowly it runs.

Also it would be trivial to add a mutex to protect mReceivedData and mWorkerSlotInvoked in both slot methods (and everywhere else you might use them). Then you could make a direct connection to the slot, because invokeMethod is thread safe, and mutex would make handling the private data members of MyClass thread safe as well. Just make sure you copy the contents of mReceivedData to a local variable and unlock the mutex, before doing the time consuming processing of it.

Note: untested code, probably has a few mistakes.
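The mutex-protected variant described above can be sketched in plain C++ as a one-slot mailbox. This is only an illustration under assumptions: LatestValue, store and take are hypothetical names, std::mutex stands in for QMutex, and scheduling the worker (the invokeMethod call) is left to the caller.

```cpp
#include <cassert>
#include <mutex>
#include <string>
#include <utility>

// Hypothetical helper: a one-slot mailbox that keeps only the newest value.
class LatestValue {
public:
    // Called from the producer side. Returns true when the caller should
    // schedule the worker, i.e. no worker run is pending yet.
    bool store(std::string data) {
        std::lock_guard<std::mutex> lock(m_mutex);
        m_data = std::move(data);
        m_hasData = true;
        bool mustSchedule = !m_workerScheduled;
        m_workerScheduled = true;
        return mustSchedule;
    }

    // Called at the start of the worker. Moves the newest value into out and
    // releases the lock on return, before the slow processing begins.
    // Returns false if there was nothing to process.
    bool take(std::string& out) {
        std::lock_guard<std::mutex> lock(m_mutex);
        m_workerScheduled = false;
        if (!m_hasData) return false;
        out = std::move(m_data);
        m_data.clear();
        m_hasData = false;
        return true;
    }

private:
    std::mutex m_mutex;
    std::string m_data;
    bool m_hasData = false;
    bool m_workerScheduled = false;
};
```

The receiver slot would call store() and queue the worker only when it returns true; the worker would call take() first and then spend its three seconds on the local copy, so new data can keep arriving in the meantime.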

Jennet answered 1/1, 2014 at 12:51 Comment(16)
I want the LAST signal not the FIRST, so this way I have the most update data, not the old one to processJakoba
@mFeinstein This will give the LAST signal... Or at least almost last, there is a window where other thread may queue signals after privateWorkerSlot gets invoked, but before it is actually called. But since the public slot is processed very fast, and assuming there are no other time-consuming operations, the window is very short. And adding the mutex will solve even that.Jennet
@hyde: I do not understand how that would be happening based on your code. You basically stop the invokation after the first one guaranteed by the guard and the timer. Also, "quasi fast" is not enough for a robust operation. :) And even if you add thread synchronization primitives, your code would not ensure only the last posted event is processed because for that you need to deal with the event dispatcher IMO. That is the only reliable source for it.Hunch
@Jennet I didnt downvote it since I rather to ask you first, because maybe I didnt understood the code...downvoting is LaszloPapp hobby not mine :P (see Kikohs comment)Jakoba
@mFeinstein No problem them :)Jennet
@hyde: it was me, and I gave the explanation. I will remove it if you can convince me about the opposite, so do not worry about unrightful downvotes. I also got one downvote, but without any explanation. I hope you see the difference. :)Hunch
@LaszloPapp The sleep is just from the question code, it simulates whatever long operation might be happening. Also, the flag stops duplicate invocations, which is conceptually different from just "stop the invocation". It is 100% robust, the last emitted signal will always get processed, and every signal emitted before the invokeMethod call (which uses mutex internally I believe) will be collected into one privateWorkerSlot call. Other thread is able to queue more signals after the invocation but before the call, which will then cause another privateWorkerSlot call later.Jennet
Imagine the scenario when the invokeMethod is called. Then, you basically start processing at some point, but there is no guarantee publicReceiverSlotForQueuedSignals will not be called more times after privateWorkerSlot, which means the data will be obsolete and that is what the OP wanted to avoid.Hunch
@LaszloPapp Yes, for some definition of "obsolete", but this is unavoidable, because other thread may also emit signal and send updated data while privateWorkerSlot has just started executing, making the data obsolete while it's being processed. So if privateWorkerSlot execution takes 3 seconds, and processing all queued publicReceiverSlotForQueuedSignals executions take in the order of a millisecond just before that, then the concern of obsolete data received during that preceding millisecond seems irrelevant.Jennet
@hyde: There is a difference between getting the latest data and working with that data. They could be two different steps IMO. As for the former, you do not have such a problem if you use the event loop properly, otherwise the event loop would blow up with too many events anyhow. :} I think the main problem with this approach is the lack of loose coupling between obtaining the data and the operation with it.Hunch
@LaszloPapp The problem in the question is, that when the processing thread tries to process every signal fully, the signals keep getting queued faster than they can be processed. Given the times 500 ms and 3000 ms in the question, this solves the problem with minimal code, miniscule lag and very few opportunities for threading related bugs.Jennet
@hyde: Getting the data should take less than 500 ms IMO, since it is just stepping through the list (only a few events, or at least up to the limitation of the event queue) in a simple case like the OP has. Once you finished the processing of the dispatcher to obtain the data, then you can run the operation on the data which will be the heavy one. This way, you would not need to worry about getting obsolete data. With your code, this seems to be possible since those steps are closely coupled. If there are events with hefty procession, they could probably be processed first.Hunch
@Jennet do you mind explaining your code a little more? I am not sure I got it...as far as I can see the method will be invoked with the first signal data, not the last...while it process data there will be queue, when it leaves publicReceiverSlotForQueuedSignals() will get the first one in the queue and see that privateWorkerSlot() is not running and it will invoke it with this data...wont it?Jakoba
@mFeinstein The answer may not quite work for your exact use case after all (I have that 3 second "work" in a slot, you have it in your own event loop). But to explain: invocation will be queued at first slot call, but not called yet. So every signal that was emitted and queued before should be called (and update the data) before it.Jennet
@Hyde, Oh I think I got it...you will insert the call to the "worker" slot in the end of the queue, so when the signals arrive they will all update the new data and by some time the last guy in the queue will be the worker slot, right? Neat...Jakoba
@mFeinstein Yep. Benefit of this approach is, it needs just one extra slot and one extra boolean. Downsides are, more overhead compared to simple synchronized custom queue, that above mentioned small extra window for lag, and generally inability to customize how things work.Jennet
E
1

You can use a combination of DirectConnection and QueuedConnection:

  1. On your Worker side (thread_slow):

    • A public slot, that is intended to be called by your task provider (thread_fast)

      void Working::process()
      {
         if (working)
         {
           printf("Drop a task %p\n", QThread::currentThread()); 
           return;
         }
      
        working = true;        
        emit realWork();
      }
      
    • A processing function (that is slow) : realProcess()

      void Working::realProcess()
      {
          printf("      **** Working ... %p\n",QThread::currentThread()); fflush(stdout);
      
          // Emulate a big processing ...
          usleep(3000*1000);
      
          printf("      **** Done. %p\n",QThread::currentThread());fflush(stdout);
          working = false;
          emit done();
      }
      
    • A QueuedConnection from realWork to realProcess

      Working::Working()
      {
          working = false;
          connect(this,SIGNAL(realWork()),this,SLOT(realProcess()),Qt::QueuedConnection);
      }
      
  2. On your task provider side (thread_fast)

    • A startWork() signal

      void TaskProv::orderWork()
      {
          emit startWork();
      }
      
    • A DirectConnection to the worker process slot

      QObject::connect(&taskProvider,SIGNAL(startWork()),&worker,SLOT(process()),Qt::DirectConnection);
      

Some notes:

  • The function Working::process() will run in thread_fast (even though it is a member function of the worker), but it only checks a flag, so it should not affect the processing time.

  • If you mind a potential extra task drop, you can protect the worker's working flag with a mutex for tighter management.

  • This is very similar to lpapp's "Queue a busy slot operation from the data mutator slot", except that the connection types need to be the correct combination of Direct and Queued.
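Because process() runs in thread_fast while realProcess() runs in thread_slow, the working flag is touched from two threads. The note above suggests a mutex; the sketch below swaps in a std::atomic<bool> as a lighter alternative. BusyFlag, tryClaim and release are hypothetical names, not part of the answer's code.

```cpp
#include <atomic>
#include <cassert>

// Hypothetical guard around the worker's "working" flag.
class BusyFlag {
public:
    // Atomically claims the worker. Returns true if the caller won the claim
    // and should queue the real work; returns false if a task is already in
    // flight and this one should be dropped.
    bool tryClaim() {
        return !m_working.exchange(true, std::memory_order_acq_rel);
    }

    // Called by the worker thread once the slow processing has finished.
    void release() { m_working.store(false, std::memory_order_release); }

private:
    std::atomic<bool> m_working{false};
};
```

In terms of the answer's code, process() would emit realWork() only when tryClaim() returns true, and realProcess() would call release() where it currently sets working = false. Testing and setting the flag in a single exchange closes the gap between the check and the assignment.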

Ebullience answered 24/3, 2015 at 21:40 Comment(0)
I
0

As a note for @kuba-ober 's answer - I had to update their compressEvent(...) handler to check that mce->sender() != nullptr before the call to m_compressedSignals.contains(...) otherwise my code would segfault. I'm not sure why this was happening, but I'm also not trying to compress all events, only a few in my system.

The updated code looks like this:

// Added code:
if (mce->sender() == nullptr) {
  return Base::compressEvent(event, receiver, postedEvents);
}
// end added code
if (! m_compressedSignals.contains(mce->sender()->metaObject(), mce->signalId())) return false;
Indubitability answered 31/12, 2018 at 15:18 Comment(0)
