RabbitMQ / AMQP:多个队列,单个消费者

5
我希望创建一个消费者,处理来自多个可变数量源的消息,这些源动态连接或断开连接。
我需要的是每个消费者优先处理每个源的前N条消息。然后运行多个消费者以提高速度。
我一直在阅读工作队列路由主题等文档,以及其他许多文档,但没有找到如何实现此功能。我也进行了一些测试,但没有成功。
有人能指点我如何做或在哪里阅读相关信息吗?
--编辑--
QueueA-----A3--A2--A1-┐
QueueB-----B3--B2--B1-┼------ 消费者
QueueC-----C3--C2--C1-┘
所期望的效果是每个消费者都能获取每个队列的第一条信息。例如:A1、B1、C1、A2、B2、C2、A3、B3、C3等等。如果创建了新队列(QueueD),消费者将以同样的方式开始接收来自该队列的消息。

你需要提供更多细节,说明你想要实现什么以及你的问题是什么。目前为止,每个基于消息的应用程序都可以执行你所描述的操作。 - theMayer
1个回答

2
我需要的是每个消费者优先处理每个源的前N条消息。然后运行多个消费者以提高速度。我所知道的所有消息队列都只在队列内部提供排序保证(Kafka在队列内部的分区中提供排序保证)。但是,您要求对多个队列进行序列化,这在分布式系统上是不可能的。为什么?因为如果您有多个消费者连接到这些队列,消息将以轮询方式交付给队列的每个已连接消费者。假设prefetch_count=1并且有两个连接的消费者,那么第一批交付的消息如下:A1、B1和C1交付给消费者1(X),A2、B2和C2交付给消费者2(Y)。现在,在分布式系统中,一切都是异步的,事情可能会出错。例如:如果X确认A1,则A3将交付给X。但是如果Y在X之前确认A2,则A3将交付给Y。谁先确认不在您的控制范围内。考虑以下情况:X可能需要等待I/O或CPU绑定任务,而Y可能会幸运地没有必须等待。然后,Y将通过队列中的消息进行推进。或者Y被杀了(一个分区)或网络变慢,那么X将继续消耗队列。我强烈建议您重新考虑您的要求,并在异步上下文中考虑您的预期保证(否则,您不会考虑MoM的,对吧?)。附注:可以通过一些消费者端逻辑实现您所要求的内容(对性能/吞吐量有影响)。一个单一的消费者必须连接到所有队列,在Ack'ing消息之前等待每个队列的消息。一旦接收到来自每个队列的消息,请将它们作为单个消息分组并发布到另一个队列(P)中。现在,许多消费者可以订阅P以处理有序的消息组。我不建议这样做,但嘿,这是您的系统,谁会阻止您呢;)

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接