可能存在RabbitMQ QueueingConsumer内存泄漏问题

9

我有以下代码来声明一个队列:

Connection connection = RabbitConnection.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(getQueueName(), false, false, false, null);
consumer = new QueueingConsumer(channel);
channel.basicConsume(getQueueName(), true,consumer);

接下来获取下一个Delivery对象并处理:

    Delivery delivery = null;
    T queue = null;

    //loop over, continuously retrieving messages
    while(true) {

        try {
            delivery = consumer.nextDelivery();
            queue = deserialise(delivery.getBody());

            process(queue);

        } catch (ShutdownSignalException e) {
            logger.warn("Shutodwon signal received.");
            break;
        } catch (ConsumerCancelledException e) {
            logger.warn("Consumer cancelled exception: {}",e.getMessage());
            break;
        } catch (InterruptedException e) {
            logger.warn("Interuption exception: {}", e);
            break;
        }
    }

反序列化代码。正如您所见,我正在使用Kryo:
public T deserialise(byte[] body) {
    Kryo kryo= new Kryo();
    Input input = new Input(body);
    T deserialised = kryo.readObject(input, getQueueClass());
    input.close();

    return deserialised;
}

如果我使用包含大量对象的队列运行此代码,则在大约270万个对象后会出现内存不足异常。最初我是通过晚上运行它并从JMeter以每秒90个的速率输入数据来发现这一点的,首先它能够正常消耗,但第二天早上我发现RabbitMQ中有很多未处理的消息,而消费者方面则出现了内存不足异常。我再次运行并使用Eclipse Memory Analyzer来确定内存使用情况。从分析结果可以看到,由com.rabbitmq.client.QueueingConsumer引用的java.util.concurrent.LinkedBlockingQueue正在不断增长,直到耗尽内存。
是否需要采取任何措施告诉RabbitMQ释放资源呢?
我可以增加堆大小,但我担心这只是一个短期解决方案,并且我的代码可能存在内存泄漏问题,在生产部署几个月后可能会被发现。
4个回答

6
我的错误在于我将频道设置为自动确认(auto ack)。这意味着来自Rabbit的每个消息都会被确认(被认为已收到)。我通过声明频道不自动确认来修复了这个问题,并进行了测试:channel.basicConsume(getQueueName(), false,consumer); 在处理队列后,我会确认该消息:consumer.getChannel().basicAck(delivery.getEnvelope().getDeliveryTag(), false);
现在我的队列声明如下:
        Connection connection = RabbitConnection.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(getQueueName(), false, false, false, null);
        consumer = new QueueingConsumer(channel);
        channel.basicConsume(getQueueName(), false,consumer);

并且以下是处理队列的步骤:

    Delivery delivery = null;
    T queue = null;

    //loop over, continuously retrieving messages
    while(true) {

        try {
            delivery = consumer.nextDelivery();
            queue = deserialise(delivery.getBody());
            process(queue);
            consumer.getChannel().basicAck(delivery.getEnvelope().getDeliveryTag(), false);

        } catch (ShutdownSignalException e) {
            logger.warn("Shutodwon signal received.");
            break;
        } catch (ConsumerCancelledException e) {
            logger.warn("Consumer cancelled exception: {}",e.getMessage());
            break;
        } catch (InterruptedException e) {
            logger.warn("Interuption exception: {}", e);
            break;
        } catch (IOException e) {
            logger.error("Could not ack message: {}",e);
            break;
        }
    }

我现在可以在RabbitMQ管理界面看到消息以非常高的速率被传递,但它们没有以那个速度被确认。如果我杀死我的消费者,大约30秒内所有这些未确认的消息都会移回准备队列。我将要做的改进之一是设置basicQos值:channel.basicQos(10); 这样就不会有太多未被确认的消息被传递。这是可取的,因为它意味着我可以在同一个队列上启动另一个消费者并开始处理队列,而不是所有东西都在内存中未被确认且不可用于其他消费者。


1
如果我没记错的话,这个解决方案通过不将所有发送给消费者的消息存储在内存映射中并占用越来越多的内存来解决内存泄漏问题。通过将通道设置为自动确认,一次只保留一个消息在JVM内存中,其余消息存储在Rabbit队列中。除了解决内存泄漏问题外,这还使得可以添加更多的消费者来处理负载,并且它们会在完成处理前一个消息后立即拉取下一个消息。 - Arthur
1
你说“通过将通道设置为自动确认”,是指你将其设置为不自动确认吗? - robthewolf

2
解决方法是设置basicQos - channel.basicQos(2);。我的通道声明现在看起来像这样:
        Connection connection = RabbitConnection.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(getQueueName(), false, false, false, null);
        consumer = new QueueingConsumer(channel);
        channel.basicConsume(getQueueName(), true,consumer);
        channel.basicQos(2);

将basicQos设置为2意味着只在内存中保留2条消息。如需更多信息以及有关使用CoDel算法的有趣讨论,请参见http://www.rabbitmq.com/blog/2012/05/11/some-queuing-theory-throughput-latency-and-bandwidth/


根据我刚刚阅读的内容,如果这解决了问题,那么你在原始代码中消费的速度不够快,导致缓冲区中堆积了大量已从队列中读取但尚未处理的消息。将QOS更改为2意味着只会缓冲2条消息,队列将缓冲其余的消息。在这种情况下,你的队列不会变得非常大,而是会增加内存使用量吗? - robthewolf
刚刚测试了一下,发现内存仍在增长,队列没有变大。看起来好像什么都没做。我不介意队列变大,因为我可以添加更多的消费者,而且我将消息放入队列的速率是预期速率的30倍左右。我只是担心如果消费者存在内存泄漏问题,那么我可能会在生产一个月后才发现;可能正好是我度假的时候 :/ - Arthur
我非常确定这是一个GC问题。某些数据的引用被保留,随着每个传入消息,内存使用量不断增加。 - robthewolf

1
问题似乎是您的消费者跟不上您的生产者,导致队列无限增长。您需要限制队列的大小,并在达到限制时减缓您的生产者。我还建议优化您的消费者能够跟上的能力。

它一段时间内保持得很好;至少在我观察的第一个小时里,消息以约120个/秒的速度到达并立即被消耗。当我通宵运行时,早上我有434万条未被Rabbit消耗的消息。因此,我重新启动了我的消费者,并在消耗了大约270万条消息后,以超过5000个/秒的速度进行了消耗,然后由于内存不足而停止。看起来消费者可以很好地跟上,但由于QueueingConsumer中的LinkedBlockingQueue增长太快而导致内存不足。 - Arthur
如果队列正在增长,那么消费者不可能跟得上生产者。消费者可能一开始足够快,但随着时间的推移会变慢。 - Peter Lawrey
啊,现在我在想,当我看到它正在消耗时,实际上它只是将消息放入LinkedBlockingQueue的内存中,这并不意味着它被正确地消耗了。这可能有道理。 - Arthur
将任务添加到队列的唯一自然方法是从该队列中删除或取出它。 - Peter Lawrey
顺便说一下,这只是我测试极端情况的结果,在生产环境中不应该发生。 我测试的数据速率很少会出现,而且肯定不会持续很长时间。 - Arthur
通常情况下,我会同意这应该是个问题,除非消费者随着时间的推移变慢。也就是说,你遇到问题只是时间问题。在重新启动消费者之后,它似乎快得多,这很可疑。 - Peter Lawrey

1
这可能是因为对象在被使用后没有被销毁的问题。请展示反序列化的代码。我怀疑您正在通过队列发送对象,并使用某种对象输入流/字节数组输入流进行反序列化。如果您没有正确关闭流,那么可能会导致内存泄漏。

我已经在我的问题中添加了反序列化代码。反序列化代码使用Kryo。我一直在使用Eclipse Memory Analyzer,99%的内存被LinkedBlockingQueue消耗,而LinkedBlockingQueue一直在增长。这是由QueueingConsumer引用的。 - Arthur

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接