RabbitMQ:使用主题交换的持久化消息

74

我对RabbitMQ非常陌生。

我已经设置了一个'topic'交换。消费者可以在发布者之后启动。我希望消费者能够接收到在它们启动之前发送但尚未被消费的消息。

该交换是使用以下参数设置的:

exchange_type => 'topic'
durable => 1
auto_delete => 0
passive => 0

这些消息是使用此参数发布的:

delivery_mode => 2

消费者使用 get() 方法从交换机中检索消息。

不幸的是,在任何客户端启动之前发布的消息都将丢失。我尝试了不同的组合。

我猜我的问题是交换机不持有消息。也许我需要在生产者和消费者之间加一个队列。但这似乎在“主题”交换机中不起作用,因为消息是通过键路由的。

我应该如何继续操作?我使用的是 Perl 绑定的 Net::RabbitMQ(不重要),以及 RabbitMQ 2.2.0

3个回答

83
如果在发布消息时没有连接的消费者可用来处理,则需要使用持久队列来存储消息。
交换机不会存储消息,但队列可以。令人困惑的是,交换机可以被标记为“持久”,但实际上这只是意味着如果重新启动代理,则该“交换机本身”仍将存在,但并不意味着发送到该交换机的任何消息都会自动持久化。
考虑到这一点,有两个选择:
1.在启动发布者之前执行一个管理步骤,自己创建队列。您可以使用Web UI或命令行工具来执行此操作。确保将其创建为持久队列,以便它能够存储路由到它的任何消息,即使没有活动的消费者。
2.假设您的消费者被编码为始终在启动时声明(因此自动创建)其交换机和队列(并将它们声明为持久性),则在启动任何发布者之前至少运行所有消费者一次。这将确保所有队列都正确创建。然后,您可以关闭消费者,直到它们真正需要,因为队列将持久地存储路由到它们的任何未来消息。
我会选择#1。要执行的步骤可能不多,并且您始终可以编写所需的脚本,以便可以重复执行。而且,如果您的所有消费者都要从同一个单一队列中提取(而不是拥有专用队列),则这实际上是最小的行政开销。
队列是需要妥善管理和控制的内容。否则,您可能会出现流氓消费者声明持久队列,使用它们几分钟但再也没有使用的情况。不久之后,您将拥有一个永久增长的队列,而没有任何东西来减小其大小,并且代理即将崩溃。

好的,解决方案是在发布者脚本中声明固定的客户端队列。当然,这需要我提前知道会有多少消费者。 - Julien
4
没错,假设每个消费者都需要自己的队列是正确的。但你需要回答的主要问题是,“这些消费者是否需要所有在他们产生之前发送的历史消息?”如果他们不关心旧消息,他们可以在启动时声明自己的队列,并从那时起接收所有消息,但不包括早期的消息。 - Brian Kelly
4
应用程序“声明”队列,然后MQ代理程序会在队列不存在时创建它们。尽管让监听应用程序声明队列而不是发送应用程序似乎是有道理的,但你可能会遇到你所见过的问题。在运行应用程序之前,最好先声明队列、声明交换机、创建虚拟主机等,这可能是最佳解决方案。 - Michael Dillon

20

正如Brian所述,交换机不会存储消息,主要负责将消息路由到其他交换机/队列。如果交换机未绑定到队列,则发送到该交换机的所有消息都将“丢失”。

您不应该需要在发布者脚本中声明固定的客户端队列,因为这可能不可扩展。队列可以由您的发布者动态创建,并使用交换到交换绑定在内部路由。

RabbitMQ支持交换到交换绑定,可以实现拓扑灵活性、解耦和其他优点。您可以在此处阅读更多信息:RabbitMQ Exchange to Exchange Bindings [AMPQ]

RabbitMQ Exchange To Exchange Binding

Example Topology

示例Python代码,使用队列创建具有持久性的交换到交换绑定,如果没有消费者存在。

#!/usr/bin/env python
import pika
import sys
 
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
 
 
#Declares the entry exchange to be used by all producers to send messages. Could be external producers as well
channel.exchange_declare(exchange='data_gateway',
exchange_type='fanout',
durable=True,
auto_delete=False)
 
#Declares the processing exchange to be used.Routes messages to various queues. For internal use only
channel.exchange_declare(exchange='data_distributor',
exchange_type='topic',
durable=True,
auto_delete=False)
 
#Binds the external/producer facing exchange to the internal exchange
channel.exchange_bind(destination='data_distributor',source='data_gateway')
 
##Create Durable Queues binded to the data_distributor exchange
channel.queue_declare(queue='trade_db',durable=True)
channel.queue_declare(queue='trade_stream_service',durable=True)
channel.queue_declare(queue='ticker_db',durable=True)
channel.queue_declare(queue='ticker_stream_service',durable=True)
channel.queue_declare(queue='orderbook_db',durable=True)
channel.queue_declare(queue='orderbook_stream_service',durable=True)
 
#Bind queues to exchanges and correct routing key. Allows for messages to be saved when no consumer is present
channel.queue_bind(queue='orderbook_db',exchange='data_distributor',routing_key='*.*.orderbook')
channel.queue_bind(queue='orderbook_stream_service',exchange='data_distributor',routing_key='*.*.orderbook')
channel.queue_bind(queue='ticker_db',exchange='data_distributor',routing_key='*.*.ticker')
channel.queue_bind(queue='ticker_stream_service',exchange='data_distributor',routing_key='*.*.ticker')
channel.queue_bind(queue='trade_db',exchange='data_distributor',routing_key='*.*.trade')
channel.queue_bind(queue='trade_stream_service',exchange='data_distributor',routing_key='*.*.trade')

2
“Eat All Messages”队列丢失了,据我所知,这些消息仍然无法到达“晚到”的订阅者。 - Kurt Pattyn
解释一下?它确实回答了OP的问题并且有效。请在您的评论中更加建设性。 - Skillachie
2
这实际上是可行的@KurtPattyn和@flyer,因为您随时可以为“Eat All Messages”创建一个新的消费者,从中“恢复”未处理的消息,并将它们路由到正确的位置。 - Kostanos
1
只是补充一下@Kostanos所说的:恢复消费者不能消耗消息(没有自动确认,一旦您看到所有消息,请关闭与该队列的连接)。这样,您就可以将rabbitmq用作事件存储 - 不确定他们是否打算这样做。 - mbx
这个“有点不对劲”。正如mbx所写,这将rabbitmq配置为一种事件存储,而这不是它应该被使用的方式,我认为。相反,建议您研究一下是否可以使用Kafka来解决您的问题。Brian Kelly的答案已经很完美地解释了这一点。 - Bernd
它不被用作事件存储。正如发布者所要求和说明的那样,客户端可能未连接,因此消息必须在传递之前暂时排队。当时新而光亮的"Kafka"并不是真正准备好了。何不提交一个完整的答案呢?祝您度过愉快的一天。 - Skillachie

0

你的情况似乎是“消息持久性”。

RabbitMQ教程文档中可以看到,您需要将queuemessages都标记为持久化(以下代码为C#版本。对于其他语言,您可以参考这里)。

  1. 首先,在发布者中,您需要确保queueRabbitMQ节点重新启动后仍然存在。为此,我们需要将其声明为持久化:
channel.QueueDeclare(queue: "hello",
                     durable: true,
                     ....);

其次,在 Consumer 中,您需要将您的消息标记为持久性 - 通过将 IBasicProperties.SetPersistent 设置为 true。
var properties = channel.CreateBasicProperties();
properties.Persistent = true;

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接