RabbitMQ从2个队列消费

3

我需要编写一个负载较重的系统,但任务比较简单。因此,我决定将这些任务拆分为多个位于不同位置(或云)的工作节点。为了通信,我想使用rabbitmq队列。

在我的系统中,将有两种软件节点:调度程序和工作节点。调度程序将从queue_input中获取用户输入,将其拆分为较小的任务,并将这些较小的任务放入workers_queue中。工作节点读取此队列并“执行任务”。我在这里使用了轮询负载平衡-当一些工作节点崩溃时,所有工作都非常顺利,然后我会失去关于任务完成的信息(不允许执行单个操作两次,每个任务包含50个迭代的工作代码与不同数据)。

我考虑了一些类似technical_queue的东西-另一个用于调度程序-工作节点通信的通道,我想知道如何以良好的方式设计它。我使用了来自rabbitmq页面的教程,因此我的工作线程看起来像:

while(true) {
   message = consume(QUEUE,...);
   handle(message); //do 50 simple tasks in loop for data in message
}

我该如何处理第二个队列?另外一个线程使用while(true) {}循环,还是有更好的解决方案?也许我应该重用现有的主题交换队列吗?(但我想在处理任务时拥有独立的通信方式,这可能需要一些时间。)

2个回答

1
你可能需要看一下 (doc)。我不想让你添加一个层,但是这个Spring库处理了线程问题并使用SimpleMessageListenerContainer管理线程。每个容器都会进入一个队列,你可以为每个队列指定线程数量(即工作线程数)。
或者你可以使用ExecutorService创建自己的容器,但你可能最终会重写SimpleMessageListenerContainer的功能。另外,你也可以通过执行更多进程(通过操作系统或批处理脚本),来为每个队列增加更多的消费者。
就队列拓扑而言,它完全取决于业务逻辑/关注点,通常与性能需求无关。通常情况下,您会有更多的队列是出于业务原因,并且有更多的工作人员是出于性能原因,但如果一个队列被相同类型的消息拥堵,可以考虑为该类型的消息提供自己的队列。您所描述的听起来像是在您的worker队列上有多个消费者的两个队列。
除了线程问题和队列拓扑之外,我不太确定您还在问什么。

0
我建议您创建第二个队列消费者。
consumer1 -> queue_process
consumer2 -> queue_process

两个消费者都应该监听同一个队列。 问候希望能够帮到你。


好的,这就是我正在做的事情...此外,在亚马逊云上有相当多的消费进程在不同的服务器上。我需要处理的问题是能够异步地与主服务器通信-例如,告诉他工作进程正在执行ID为1001的任务,其进度为20/50(40%)。这个进度应该在主服务器要求时发送。所以我可能需要另一个队列-第一个队列已经很忙了,满载了任务。 - user2610972

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接