手动确认RabbitMQ中的消息

6

之前我读取了队列中的所有消息,但现在我需要根据用户选择返回特定数量的消息(计数)。

我尝试相应地更改for循环,但由于自动确认,它仍然读取所有消息。因此,我尝试在配置文件中将其更改为手动确认。

在我的程序中,如何在读取消息后手动确认消息(目前我正在使用AmqpTemplate接收消息,没有通道的引用)?

    Properties properties = admin.getQueueProperties("queue_name");
    if(null != properties)
    {
        Integer messageCount = Integer.parseInt(properties.get("QUEUE_MESSAGE_COUNT").toString());          
        while(messageCount > 0)
        {
            Message msg = amqpTemplate.receive(queue_name);
            String value = new String(msg.getBody());
            
            valueList.add(value);
            messageCount--;
        }
}

任何帮助都非常感激,提前致谢。

1
AmqpTemplate#receive 会自动确认消息,除非通道是事务性的。如果要控制确认,可以使用 AmqpTemplate#execute 并手动执行接收操作,或者最好的方法是使用 SimpleMessageListenerContainerBlockingQueueConsumer - Nicolas Labrot
@NicolasLabrot,我在AmqpTemplate中没有找到execute方法,你是指其他的东西吗?是的,我确实在SimpleMessageListenerContainer中将setAcknowledgeMode设置为MANUAL。 - lambodar
抱歉,我指的是RabbitTemplate#execute,它是AmqpTemplate的实现。 - Nicolas Labrot
@NicolasLabrot,您能否解释一下这个问题。什么是ChannelCallback,看起来我需要一个我没有的通道引用。 - lambodar
1
请查看RabbitTemplate#receive代码,但我认为这不是正确的方法。 - Nicolas Labrot
@NicolasLabrot 非常感谢您 :) - lambodar
1个回答

14

使用 receive() 方法无法手动确认消息,应该使用具备 MANUAL 确认模式的 SimpleMessageListenerContainerChannelAwareMessageListener 来实现事件驱动消费。或者使用模板的 execute() 方法,它可以访问 Channel,但这样做将使用更底层的 RabbitMQ API,而不是 Message 抽象。

编辑:

要使用 execute() 方法需要了解底层的 RabbitMQ Java API,但例如以下代码即可实现...

    final int messageCount = 3;
    boolean result = template.execute(new ChannelCallback<Boolean>() {

        @Override
        public Boolean doInRabbit(final Channel channel) throws Exception {
            int n = messageCount;
            channel.basicQos(messageCount); // prefetch
            long deliveryTag = 0;
            while (n > 0) {
                GetResponse result = channel.basicGet("si.test.queue", false);
                if (result != null) {
                    System.out.println(new String(result.getBody()));
                    deliveryTag = result.getEnvelope().getDeliveryTag();
                    n--;
                }
                else {
                    Thread.sleep(1000);
                }
            }
            if (deliveryTag > 0) {
                channel.basicAck(deliveryTag, true);
            }
            return true;
        }
    });

GaryRussell,您能否提供使用execute()的任何示例代码?我非常新手,对此并不了解。先行致谢。 - lambodar
1
如果某个答案回答了你的问题,通常会将其标记为已接受(点击复选标记)。这将有助于其他寻找相同答案的用户。 - Gary Russell
@GaryRussell 如果我们使用 receive* 方法,Spring 使用 NONE 模式还是 AUTO 模式?也就是说,它是让 RabbitMQ 将消息视为自动传送,还是 Spring 在接收到消息后进行 ACK? - asgs
请勿在评论中提出新问题,特别是六年前的答案。确认模式仅适用于监听器容器。该模板在返回之前调用basicAck。如果您想拒绝并重新排队消息,则必须在事务中运行receive()方法并抛出异常,以便basicAck被回滚并且消息将重新排队。 - Gary Russell

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接