How to use ZeroMQ in a GTK/Qt/Clutter application?

In GTK applications, all execution takes place inside the gtk_main function. Other graphical frameworks have similar event loops, such as app.exec for Qt and clutter_main for Clutter. However, ZeroMQ is based on the assumption that there is a while (1) ... loop that it is inserted into (see for instance here for examples).

How do you combine those two execution strategies?

I currently want to use ZeroMQ in a Clutter application written in C, so I would of course like direct answers to that, but please add answers for other variants as well.

Scrapple answered 23/6, 2011 at 9:45 Comment(0)
I
6

It sounds like the ZeroMQ code wants simply to be executed over and over again as often as possible. The simplest way is to put the ZeroMQ code into an idle function or timeout function, and use non-blocking versions of the functions if they exist.

For Clutter, you would use clutter_threads_add_idle() or clutter_threads_add_timeout(). For GTK, you would use g_idle_add() or g_timeout_add().

The more difficult, but possibly better, way is to create a separate thread for the ZeroMQ code using g_thread_create() (g_thread_new() in GLib 2.32 and later), and just use the while(1) construction with blocking functions as they suggest. If you do that, you will also have to find some way for the threads to communicate with each other - GLib's mutexes and async queues usually do fine.

Inaudible answered 23/6, 2011 at 11:53 Comment(2)
Wouldn't polling zeromq in an idle timer result in 100% CPU usage? - Skiascope
In an idle function, probably yes, at least when the GTK main loop wasn't doing anything else. In a timeout function, no. - Inaudible
I
14

The proper way to combine zmq with GTK or Clutter is to connect the file descriptor of the zmq socket to the main event loop. The fd can be retrieved with the ZMQ_FD socket option:

int fd;
size_t sizeof_fd = sizeof(fd);
if(zmq_getsockopt(socket, ZMQ_FD, &fd, &sizeof_fd))
      perror("retrieving zmq fd");

Connecting it to the main loop is a matter of using g_io_add_watch:

GIOChannel* channel = g_io_channel_unix_new(fd);    
g_io_add_watch(channel, G_IO_IN|G_IO_ERR|G_IO_HUP, callback_func, NULL);

In the callback function, it is necessary to first check if there is really stuff to read, before reading. Otherwise, the function might block waiting for IO.

gboolean callback_func(GIOChannel *source, GIOCondition condition,gpointer data)
{
    uint32_t status;
    size_t sizeof_status = sizeof(status);   

    while (1){
         if (zmq_getsockopt(socket, ZMQ_EVENTS, &status, &sizeof_status)) {
             perror("retrieving event status");
             return 0; // this just removes the callback, but probably
                       // different error handling should be implemented
         }
         if ((status & ZMQ_POLLIN) == 0) {
             break;
         }

         // retrieve one message here
    }
    return 1; // keep the callback active
}
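
The readiness test in the callback needs parentheses around the bitwise AND, because in C `==` binds more tightly than `&`. Here is a minimal sketch of the difference. It is only an illustration: `POLLIN_FLAG` is a local stand-in assumed to have the value 1 (the value `ZMQ_POLLIN` has in zmq.h) so that the snippet compiles without libzmq, and the function names are made up for this example.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Stand-in for ZMQ_POLLIN (assumed value 1, as in zmq.h); not the real macro. */
#define POLLIN_FLAG 1u

/* Written out the way the compiler parses `status & POLLIN_FLAG == 0`:
 * the comparison runs first and yields 0, so the result is always false
 * and a loop guarded by it would never break. */
bool no_input_buggy(uint32_t status)
{
    return status & (POLLIN_FLAG == 0);
}

/* Intended test: true when the POLLIN bit is NOT set in status. */
bool no_input_fixed(uint32_t status)
{
    return (status & POLLIN_FLAG) == 0;
}

/* Small self-check showing how the two versions differ. */
void check_readiness_tests(void)
{
    assert(!no_input_buggy(0));           /* no input pending, yet "false" */
    assert(!no_input_buggy(POLLIN_FLAG));
    assert(no_input_fixed(0));            /* no input pending: stop reading */
    assert(!no_input_fixed(POLLIN_FLAG)); /* input pending: keep reading */
}
```

With the unparenthesized form, the `break` in the callback is never reached, so the loop would go on to call the receive function even when nothing is pending.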

Please note: this is not actually tested. I translated it from Python+Clutter, which is what I use, but I'm pretty sure that it'll work. For reference, below is full Python+Clutter code which actually works.

import sys
from gi.repository import Clutter, GObject
import zmq

def Stage():
    "A Stage with a red spinning rectangle"
    stage = Clutter.Stage()

    stage.set_size(400, 400)
    rect = Clutter.Rectangle()
    color = Clutter.Color()
    color.from_string('red')
    rect.set_color(color)
    rect.set_size(100, 100)
    rect.set_position(150, 150)

    timeline = Clutter.Timeline.new(3000)
    timeline.set_loop(True)

    alpha = Clutter.Alpha.new_full(timeline, Clutter.AnimationMode.EASE_IN_OUT_SINE)
    rotate_behaviour = Clutter.BehaviourRotate.new(
        alpha, 
        Clutter.RotateAxis.Z_AXIS,
        Clutter.RotateDirection.CW,
        0.0, 359.0)
    rotate_behaviour.apply(rect)
    timeline.start()
    stage.add_actor(rect)

    stage.show_all()
    stage.connect('destroy', lambda stage: Clutter.main_quit())
    return stage, rotate_behaviour

def Socket(address):
    ctx = zmq.Context()
    sock = ctx.socket(zmq.SUB)
    sock.setsockopt(zmq.SUBSCRIBE, "")
    sock.connect(address)
    return sock

def zmq_callback(queue, condition, sock):
    print 'zmq_callback', queue, condition, sock

    while sock.getsockopt(zmq.EVENTS) & zmq.POLLIN:
        observed = sock.recv()
        print observed

    return True

def main():
    res, args = Clutter.init(sys.argv)
    if res != Clutter.InitError.SUCCESS:
        return 1

    stage, rotate_behaviour = Stage()

    sock = Socket(sys.argv[2])
    zmq_fd = sock.getsockopt(zmq.FD)
    GObject.io_add_watch(zmq_fd,
                         GObject.IO_IN|GObject.IO_ERR|GObject.IO_HUP,
                         zmq_callback, sock)

    return Clutter.main()

if __name__ == '__main__':
    sys.exit(main())
Ithaca answered 2/6, 2012 at 10:17 Comment(2)
Note that it is important that the io_add_watch callback returns True. Without this, the callback will only be called once. - Eliza
This is a pretty old thread, but it's the top result when you search gtk and zeromq and this is the best answer. I wanted to confirm that your code for combining zmq and gtk does indeed work except that it should be (status & ZMQ_POLLIN) == 0 instead of status & ZMQ_POLLIN == 0. With that change, it works perfectly. - Braise
S
2

I found that there is a Qt integration library called Zeromqt. Looking at the source, the core of the integration is the following:

ZmqSocket::ZmqSocket(int type, QObject *parent) : QObject(parent)
{
    ...
    notifier_ = new QSocketNotifier(fd, QSocketNotifier::Read, this);
    connect(notifier_, SIGNAL(activated(int)), this, SLOT(activity()));
}

...

void ZmqSocket::activity()
{
    uint32_t flags;
    size_t size = sizeof(flags);
    if(!getOpt(ZMQ_EVENTS, &flags, &size)) {
        qWarning("Error reading ZMQ_EVENTS in ZMQSocket::activity");
        return;
    }
    if(flags & ZMQ_POLLIN) {
        emit readyRead();
    }
    if(flags & ZMQ_POLLOUT) {
        emit readyWrite();
    }
    ...
}

Hence, it relies on Qt's integrated socket handling, and Clutter will not have something similar.

Scrapple answered 23/6, 2011 at 12:48 Comment(1)
You might also want to have a look at nzmqt -- another Qt binding for ZeroMQ. There you'll find a poll-based implementation. Especially have a look at the class PollingZMQSocket (line 429++). Maybe you can do something similar for Clutter. - Hindi
E
2

You can get a file descriptor for a 0MQ socket (ZMQ_FD option) and integrate that with your event loop. I presume GTK has some mechanism for handling sockets.
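
To show what "integrating a descriptor with an event loop" boils down to, here is a minimal sketch in plain POSIX C. It is a stand-in, not ZeroMQ code: an ordinary pipe plays the role of the socket, `drain_ready` is a hypothetical helper name, and `poll(2)` with a zero timeout plays the role that checking `ZMQ_EVENTS` plays for a real 0MQ socket. With real ZeroMQ you would not call `read()` on the `ZMQ_FD` descriptor; you would check `ZMQ_EVENTS` and call the library's receive function instead.

```c
#include <assert.h>
#include <poll.h>
#include <unistd.h>

/*
 * Reads everything currently available on fd, one byte at a time, and
 * returns the number of bytes read. Before each read it asks poll(2),
 * with a zero timeout, whether data is pending, so it never blocks.
 * An event-loop callback would run a loop like this each time the
 * loop reports activity on the descriptor.
 */
int drain_ready(int fd)
{
    assert(fd >= 0);

    int count = 0;
    for (;;) {
        struct pollfd pfd = { .fd = fd, .events = POLLIN, .revents = 0 };
        int rc = poll(&pfd, 1, 0);     /* timeout 0: return immediately */
        if (rc <= 0 || !(pfd.revents & POLLIN))
            break;                     /* nothing pending, or poll failed */

        char byte;
        if (read(fd, &byte, 1) != 1)
            break;                     /* end of file or read error */
        count++;
    }
    return count;
}
```

The point of the pattern is that the callback checks for pending data before every read and keeps going until nothing is left, so the GUI main loop is never stalled by a blocking call.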

Eboh answered 27/7, 2011 at 6:30 Comment(0)
L
1

This is an example in Python, using PyQt4. It's derived from a working application.

import zmq
from PyQt4 import QtCore, QtGui

class QZmqSocketNotifier( QtCore.QSocketNotifier ):
    """ Provides Qt event notifier for ZMQ socket events """
    def __init__( self, zmq_sock, event_type, parent=None ):
        """
        Parameters:
        ----------
        zmq_sock : zmq.Socket
            The ZMQ socket to listen on. Must already be connected or bound to a socket address.
        event_type : QtSocketNotifier.Type
            Event type to listen for, as described in documentation for QtSocketNotifier.
        """
        super( QZmqSocketNotifier, self ).__init__( zmq_sock.getsockopt(zmq.FD), event_type, parent )

class Server(QtGui.QFrame):

    def __init__(self, topics, port, mainwindow, parent=None):
        super(Server, self).__init__(parent)

        self._PORT = port

        # Create notifier to handle ZMQ socket events coming from client
        self._zmq_context = zmq.Context()
        self._zmq_sock = self._zmq_context.socket( zmq.SUB )
        self._zmq_sock.bind( "tcp://*:" + self._PORT )
        for topic in topics:
            self._zmq_sock.setsockopt( zmq.SUBSCRIBE, topic )
        self._zmq_notifier = QZmqSocketNotifier( self._zmq_sock, QtCore.QSocketNotifier.Read )

        # connect signals and slots
        self._zmq_notifier.activated.connect( self._onZmqMsgRecv )
        mainwindow.quit.connect( self._onQuit )

    @QtCore.pyqtSlot()
    def _onZmqMsgRecv(self):
        # Disable the notifier while handling the event
        self._zmq_notifier.setEnabled(False)
        # Verify that there's data in the stream (test the POLLIN bit)
        sock_status = self._zmq_sock.getsockopt( zmq.EVENTS )
        if sock_status & zmq.POLLIN:
            msg = self._zmq_sock.recv_multipart()
            topic = msg[0]
            # self._topic_map (topic -> callable) is assumed to be defined elsewhere
            callback = self._topic_map[ topic ]
            callback( msg )
        self._zmq_notifier.setEnabled(True)
        self._zmq_sock.getsockopt(zmq.EVENTS)

    def _onQuit(self):
        # Tear down the notifier before destroying the ZMQ context
        self._zmq_notifier.activated.disconnect( self._onZmqMsgRecv )
        self._zmq_notifier.setEnabled(False)
        del self._zmq_notifier
        self._zmq_context.destroy(0)

Disabling and then re-enabling the notifier in _onZmqMsgRecv is per the documentation for QSocketNotifier.

The final call to getsockopt is for some reason necessary. Otherwise, the notifier stops working after the first event. I was actually going to post a new question for this. Does anyone know why this is needed?

Note that if you don't destroy the notifier before the ZMQ context, you'll probably get an error like this when you quit the application:

QSocketNotifier: Invalid socket 16 and type 'Read', disabling...
Lapel answered 17/8, 2015 at 13:18 Comment(0)
