我有一个作为RabbitMQ生产者的应用程序。我采用了RPC方法,没有问题。生产者发布消息并在回复队列(临时队列)中消耗其响应。起初,我使用QueueingConsumer作为生产者消费者,并设置了nextDelivery(timeout)方法的超时时间。现在QueueingConsumer已经弃用,在RabbitMQ官方网站上,他们已经更改了他们的RPC教程,并使用DefaultConsumer替换了QueueingConsumer。我也用DefaultConsumer替换了QueueingConsumer。但是现在出现了一个问题:如何为DefaultConsumer设置超时时间?因为如果消费者没有发送任何响应,垃圾临时队列会留在代理中。旧和新的生产者消费部分如下。感谢您的帮助。
旧的生产者消费方法:
consumer = new QueueingConsumer(channel);
channel.basicConsume(replyQueueName, true, consumer);
channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));
while (true) {
QueueingConsumer.Delivery deliver = consumer.nextDelivery(timeout);
if (deliver.getProperties().getCorrelationId().equals(corrId)) {
response = new String(deliver.getBody(), "UTF-8");
break;
}
}
return response;
新的生产者消费者方法:
final BlockingQueue<String> response = new ArrayBlockingQueue<String>(1);
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
if (properties.getCorrelationId().equals(corrId)) {
response.offer(new String(body, "UTF-8"));
}
}
};
channel.basicConsume(replyQueueName, true, consumer);
return response.take();