RabbitMQ:快速生产者和缓慢消费者

21
我有一个应用程序,它使用RabbitMQ作为消息队列,在两个组件:发送器和接收器之间发送/接收消息。 发送者以非常快的方式发送消息。 接收者接收消息,然后执行一些非常耗时的任务(主要是对非常大的数据大小进行数据库写入)。 由于接收者需要很长时间才能完成任务,然后从队列中检索下一条消息,因此发送者将快速填充队列。 那么我的问题是:这会导致消息队列溢出吗?
消息消费者如下所示:
public void onMessage() throws IOException, InterruptedException {
    channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
    String queueName = channel.queueDeclare("allDataCase", true, false, false, null).getQueue();
    channel.queueBind(queueName, EXCHANGE_NAME, "");

    QueueingConsumer consumer = new QueueingConsumer(channel);
    channel.basicConsume(queueName, true, consumer);

    while (true) {
        QueueingConsumer.Delivery delivery = consumer.nextDelivery();
        String message = new String(delivery.getBody());
        System.out.println(" [x] Received '" + message + "'");

        JSONObject json = new JSONObject(message);
        String caseID = json.getString("caseID");
        //following takes very long time            
        dao.saveToDB(caseID);
    }
}
每个由消费者接收的消息都包含一个案例ID。对于每个案例ID,它将保存大量数据到数据库中,这需要很长时间。目前,由于生产者/消费者使用相同的队列进行案例ID的发布/订阅,因此仅设置了一个消费者用于RabbitMQ。那么我该如何加快消费者吞吐量,使消费者能够赶上生产者并避免队列中的消息溢出?我应该在消费者部分使用多线程来加快消耗速率吗?或者我应该使用多个消费者同时消费传入的消息?还是有任何异步方式可以让消费者异步地消费消息而不必等待其完成?欢迎任何建议。
5个回答

19

"这会导致消息队列溢出吗?"

是的,RabbitMQ将进入“流控制”状态,以防止队列长度增加时过度消耗内存。它还将开始将消息持久化到磁盘中,而不是将它们保存在内存中。

"那么我该如何提高消费者吞吐量,使其赶上生产者并避免消息队列溢出?"

您有两个选择:

  1. 添加更多的消费者。请注意,如果选择此选项,则您的数据库现在将被多个并发进程操作。确保数据库能够承受额外的压力。
  2. 增加消费通道的 QOS 值。这将从队列中拉取更多的消息并将其缓冲到消费者上。这将增加总体处理时间;如果缓冲5条消息,则第5条消息将需要与消息1 ... 5一起完成的处理时间。

"在消费者部分使用多线程来加速消费速率吗?"

除非您有一个经过良好设计的解决方案,否则不要这样做。向应用程序添加并行性将在消费端增加很多开销。您可能会耗尽线程池或限制内存使用。

处理AMQP时,您确实需要考虑每个过程的业务要求,以设计最佳解决方案。您的传入消息有多么紧急?它们是否需要尽快持久化到数据库中,或者对于用户来说,即使没有立即获得数据,这是否重要?

如果数据不需要立即持久化,请修改应用程序,使消费者仅从队列中删除消息并将其保存到缓存集合中(例如在Redis中)。引入第二个进程,然后按顺序读取和处理缓存的消息。这将确保您的队列长度不会增长到足以导致流控制,同时防止您的数据库被写请求轰炸,这通常比读请求更昂贵。您的消费者现在只需从队列中删除消息,由另一个进程稍后处理。


谢谢Paul。这是一个非常好的建议。我的数据不需要立即在数据库中持久化。数据库持久化部分需要很长时间,因为它涉及每个案例的数据解析,然后将大量数据(~10000行)保存在一个数据库插入中。因此,使用Redis是一个好主意,因为它是一个内存缓存。但最终我仍然需要将数据持久化到数据库中。那么,在消息消费者接收消息并保存到Redis后,如何使用Redis完成DB写入任务?如果DB插入非常慢,消费者会超出Redis缓存大小限制吗? - tonga
我将从单个或多个进程中消耗每个消息,并在提交到数据库后从Redis中清除该消息。Redis没有缓存限制 - 您受限于主机机器上的RAM数量。 100万个相对较小的键大约为200Mb。如果您担心内存不足,请查看此链接:http://redis.io/topics/memory-optimization - Paul Mooney
我已经发布了一篇文章,概述了扩展AMQP的方法、相关的奖励和缺点:http://insidethecpu.com/2014/11/11/rabbitmq-qos-vs-competing-consumers/ - Paul Mooney
@PaulMooney 我是一个RabbitMQ新手。将消息存储在Redis中与通过设置qos +手动ack而不是自动ack缓慢消费它们的好处是什么? - Michiel Borkent
这两者并不一定相关;在Redis中进行存储可以提供耐久性,以便于可以重试失败的读取,并且在意外关闭情况下可以实现最小消息丢失。应用自定义QOS和手动ACK可以在设计中获得更高的细粒度度量,同时也可以基于流量量来平衡集群。 - Paul Mooney

3
你有很多方法可以提高性能。
1. 你可以创建一个带有更多生产者的工作队列,这样你就创建了一个简单的负载平衡系统。不要使用交换机-队列,而是只使用队列。阅读这篇文章:RabbitMQ非循环调度
2. 当你收到一条消息时,你可以为插入数据库的数据创建一个池线程,但在这种情况下,你必须管理失败。
但我认为主要问题是数据库而不是RabbitMQ。通过良好的调整、多线程和工作队列,你可以拥有可扩展和快速的解决方案。
让我知道。

2
虽然增加更多的消费者可能会加快速度,但真正的问题是保存到数据库中。
这里已经有很多答案讨论了添加消费者(线程和/或机器)和改变QoS,所以我不会重复说明。相反,您应该认真考虑使用聚合器模式将消息聚合成一组消息,然后一次性批量插入数据库。
您当前的代码每个消息可能会打开一个连接,插入数据,然后关闭该连接(或返回到池中)。更糟糕的是,它甚至可能正在使用事务。
通过使用聚合器模式,您实际上是在刷新之前缓冲数据。
现在编写一个好的聚合器是棘手的。您需要决定如何缓冲(即每个工作人员都有自己的缓冲区还是像Redis那样的中央缓冲区)。我相信Spring Integration有一个聚合器。

1

“那么我该如何加快消费者吞吐量,使消费者能够赶上生产者,并避免队列中的消息溢出?”这是答案:“使用多个消费者同时消费传入的消息”,使用多线程并行运行这些实现共享无状态原则的消费者,http://www.eaipatterns.com/CompetingConsumers.html


根据RabbitMQ的文档,这里有两种方法:工作队列、发布/订阅。我现在正在使用发布/订阅模型。如果有多个消费者,我应该改用工作队列吗? - tonga
对于你的需求,应该使用工作队列。以下是它的实现方式:https://github.com/victorpictor/Hotel/blob/master/Infrastructure/MessageTransport/Receivers/Subscriber.cs#L29 - voutrin
但是如果我想为不同的目的使用多个队列怎么办?现在只有一个用于caseID消息的队列。除了caseID之外可能还有更多的数据。因此,我可能需要使用发布/订阅模型来拥有多个队列。 - tonga
在这种情况下,您仍然可以拥有长时间运行的消息消费的竞争消费者,就像现在一样,并且可以为其他队列使用其他类型的消费者。要实现这一点,您需要更改交换机的类型。 - voutrin

0
作为答案,我建议两者都可以。
您可以利用多个接收器,并将每个接收器设置为在单独的线程中执行任务,从而使接收器接受队列中的下一条消息。
当然,这种方法假设每个操作的结果(如果我理解正确,就是对数据库的写入)不会以任何方式影响来自其他消息响应的后续操作的结果。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接