RabbitMQ:双向联合代理,如何使所有队列弹出消息?

3
我配置了两个使用联邦插件的代理商。它们都将对方设置为上游。
我的测试如下:
- 在代理商A上发布一条消息 - 在代理商B上消费
结果是:
- 在代理商B上消费成功 - 队列在代理商B上弹出了消息,这是好的 - 队列在代理商A上仍然有消息,这是不好的
问题在于:如果我总是在一个代理商上发布,然后总是在另一个代理商上消费,则发布代理商上的队列会增长,直到它满了并开始丢失消息。
我想要的结果是:当消费者在代理商B上消费消息时,代理商A和B上的队列都弹出它们的消息。
现在我正在尝试使用RabbitMQ联邦插件来配置它,你能告诉我如何做吗?
[1] 这两个经纪人相互指向对方作为上游,并且我按照文档中描述的“简单示例”配置它们,除了有两个经纪人分别指向对方作为上游。发布者的代码看起来像这样,消费者的代码看起来像这样。

我猜你正在使用队列联合,对吗?你想从一个代理服务器消费消息,同时另一个代理服务器也必须保持同步,是吗? - Gabriele Santomaggio
@Gabriele re - "[are] you using queue federation?": 我正在使用联合队列的“入门”部分中的“简单示例”(https://www.rabbitmq.com/federation.html)。我认为这个“简单示例”使用了联合交换。 (此外,我的RabbitMQ版本为3.1.5,不支持联合队列(联合队列仅适用于3.2及以上版本)。 - Trevor Boyd Smith
@Gabriele,您是在说联合队列可能是解决问题的方法吗?(我问这个问题是因为我注意到您更有经验(《RabbitMQ Cookbook》的作者,我已经购买了该书)。)||| 从阅读关于联合队列的十分钟内容后,我发现它们似乎正好可以满足我的需求(即不同代理上的消费者将从联合队列中消费消息,并导致上游队列中的消息被消费)。我的解释听起来对您来说正确吗? - Trevor Boyd Smith
你是否考虑使用集群功能而不是联邦功能? - Gabriele Santomaggio
我使用联邦而不是集群,因为联邦适用于WANPHY或不可靠的链接。 - Trevor Boyd Smith
好的,选择不错。如果可以的话,请告诉我!我并不总是在 Stack Overflow 上。 - Gabriele Santomaggio
2个回答

4

@Trevor Boyd Smith,你可以考虑以下的第二或第三个选项。

选项1:双向联合交换

消息将同时存在于经纪人A和B中,每个经纪人各有一份副本,并且相互独立。换句话说,即使经纪人B已将消息传递给其消费者,消息的另一个副本仍然留在经纪人A中。

优点:您始终会拥有两份消息,每个经纪人各有一份,高度可用。

缺点:您需要连接到每个经纪人的消费者。

选项2:双向联合队列

消息将最终出现在两个经纪人之一中。默认情况下,发布消息的经纪人将具有将消息入队的优先级;但是,如果只有另一个经纪人有消费者,则消息将移至另一个经纪人。

无论消息最终出现在哪个经纪人中,只要连接到任一经纪人的消费者,消息就会被传递一次且仅一次。

优点:消息将被传递一次且仅一次到连接到任一经纪人的消费者。

缺点:消息只有一份副本。如果接收消息的代理出现故障,另一个代理将无法获取消息。但是,如果您可以接受最终一致性,这个选项就可以了。原因是当出现问题的代理恢复运行时,消息将最终可用。

选项3:双向联合交换和队列

在这种情况下,一条消息将分别在两个代理中结束,每个代理一份副本。相同的消息将被传递给连接到任一代理的消费者,两次!一旦消息已经被传递两次,它将在两个代理中消失。(如果有两个消费者,一个连接到每个代理,每个代理将向其消费者传递相同的消息一次。)

enter image description here

优点:消费者可以连接任何一个代理服务器,每个代理服务器中的消息都将被传递和出队。
缺点:同一条消息会被传递两次。解决方法是,在处理消息之前,检查是否已经处理了相同的消息。
注意:这并不意味着哪个选项比其他选项更好。这完全取决于您的用例,并且还有许多其他配置可能会影响行为。

2
我创建了这个环境:
服务器A,服务器B。
通过以下方式创建双向联邦:
上游联邦:Server_B = amqp://servera 上游联邦:Server_A = amqp://serverb 然后在两个服务器上创建了相同的策略:
Pattern : ^fed\.
Apply to: all   
federation-upstream-set:all

在服务器A上创建了一个名为fed.test1的队列,然后在服务器B上创建了一个消费者,如下所示:

 ConnectionFactory factory = new ConnectionFactory();
 factory.setHost("localhost");
 factory.setPort(5673);
 Connection connection = factory.newConnection();
 Channel channel = connection.createChannel();


Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                        throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Message   '" + message );
                }
            };
channel.basicConsume("fed.test1",  true, consumer);

然后向服务器A发布一条消息 ---> fed.test1

该消息已被消费到服务器B,并且队列消息计数为,对于两个队列(服务器A、服务器B)都是如此。

这符合您的预期。

希望能帮助到您。


1
注意,这不会在联邦中维护单一的逻辑队列。只有当有消费者处理信息时,Server A 上的信息才会发送到 Server B。换句话说,这不会在每个服务器上创建消息的冗余实例。另一种选择——交换机联合——只会将消息复制到下游服务器,从而在每个服务器上留下两个分离但相同的可消耗消息。似乎没有选项可以保留消息的副本并确保单一传递。 - Sean Anderson

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接