在GTK/QT/Clutter应用程序中如何使用ZeroMQ?

7
gtk应用程序中,所有执行都发生在gtk_main函数内部。其他图形框架也有类似的事件循环,如QTapp.execClutterclutter_main。然而,ZeroMQ基于这样一个假设:存在一个while (1) ...循环,它被插入其中(例如,参见此处的示例)。
如何结合这两种执行策略?
我目前想在C语言编写的Clutter应用程序中使用zeromq,因此当然希望得到直接的答案,但也请为其他变体添加答案。
5个回答

14

将zmq和gtk或clutter结合的正确方法是将zmq队列的文件描述符连接到主事件循环。可以使用以下方法检索fd:

int fd;
size_t sizeof_fd = sizeof(fd);
if(zmq_getsockopt(socket, ZMQ_FD, &fd, &sizeof_fd))
      perror("retrieving zmq fd");

将其连接到主循环是使用io_add_watch的问题:

GIOChannel* channel = g_io_channel_unix_new(fd);    
g_io_add_watch(channel, G_IO_IN|G_IO_ERR|G_IO_HUP, callback_func, NULL);
在回调函数中,在读取数据之前,有必要先检查是否真的有可供读取的内容。否则,该函数可能会阻塞等待IO操作完成。
gboolean callback_func(GIOChannel *source, GIOCondition condition,gpointer data)
{
    uint32_t status;
    size_t sizeof_status = sizeof(status);   

    while (1){
         if (zmq_getsockopt(socket, ZMQ_EVENTS, &status, &sizeof_status)) {
             perror("retrieving event status");
             return 0; // this just removes the callback, but probably
                       // different error handling should be implemented
         }
         if (status & ZMQ_POLLIN == 0) {
             break;
         }

         // retrieve one message here
    }
    return 1; // keep the callback active
}

请注意:这段代码并没有经过测试,我只是从我使用的Python + Clutter中进行了翻译,但我非常确定它会起作用。 以下是完整的Python + Clutter代码,实际上可以正常工作。

import sys
from gi.repository import Clutter, GObject
import zmq

def Stage():
    "A Stage with a red spinning rectangle"
    stage = Clutter.Stage()

    stage.set_size(400, 400)
    rect = Clutter.Rectangle()
    color = Clutter.Color()
    color.from_string('red')
    rect.set_color(color)
    rect.set_size(100, 100)
    rect.set_position(150, 150)

    timeline = Clutter.Timeline.new(3000)
    timeline.set_loop(True)

    alpha = Clutter.Alpha.new_full(timeline, Clutter.AnimationMode.EASE_IN_OUT_SINE)
    rotate_behaviour = Clutter.BehaviourRotate.new(
        alpha, 
        Clutter.RotateAxis.Z_AXIS,
        Clutter.RotateDirection.CW,
        0.0, 359.0)
    rotate_behaviour.apply(rect)
    timeline.start()
    stage.add_actor(rect)

    stage.show_all()
    stage.connect('destroy', lambda stage: Clutter.main_quit())
    return stage, rotate_behaviour

def Socket(address):
    ctx = zmq.Context()
    sock = ctx.socket(zmq.SUB)
    sock.setsockopt(zmq.SUBSCRIBE, "")
    sock.connect(address)
    return sock

def zmq_callback(queue, condition, sock):
    print 'zmq_callback', queue, condition, sock

    while sock.getsockopt(zmq.EVENTS) & zmq.POLLIN:
        observed = sock.recv()
        print observed

    return True

def main():
    res, args = Clutter.init(sys.argv)
    if res != Clutter.InitError.SUCCESS:
        return 1

    stage, rotate_behaviour = Stage()

    sock = Socket(sys.argv[2])
    zmq_fd = sock.getsockopt(zmq.FD)
    GObject.io_add_watch(zmq_fd,
                         GObject.IO_IN|GObject.IO_ERR|GObject.IO_HUP,
                         zmq_callback, sock)

    return Clutter.main()

if __name__ == '__main__':
    sys.exit(main())

1
请注意,io_add_watch回调函数必须返回True。如果没有这个,回调函数只会被调用一次。 - Gary van der Merwe
这是一个相当古老的帖子,但当你搜索gtk和zeromq时,它是最顶部的结果,并且这是最好的答案。我想确认您结合zmq和gtk的代码确实有效,除了应该是(status & ZMQ_POLLIN) == 0而不是status & ZMQ_POLLIN == 0。有了这个更改,它可以完美地工作。 - Sophie

6
似乎ZeroMQ代码只想尽可能多地被执行。最简单的方法是将ZeroMQ代码放入空闲函数或超时函数中,并在可能的情况下使用非阻塞版本的函数。
对于Clutter,您可以使用clutter_threads_add_idle()clutter_threads_add_timeout()。对于GTK,您可以使用g_idle_add()g_timeout_add()
更加困难但可能更好的方法是使用g_thread_create()为ZeroMQ代码创建单独的线程,并像他们建议的那样使用while(1)结构和阻塞函数。如果你这样做,还需要找到一些方法让线程彼此通信-GLib的互斥锁和异步队列通常很好用。

2
在空闲计时器中轮询 zeromq 不会导致 CPU 使用率达到 100% 吗? - Nick Zalutskiy
2
在一个空闲函数中,可能是的,至少当GTK主循环没有做其他事情时。但在超时函数中,不行。 - ptomato

2

我发现有一个名为Zeromqt的QT集成库。查看源码,该集成库的核心如下所示:

ZmqSocket::ZmqSocket(int type, QObject *parent) : QObject(parent)
{
    ...
    notifier_ = new QSocketNotifier(fd, QSocketNotifier::Read, this);
    connect(notifier_, SIGNAL(activated(int)), this, SLOT(activity()));
}

...

void ZmqSocket::activity()
{
    uint32_t flags;
    size_t size = sizeof(flags);
    if(!getOpt(ZMQ_EVENTS, &flags, &size)) {
        qWarning("Error reading ZMQ_EVENTS in ZMQSocket::activity");
        return;
    }
    if(flags & ZMQ_POLLIN) {
        emit readyRead();
    }
    if(flags & ZMQ_POLLOUT) {
        emit readyWrite();
    }
    ...
}

因此,它依赖于QT集成的套接字处理,而Clutter没有类似的功能。

1
你可能还想看一下 nzmqt -- 另一个 ZeroMQ 的 Qt 绑定。其中包括基于轮询的实现。特别是要看一下类 PollingZMQSocket (line 429++)。也许你可以为 Clutter 做类似的事情。 - Jonny Dee

2
您可以通过ZMQ_FD选项获取0MQ套接字的文件描述符,并将其与您的事件循环集成。我假设gtk有一些处理套接字的机制。

1
这是一个使用PyQt4的Python示例,它派生自一个工作中的应用程序。
import zmq
from PyQt4 import QtCore, QtGui

class QZmqSocketNotifier( QtCore.QSocketNotifier ):
    """ Provides Qt event notifier for ZMQ socket events """
    def __init__( self, zmq_sock, event_type, parent=None ):
        """
        Parameters:
        ----------
        zmq_sock : zmq.Socket
            The ZMQ socket to listen on. Must already be connected or bound to a socket address.
        event_type : QtSocketNotifier.Type
            Event type to listen for, as described in documentation for QtSocketNotifier.
        """
        super( QZmqSocketNotifier, self ).__init__( zmq_sock.getsockopt(zmq.FD), event_type, parent )

class Server(QtGui.QFrame):

def __init__(self, topics, port, mainwindow, parent=None):
    super(Server, self).__init__(parent)

    self._PORT = port

    # Create notifier to handle ZMQ socket events coming from client
    self._zmq_context = zmq.Context()
    self._zmq_sock = self._zmq_context.socket( zmq.SUB )
    self._zmq_sock.bind( "tcp://*:" + self._PORT )
    for topic in topics:
        self._zmq_sock.setsockopt( zmq.SUBSCRIBE, topic )
    self._zmq_notifier = QZmqSocketNotifier( self._zmq_sock, QtCore.QSocketNotifier.Read )

    # connect signals and slots
    self._zmq_notifier.activated.connect( self._onZmqMsgRecv )
    mainwindow.quit.connect( self._onQuit )

@QtCore.pyqtSlot()
def _onZmqMsgRecv():
    self._test_info_notifier.setEnabled(False)
    # Verify that there's data in the stream
    sock_status = self._zmq_sock.getsockopt( zmq.EVENTS )
    if sock_status == zmq.POLLIN:
        msg = self._zmq_sock.recv_multipart()
        topic = msg[0]
        callback = self._topic_map[ topic ]
        callback( msg )
    self._zmq_notifier.setEnabled(True)
    self._zmq_sock.getsockopt(zmq.EVENTS)

def _onQuit(self):
    self._zmq_notifier.activated.disconnect( self._onZmqMsgRecv )
    self._zmq_notifier.setEnabled(False)
    del self._zmq_notifier
    self._zmq_context.destroy(0)

_on_ZmqMsgRecv中禁用然后重新启用通知器是根据QSocketNotifier的文档进行的。

出于某种原因,对getsockopt的最终调用是必要的。否则,通知器在第一个事件后停止工作。我实际上要为此发布一个新问题。有人知道这为什么需要吗?

请注意,如果您在ZMQ上下文之前不销毁通知器,则在退出应用程序时可能会收到以下错误:

QSocketNotifier: Invalid socket 16 and type 'Read', disabling...

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接