如何知道 ZeroMQ 套接字是否准备就绪?

6

我有一份简单的Python ZeroMQ代码,实现了PUSH/PULL功能。如下所示。

  def zmqtest(self):

     print('zmq')
     Process(target=start_consumer, args=('1', 9999)).start()
     Process(target=start_consumer, args=('2', 9999)).start()

     ctx = zmq.Context()
     socket = ctx.socket(zmq.PUSH)
     socket.bind('tcp://127.0.0.1:9999')
     # sleep(.5) # I have to wait here...

     for i in range(5):
        socket.send_unicode('{}'.format(i))

问题是,在发送消息之前,我必须等待超过0.5秒的时间,否则只有一个消费者进程可以接收消息。如果我等待超过0.5秒,一切看起来都很好。
我猜绑定套接字需要一段时间才能稳定下来,而且这是异步完成的。
我想知道是否有更可靠的方法来知道套接字何时准备就绪。

我认为当推送到多个拉取端点时,这种启动行为是可以预期的。另请参阅ZeroMQ Push/Pull pattern usefulness - jq170727
1个回答

6

确实需要一段时间。

确实是异步完成的。

让我们先破坏术语。

ZeroMQ 是一个很棒的框架。每个分布式系统的客户端,想要使用它(除了只使用 inproc:// 传输类),首先实例化一个异步数据泵引擎.. Context() 实例,按需使用。

每个可扩展正式通信模式 { PUSH | PULL | ... | XSUB | SUB | PAIR }不会 创建一个套接字,
而是
实例化一个访问点,稍后可以 .connect().bind() 到某个对等方(另一个适当类型的访问点,在一些 Context() 实例中,无论是本地的还是不本地的(再次说明,仅限于本地的 inproc:// 基础设施))。

在这个意义上,回答一个问题“何时准备好套接字?”需要一项端到端的调查,"跨越"参与 套接字-类似行为的实现的所有元素的分布式系统。


测试“本地”端点 RTO 状态:

对于这个问题,您的代理可以自我连接到一个接收端点(作为 PULL 原型工作),以便在本地端 Context() 实例达到 RTO-state + .bind() 创建的 O/S L3+ 接口开始分发预期代理的 PUSH 消息时,“嗅探”。


测试“远程”代理的 RTO 状态:

这部分可以有一种间接或显式的测试。一种间接的方法可能使用消息嵌入的索引。它可以包含一个递增的数字(序数),它带有一个有关顺序的弱信息。鉴于 PUSH-side 消息路由策略是 Round-robin,本地代理可以确定,在本地 PULL-access-point 收到指示连续序数序列的所有消息之前,没有其他“远程”PULL-ing 代理处于 RTO-state。一旦“本地”PULL-access-point 在序数流中收到“间隙”,那就意味着(当然,只有在所有的 PUSH.setsockopt() 都被正确设置的情况下),存在另一个 - 非本地 - 处于 RTO-state 的 PULL-ing 代理。


这有用吗?

也许是的,也许不是。重点是更好地理解任何分布式系统必须以某种方式应对的新挑战。

多阶段消息队列、多层实现(本地-PUSH-代理代码、本地Context()-线程(s)、本地操作系统、本地内核、LAN/WAN、远程内核、远程操作系统、远程Context()-线程(s)、远程-PULL-代理代码等)和多代理行为仅仅引入了许多地方,其中操作可能会在某些其他方式中出现延迟/阻塞/死锁/失败。

是的,这是一次野外漫步。

尽管如此,我们可以选择使用更丰富、更明确的信号(除了最初考虑的原始数据传输),并帮助解决多代理世界中上下文特定的、信号RTO感知行为,这可能更好地反映实际情况,并在分布式系统的非单体世界中生存下来,也能应对开始出现的其他问题。

明确的信号是应对的一种方式。


微调ZeroMQ基础架构。忘记使用默认值。永远!

最近的API版本开始添加更多选项,以微调ZeroMQ的行为,以适用于特定的用例。一定要仔细阅读所有可用的细节,设置Context()-实例以调整套接字实例访问点的行为,使其最好地匹配您的分布式系统信号+传输需求:

.setsockopt( ZMQ_LINGER,     0 )         # always, indeed ALWAYS
.setsockopt( ZMQ_SNDBUF,    .. )         # always, additional O/S + kernel rules apply ( read more about proper sizing )
.setsockopt( ZMQ_SNDHWM,    .. )         # always, problem-specific data-engineered sizing
.setsockopt( ZMQ_TOS,       .. )         # always, indeed ALWAYS for critical systems
.setsockopt( ZMQ_IMMEDIATE, .. )         # prevents "loosing" messages pumped into incomplete connections

还有很多其他的技术。如果没有这些技术,设计就会始终被钉在真实世界的交易丛林中。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接