AMQP/RabbitMQ - 顺序处理消息

6

我有一个 直接的 交换器。还有一个绑定到该交换器的队列。

我有两个消费者针对该队列。这些消费者手动在完成相应处理后确认消息。

这些消息是逻辑排序/分类的,应按该顺序处理。是否可以强制要求所有消息在A和B消费者之间顺序地接收和处理?换句话说,防止A和B同时处理消息。

注意:消费者共享同一连接或通道。这意味着我不能使用<channel>.basicQoS(1);

这个问题的原因是:两个消费者完全相同。如果一个出现问题,另一个队列会开始处理消息,并且一切都会正常工作,无需任何干预。


2
单活动消费者怎么样 https://www.rabbitmq.com/consumers.html#single-active-consumer?多个消费者绑定,但所有消息只发送到第一个消费者。如果它死了,那么消息将被分派到第二个消费者。您可以获得冗余性并且处理顺序得到保留。 - Kirill G.
单个活动消费者看起来比独占绑定更干净,但需要 RabbitMQ 3.8。 - lreeder
3个回答

6
在需要处理特定顺序消息但需要冗余消费者的情况下,处理故障转移的一种方法是在设置到队列的绑定时使用“exclusive consumer”选项,并拥有两个消费者,它们即使无法获得独占锁定也会继续尝试绑定。
过程如下:
1. 消费者A首先启动并作为独占消费者绑定到队列。消费者A开始从队列处理消息。 2. 然后,消费者B开始并尝试将自己绑定到队列作为独占消费者,但因队列已经有一个独占消费者而被拒绝。 3. 每隔一段时间,消费者B尝试获取队列上的独占绑定,但被拒绝。 4. 托管消费者A的进程崩溃。 5. 消费者B尝试再次将自己绑定到队列作为独占消费者,并成功了。消费者B开始从队列处理消息。 6. 消费者A恢复在线状态,尝试进行独占绑定,但现在被拒绝。 7. 消费者B继续按先入先出的顺序处理消息。
虽然这种方法不能提供负载共享,但它确实提供了冗余。

6
尽管此问题已有答案,但这可能对其他人有所帮助。RabbitMQ具有称为Single Active Consumer的特性,与您的情况相匹配。
我们可以将N个消费者附加到一个队列中,但只有其中1个将从该队列中主动消费消息。仅在活动消费者失败时才会进行故障转移。
请查看链接https://www.rabbitmq.com/consumers.html#single-active-consumer 谢谢

3
通常,MQ系统的目的是分配工作负载。当然,有些情况下,处理第N条消息的结果取决于处理第N-1条消息,甚至取决于第N-1条消息本身。
如果A和B不能同时处理消息,那么为什么不只使用A或只使用B呢?在我看来,使用两个消费者并没有任何节省...
在您的情况下,最好只使用一个消费者,但实际上在处理部分进行并行处理(实际上这不是一个词)。
值得补充的是,RMQ将消息均匀地分发给所有消费者(以轮询方式),而不考虑任何标准。当然,这是在预取数设置为1时,默认情况下就是这样。更多信息请参见此处,查找“公平分派”。

1
感谢您的见解。回到您的问题“为什么不只有A或只有B”:您理解得很正确,AB不应同时处理消息。因此确实是A或B。然而,我认为让AB都运行起来会很有用:如果A(或B)崩溃,系统可以在没有任何(手动)干预的情况下继续运行。我从您那里了解到我的方法并不是真正可行的。但问题是:如何实现从A(或B)到B(或A)的正确故障转移? - Kanarie Piet
1
不用客气。更简单的方法是为 A 设置一个监控代理(看门狗),如果它崩溃了,就重新启动它,而不是通过任何方式从 A 切换到 B。消息不会丢失,它们将留在队列中,并在消费者再次上线时被传递。 - cantSleepNow
好的,明白了。RabbitMQ提供高级故障转移功能吗?我知道有管理插件和REST API,但那只是监控(据我所知)。您会建议从哪些插件/库/工具(RabbitMQ或第三方)开始自动故障转移吗? - Kanarie Piet
我不知道故障转移功能,这是客户端,所以我认为服务器不应该负责此事。可能有一些插件,但我从未需要过这样的东西,因此从未查找过。RabbitMQ确实有一个心跳(实际上是AMQP功能)检查 - 也许这对您有所帮助。但是,在任何操作系统上编写看门狗应该非常容易 - 一个例子:每n秒拉取进程列表,查看是否存在消费者,如果不存在,则启动它。 - cantSleepNow
好的,谢谢。我会先研究一下心跳功能。我会把这个问题保持开放几天,以防其他人有更具体的答案。再次感谢。 - Kanarie Piet

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接