我有一个应用程序,它使用RabbitMQ作为消息队列,在两个组件:发送器和接收器之间发送/接收消息。 发送者以非常快的方式发送消息。 接收者接收消息,然后执行一些非常耗时的任务(主要是对非常大的数据大小进行数据库写入)。 由于接收者需要很长时间才能完成任务,然后从队列中检索下一条消息,因此发送者将快速填充队列。 那么我的问题是:这会导致消息队列溢出吗?
消息消费者如下所示:
消息消费者如下所示:
public void onMessage() throws IOException, InterruptedException {
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
String queueName = channel.queueDeclare("allDataCase", true, false, false, null).getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, "");
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, true, consumer);
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [x] Received '" + message + "'");
JSONObject json = new JSONObject(message);
String caseID = json.getString("caseID");
//following takes very long time
dao.saveToDB(caseID);
}
}
每个由消费者接收的消息都包含一个案例ID。对于每个案例ID,它将保存大量数据到数据库中,这需要很长时间。目前,由于生产者/消费者使用相同的队列进行案例ID的发布/订阅,因此仅设置了一个消费者用于RabbitMQ。那么我该如何加快消费者吞吐量,使消费者能够赶上生产者并避免队列中的消息溢出?我应该在消费者部分使用多线程来加快消耗速率吗?或者我应该使用多个消费者同时消费传入的消息?还是有任何异步方式可以让消费者异步地消费消息而不必等待其完成?欢迎任何建议。