ZeroMQ的PUB套接字在连接时会缓冲所有我发出的数据

14

我注意到 ZeroMQ 的 PUB 套接字在连接时会缓冲所有的传出数据,例如:

import zmq
import time
context = zmq.Context()

# create a PUB socket
pub = context.socket (zmq.PUB)
pub.connect("tcp://127.0.0.1:5566")
# push some message before connected
# they should be dropped
for i in range(5):
    pub.send('a message should not be dropped')

time.sleep(1)

# create a SUB socket
sub = context.socket (zmq.SUB)
sub.bind("tcp://127.0.0.1:5566")
sub.setsockopt(zmq.SUBSCRIBE, "")

time.sleep(1)

# this is the only message we should see in SUB
pub.send('hi')

while True:
    print sub.recv()

sub绑定在那些消息之后,它们应该被丢弃,因为如果没有人连接到PUB,它应该丢弃消息。但是,它不会丢弃消息,而是缓冲所有消息。

a message should not be dropped
a message should not be dropped
a message should not be dropped
a message should not be dropped
a message should not be dropped
hi

正如你所看到的,那些“消息不应该被丢弃”的消息会被套接字缓存,一旦连接成功,它们就会被刷新到 SUB 套接字。如果我在 PUB 套接字上绑定,并在 SUB 套接字上连接,则可以正常工作。

import zmq
import time
context = zmq.Context()

# create a PUB socket
pub = context.socket (zmq.PUB)
pub.bind("tcp://127.0.0.1:5566")
# push some message before connected
# they should be dropped
for i in range(5):
    pub.send('a message should not be dropped')

time.sleep(1)

# create a SUB socket
sub = context.socket (zmq.SUB)
sub.connect("tcp://127.0.0.1:5566")
sub.setsockopt(zmq.SUBSCRIBE, "")

time.sleep(1)

# this is the only message we should see in SUB
pub.send('hi')

while True:
    print repr(sub.recv())

你只能看到输出结果

'hi'

这种奇怪的行为会导致一个问题,它会缓存所有连接套接字上的数据。我有两个服务器,服务器A向服务器B发布数据。

Server A -- publish --> Server B

如果服务器B联机,那么它能够良好工作。但是,如果我启动服务器A而不启动服务器B呢?

结果是,连接到服务器A的PUB套接字会保留所有这些数据,内存使用量会变得越来越高。

问题在于,这种行为是bug还是feature?如果是feature,那么我该在哪里找到提到这种行为的文档?我如何停止连接的PUB套接字缓存所有数据?

谢谢。

6个回答

7
无论套接字是阻塞还是丢弃消息,都取决于套接字类型,如ZMQ::Socket文档所述(以下是我自己加的重点):
ZMQ::HWM:获取高水位标记
ZMQ::HWM选项将检索指定套接字的高水位标记。高水位标记是0MQ为与指定套接字通信的任何单个对等方在内存中排队的最大未处理消息数的硬限制。 如果达到此限制,则套接字将进入异常状态,并根据套接字类型,0MQ将采取适当的操作,例如阻塞或丢弃已发送的消息。有关每种套接字类型所采取的确切操作的详细信息,请参阅ZMQ::Socket中的各个套接字描述。
ZMQ::HWM的默认值为零,意味着“没有限制”。
您可以通过查看套接字类型的ZMQ::HWM option action文档来确定它是否会阻止或丢弃,该文档将是BlockDropZMQ::PUB的操作是Drop,因此如果它没有丢弃消息,您应该检查HWM(高水位标记)值,并注意警告 ZMQ::HWM的默认值为零,意味着“没有限制”,这意味着它将不会进入异常状态,直到系统内存用尽(在此时我不知道它的行为)。

1
我知道我可以设置HWM来限制缓冲区中的消息数量。但这并不能解决问题,因为PUB处理HWM状态的方式是丢弃新消息。这意味着如果您设置了HWM,只有前导消息会保留在缓冲区中。我正在编写音频流系统。这种行为使得使用起来非常烦人。比如说,你发送消息[1, 2, 3, 4],然后将HWM设置为2,那么套接字将为你缓冲[1, 2],所有新消息都将被丢弃。但对于音频流,最重要的部分是新到来的数据。是否有任何方法可以调整HWM丢弃消息的方式? - Fang-Pen Lin
啊,你的意思是如果HWM设置为2并且你发送[1, 2, 3, 4],那么它应该删除[1, 2]并保留[3, 4],但是如果你发送5,它应该删除3,最终得到[4, 5]?我不认为ZMQ中存在这种行为。 - aculich
1
这非常有趣。对于某些应用程序来说,删除“旧”消息的能力肯定是必要的(IP电话是一个常见的例子)。 - John Zwinck
当接收队列已满时,您是否找到了删除旧消息而不是新消息的解决方案?(我有一个实时数据源,在这里消息只有在它们是新鲜的时候才有用/有效。我不想排队旧消息。理想情况下,队列应始终只包含一条消息,并在每次获取新消息时删除旧消息)。 - petke

5
我认为这种行为是zmq_connect()的语义。 也就是说:当zmq_connect()返回成功时,连接就在概念上建立了,因此您的连接PUB开始排队消息而不是丢弃。 以下摘自“ZMQ Guide”:
理论上,使用 ØMQ 套接字时,连接和绑定端点无关紧要。然而,在 PUB-SUB 套接字中,如果绑定 SUB 套接字并连接 PUB 套接字,则 SUB 套接字可能会接收旧消息,即在 SUB 启动之前发送的消息。这是绑定/连接工作方式的产物。如果可以的话,最好绑定 PUB 并连接 SUB。 zmq_connect() 中的以下部分提供了一些提示:

与传统套接字的主要区别

一般来说,传统套接字向连接导向的可靠字节流(SOCK_STREAM)或无连接不可靠数据报(SOCK_DGRAM)提供同步接口。相比之下,ØMQ套接字提供了异步消息队列的抽象,确切的排队语义取决于正在使用的套接字类型。传统套接字传输字节流或离散数据报,ØMQ套接字传输离散消息。

ØMQ套接字是异步的,这意味着物理连接建立和拆除、重新连接和有效传递的时间对用户是透明的,并由ØMQ本身组织。此外,如果对等方无法接收消息,则可以将消息排队。


1

他们在套接字上设置了HWM选项。


0

这是一个可能有帮助的小技巧...

ZMQ::HWM设置为一个固定的数字,比如10。连接成功后,在循环中调用订阅者套接字的recv方法,直到它丢弃所有缓冲的消息,然后再开始您的主要接收循环。


0

bind(绑定)和connect(连接)这两个函数会导致不同的行为。你为什么不直接选择喜欢的那种(看起来是bind()),然后使用它呢?

事实上,这是 ZeroMQ 的一个通用特性,即它会将传出的消息缓冲起来,直到建立连接。


因为我有多个节点想要发布数据到一个众所周知的服务器。当然,我可以在 PUB 端绑定,但是结果是,我需要为每个节点分配 N 个地址,服务器不知道会有多少节点。我认为绑定和连接不应该影响行为,一旦建立连接,绑定和连接之间就没有区别,那么为什么要区分呢?我不明白 :S - Fang-Pen Lin
哦,好的。我认为ZeroMQ的行为是符合预期和设计的,所以您可能需要在发送数据之前查询连接。 - John Zwinck
@JohnZwinck 选择使用 bind() 还是 connect() 不应该基于个人偏好,而应该基于它们的使用方式。他在服务器端(发布者)正确地使用了 bind(),在客户端(订阅者)使用了 connect()。它并不总是缓存传出消息,而是由套接字类型和高水位标记的值决定的,具体可以参考这里的文档说明 - aculich

0

您应该能够使用发布套接字的hwm设置,在套接字中设置高水位线。它可以让您定义保留多少条消息。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接