我有以下代码来声明一个队列:
Connection connection = RabbitConnection.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(getQueueName(), false, false, false, null);
consumer = new QueueingConsumer(channel);
channel.basicConsume(getQueueName(), true,consumer);
接下来获取下一个Delivery对象并处理:
Delivery delivery = null;
T queue = null;
//loop over, continuously retrieving messages
while(true) {
try {
delivery = consumer.nextDelivery();
queue = deserialise(delivery.getBody());
process(queue);
} catch (ShutdownSignalException e) {
logger.warn("Shutodwon signal received.");
break;
} catch (ConsumerCancelledException e) {
logger.warn("Consumer cancelled exception: {}",e.getMessage());
break;
} catch (InterruptedException e) {
logger.warn("Interuption exception: {}", e);
break;
}
}
反序列化代码。正如您所见,我正在使用Kryo:
public T deserialise(byte[] body) {
Kryo kryo= new Kryo();
Input input = new Input(body);
T deserialised = kryo.readObject(input, getQueueClass());
input.close();
return deserialised;
}
如果我使用包含大量对象的队列运行此代码,则在大约270万个对象后会出现内存不足异常。最初我是通过晚上运行它并从JMeter以每秒90个的速率输入数据来发现这一点的,首先它能够正常消耗,但第二天早上我发现RabbitMQ中有很多未处理的消息,而消费者方面则出现了内存不足异常。我再次运行并使用Eclipse Memory Analyzer来确定内存使用情况。从分析结果可以看到,由com.rabbitmq.client.QueueingConsumer引用的java.util.concurrent.LinkedBlockingQueue正在不断增长,直到耗尽内存。
是否需要采取任何措施告诉RabbitMQ释放资源呢?
我可以增加堆大小,但我担心这只是一个短期解决方案,并且我的代码可能存在内存泄漏问题,在生产部署几个月后可能会被发现。