RabbitMQ:使用一致性哈希交换机扩展队列

15
(Picking up from a Github Issue)
我们使用RabbitMQ的一致性哈希交换,它有助于将路由键从一个交换器分片到多个队列。
我们使用队列向工人们分配任务。理想情况下,我们希望动态缩放这些工作程序,但这会带来问题。
要扩展规模,您需要添加新的队列和绑定。单独来看,这并不是什么大问题——除了分片键可能现在开始流向不同的队列。
例如,Worker A可能一直在处理Thing1,但现在随着我们添加新队列,Worker B可能会接收Thing1的消息。重要的是,Worker A在 Worker B开始接收Thing1消息之前已经完成所有Thing1的处理。
是否有任何方法或插件可以缓解这个问题?

我的第一个问题是为什么不能在同一队列中使用多个工作进程?当您必须确保特定顺序时,必须将一个工作进程限制为一个队列。 - slowjack2k
1
我确实不得不多次阅读它。消息顺序对您很重要。因此,在添加新工作程序之前,您需要检查所有队列是否为空。插件指出,在添加新队列时存在竞争条件。在更罕见的情况下,您可能会得到重复的消息。在这种特殊情况下,我不会使用此插件添加所有内容。我将创建一个输入队列,其中一个调度程序工作程序侦听,并且该过程负责将所有消息分发到正确的消息队列。 - slowjack2k
@slowjack2k 工作进程正在内存中管理状态,因此一个工作进程必须仅管理 Thing1 的状态。 - Andy Smith
在这种情况下,我建议您使用调度工作器而不是哈希交换。调度工作器更加灵活。它可以在使用新队列处理下一个请求之前跟踪哪些消息已经被处理,哪些没有。我认为消息重复的风险对您的应用程序影响太大了。 - slowjack2k
1个回答

0
在这种情况下,我会使用一个工作线程来分发消息,而不是哈希交换。 producer1 ... producern => topic-exchange => queue => dispatcher worker => queue1 ... queuen => worker1 ... workern
这样,调度程序工作线程可以跟踪所有消息。为此,您可以简单地检查队列中剩余多少消息,或者您可以检查工作人员确认消息,或者您可以使用rabbitmqs的RPC功能。

那我可以在一个独立的工作进程中有效地实现分布式哈希交换(加上一些功能)吗? - Andy Smith
在这种特殊情况下,是的。这需要更多的工作,但可以给你想要的细粒度控制。我能想到的另一件事是一种控制器工作者,你可以将其作为rpc实现来“询问”,以确定当前是否可以从实际工作者中处理消息。 - slowjack2k
我没有问过的是,也许你可以在生产者内等待,直到该生产者的最后一条消息被处理。然后你只需要更改你的生产者。 - slowjack2k
1
我可以随意更改任何内容,只是想知道最好的方法,并希望一些rabbitmq插件能解决我的所有问题 :). 是的,我可以添加同步步骤 - 但这最终会减慢系统速度。实际上,我想要的是哈希交换机记住它发送了哪个路由键的位置。 - Andy Smith
1
在我看来,以下至少有2个原因说明这种解决方案并不理想:1)只有一个分发消费者和单个队列会破坏在多个队列之间进行负载分片的目的。2)为了有效地实现此目的,需要在分发程序和工作程序之间进行某种特殊通信。分发程序如何知道哪些工作者和队列正在等待消息?分发程序如何知道消费者是否突然停止?而交换机可以优雅地解决所有这些问题,而使用分发程序时,您将不得不付出相当多的复杂性和开销。 - ArnauOrriols
显示剩余2条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接