将ZMQStream与已存在的tornado ioloop关联

4
我有一个应用程序,在每个websocket连接(在tornado的open回调中)创建一个zmq.SUB套接字到现有的zmq.FORWARDER设备。这个想法是通过回调从zmq接收数据,然后将其通过websocket连接传递给前端客户端。 https://gist.github.com/abhinavsingh/6378134 ws.py
import zmq
from zmq.eventloop import ioloop
from zmq.eventloop.zmqstream import ZMQStream
ioloop.install()

from tornado.websocket import WebSocketHandler
from tornado.web import Application
from tornado.ioloop import IOLoop
ioloop = IOLoop.instance()

class ZMQPubSub(object):

    def __init__(self, callback):
        self.callback = callback

    def connect(self):
        self.context = zmq.Context()
        self.socket = self.context.socket(zmq.SUB)
        self.socket.connect('tcp://127.0.0.1:5560')
        self.stream = ZMQStream(self.socket)
        self.stream.on_recv(self.callback)

    def subscribe(self, channel_id):
        self.socket.setsockopt(zmq.SUBSCRIBE, channel_id)

class MyWebSocket(WebSocketHandler):

    def open(self):
        self.pubsub = ZMQPubSub(self.on_data)
        self.pubsub.connect()
        self.pubsub.subscribe("session_id")
        print 'ws opened'

    def on_message(self, message):
        print message

    def on_close(self):
        print 'ws closed'

    def on_data(self, data):
        print data

def main():
    application = Application([(r'/channel', MyWebSocket)])
    application.listen(10001)
    print 'starting ws on port 10001'
    ioloop.start()

if __name__ == '__main__':
    main()

forwarder.py

import zmq

def main():
    try:
        context = zmq.Context(1)

        frontend = context.socket(zmq.SUB)
        frontend.bind('tcp://*:5559')
        frontend.setsockopt(zmq.SUBSCRIBE, '')

        backend = context.socket(zmq.PUB)
        backend.bind('tcp://*:5560')

        print 'starting zmq forwarder'
        zmq.device(zmq.FORWARDER, frontend, backend)
    except KeyboardInterrupt:
        pass
    except Exception as e:
        logger.exception(e)
    finally:
        frontend.close()
        backend.close()
        context.term()

if __name__ == '__main__':
    main()

publish.py

import zmq

if __name__ == '__main__':
    context = zmq.Context()
    socket = context.socket(zmq.PUB)
    socket.connect('tcp://127.0.0.1:5559')
    socket.send('session_id helloworld')
    print 'sent data for channel session_id'

然而,我的 ZMQPubSub 类似乎根本没有接收到任何数据。
我进一步尝试并意识到,在 ZMQPubSub 注册 on_recv 回调之后,需要调用 ioloop.IOLoop.instance().start()。但是,这将阻塞执行。
我还尝试将 main.ioloop 实例传递给 ZMQStream 构造函数,但也没有帮助。
是否有一种方法可以将 ZMQStream 绑定到现有的 main.ioloop 实例,而不会阻止在 MyWebSocket.open 中的流程?

tornado==3.1 pyzmq==13.1.0 请将这两个版本的库安装到您的项目中。 - Abhinav Singh
@minrk,我已经更新了示例代码以反映我正在尝试的内容。您能否在您的端上运行此示例代码并在“on_data”回调中接收数据? - Abhinav Singh
如果您从PUB切换到XPUB,可以在发送之前等待订阅者。根据您的连接模式,推/拉也可能更适合您的用例。您能否提供有关每个服务生命周期的更多详细信息? - minrk
@minrk 谢谢你的回复Min。应用程序的目标是实现一种语言无关的方式,从基础设施内的任何位置(例如工作线程或甚至shell)将数据推送到Web客户端。因此,理想情况下,Web客户端将订阅若干默认频道,发布者将只需连接/发送/退出大多数情况。我对zmq模式还比较陌生,仍在挖掘zmq文档,以找出为什么这里需要1毫秒的休眠(对我来说并不明显)。在这里睡眠肯定是不合适的,或者我可能应该使用不同的zmq模式。 - Abhinav Singh
慢加入者综合症在指南的第一章中有所涉及。考虑到您想要一种类似于“发送并忘记”的模型,PUSH可能更适合。任何给定消息应该发送给多少个客户端?如果确实是发布-订阅模式,那么每条消息应该到达几个端点,这是正确的吗? - minrk
显示剩余3条评论
2个回答

5
在您现在的完整示例中,只需将您的转发器中的frontend更改为PULL套接字,并将您的发布者套接字更改为PUSH,它应该按您的预期运行。
与此相关的套接字选择的一般原则如下:
  • 当您想要向每个准备好接收消息(可能没有人)的人发送消息时,请使用PUB/SUB
  • 当您想要向一个对等方发送消息并等待其准备就绪时,请使用PUSH/PULL
最初看起来你可能只需要PUB-SUB,但是一旦你开始查看每个套接字对,你会意识到它们非常不同。 frontend-websocket连接肯定是PUB-SUB - 您可能有零到多个接收者,并且您只想在消息通过时向所有可用的人发送消息。 但是后端是不同的 - 只有一个接收者,并且它绝对希望从发布者那里获得每条消息。
所以这就是答案 - 后端应该是PULL,前端应该是PUB。 所有套接字:
PUSH -> [PULL-PUB] -> SUB

publisher.py: 套接字为 PUSH,连接到设备.py中的 backend

forwarder.py: backendPULLfrontendPUB。 ws.py: SUB 连接并订阅到 forwarder.frontend

在您的情况下导致后端PUB/SUB失败的相关行为是慢加入者综合症,这在指南中有描述。实质上,订阅者需要一定时间来告诉发布者他们的订阅信息,因此,如果您在打开PUB套接字后立即发送消息,则很可能还没有告知它有任何订阅者,因此它只会丢弃消息。


我在上一条回复中的解释可能不够清晰。我看到你建议用流式传输(push/pull)设备来替换转发器(pub/sub),如果我理解有误请纠正。然而,我的使用情况需要基于pub/sub的解决方案。每个websocket连接默认订阅至少3个频道:a) all_channels b) ${session_id}_channels c) ${session_id}_channel_${tab_id}。实际上,c) 可以使用push/pull,但是a)和b)都需要pub/sub策略。明白了吗? - Abhinav Singh
1
不,我并不建议您用PUSH/PULL设备替换它 - 只需将设备中的SUB替换为PULL,将发布者服务替换为PUSH - 这样,整个链路就是:PUSH -> PULL-PUB -> SUB-websocket。您拥有的是PULL-PUB设备,而不是PULL-PUSH或SUB-PUB。 - minrk

1

ZeroMq的订阅者必须订阅他们希望接收的消息;但我在你的代码中没有看到这一点。我认为Python的做法是:

self.socket.setsockopt(zmq.SUBSCRIBE, "")

我忘了在上面的问题中添加那一部分。Websocket连接订阅一组channel_id。我已经更新了上面的代码以反映相同的内容。 - Abhinav Singh

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接