如何为RabbitMQ的DefaultConsumer设置超时时间?

3
我有一个作为RabbitMQ生产者的应用程序。我采用了RPC方法,没有问题。生产者发布消息并在回复队列(临时队列)中消耗其响应。起初,我使用QueueingConsumer作为生产者消费者,并设置了nextDelivery(timeout)方法的超时时间。现在QueueingConsumer已经弃用,在RabbitMQ官方网站上,他们已经更改了他们的RPC教程,并使用DefaultConsumer替换了QueueingConsumer。我也用DefaultConsumer替换了QueueingConsumer。但是现在出现了一个问题:如何为DefaultConsumer设置超时时间?因为如果消费者没有发送任何响应,垃圾临时队列会留在代理中。旧和新的生产者消费部分如下。感谢您的帮助。

旧的生产者消费方法:

    consumer = new QueueingConsumer(channel);
    channel.basicConsume(replyQueueName, true, consumer);
    channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));

        while (true) {
            QueueingConsumer.Delivery deliver = consumer.nextDelivery(timeout);
            if (deliver.getProperties().getCorrelationId().equals(corrId)) {
                response = new String(deliver.getBody(), "UTF-8");
                break;
            }
        }

        return response;

新的生产者消费者方法:

      final BlockingQueue<String> response = new ArrayBlockingQueue<String>(1);

    Consumer consumer = new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            if (properties.getCorrelationId().equals(corrId)) {
                response.offer(new String(body, "UTF-8"));
            }
        }
    };

    channel.basicConsume(replyQueueName, true, consumer);

    return response.take();

1
还不知道Java的BlockingQueue接口。太棒了... - Gerhard Hagerer
1个回答

2

问题已解决。可以在“响应”对象中设置超时时间。在“新的生产者消费者方法”中,更改如下:

将超时时间设置为响应:response.poll(5000, TimeUnit.MILLISECONDS),而不是使用response.take()


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接