让一个简单的Spring JMS客户端确认工作

19
刚开始着手学习在Spring中实现JMS确认功能。目前为止,我的消费者已经完美运行,唯一的问题是当我不确认消息时,它仍然从队列中被取走(我希望它保留在那里或者进入死信队列)。
    <?xml version="1.0" encoding="UTF-8"?>

<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:jms="http://www.springframework.org/schema/jms"
       xmlns:p="http://www.springframework.org/schema/p"
       xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-3.0.xsd">

    <!-- A JMS connection factory for ActiveMQ -->
    <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"
    p:brokerURL="failover://(tcp://jms1:61616,tcp://jms2:61616)?randomize=false&amp;jms.redeliveryPolicy.maximumRedeliveries=5" />

    <!-- A POJO that implements the JMS message listener -->
    <bean id="simpleMessageListener" class="com.company.ConsumerClass" />

    <!-- A JMS namespace aware Spring configuration for the message listener container -->
    <jms:listener-container
            container-type="default"
            connection-factory="connectionFactory"
            acknowledge="client"
            concurrency="10-50"
            cache="consumer">
        <jms:listener destination="someQueue" ref="simpleMessageListener" method="onMessage" />
    </jms:listener-container>
</beans>

在ConsumerClass中,我的简单消费者看起来像这样:
@Override public final void onMessage(Message message) {
    Object postedMessage = null;
    try {
        postedMessage = ((ObjectMessage) message).getObject();

        if (postedMessage.getClass() == SomeMessageType.class) {
            try {
                //Some logic here
            
                message.acknowledge();
                return; //Success Here
            } catch (MyException e) {
                logger.error("Could not process message, but as I didn't call acknowledge I expect it to end up in the dead message queue");
            }
        }
    } catch (JMSException e) {
        logger.error("Error occurred pulling Message from Queue", e);
    }

    //Also worth noting, if I throw new RuntimeException("Aww Noos"); here then it won't take it from the queue, but it won't get consumed (or end up as dead letter)...
}
6个回答

22

请阅读此文档:Spring JMS容器不使用message.acknowledge()

监听器容器提供以下消息确认选项:

将"sessionAcknowledgeMode"设置为"AUTO_ACKNOWLEDGE"(默认):在执行监听器之前自动确认消息;如果抛出异常,则不进行重新传递。
将"sessionAcknowledgeMode"设置为"CLIENT_ACKNOWLEDGE":成功执行监听器后自动确认消息;如果抛出异常,则不进行重新传递。
将"sessionAcknowledgeMode"设置为"DUPS_OK_ACKNOWLEDGE":在或者之后懒惰地确认消息执行监听器;如果抛出异常,则可能重新传递。
将"sessionTransacted"设置为"true":在成功执行监听器后进行事务性确认;如果抛出异常,则保证重新传递。


javax.jms.Session.AUTO_ACKNOWLEDGE,这是为那些感到困惑的人准备的。 - Christian Nilsson

10

现今,Spring提供了很好的包装来覆盖普通的JMS消息监听器。

请参阅AbstractMessageListenerContainer的JavaDocs。

"sessionAcknowledgeMode"设置为"CLIENT_ACKNOWLEDGE":在成功执行监听器后自动确认消息;最佳努力重新传递抛出用户异常以及其他监听器执行中断(例如JVM死机)的情况。

因此,当您定义一个 @JmsListener 方法时,确认会在成功完成时自动发送,但您可以抛出异常以重新接收消息。


9
我在这里找到了答案。如果您更改acknowledge="transacted"并确保在OnMessage()例程结束时抛出new RuntimeException("Message could not be consumed. Roll back transaction"),则看起来可以很好地工作。
不过我仍然不知道acknowledge="client"实现了什么。

客户端确认处理在侦听器方法返回时处理确认。无论是优雅地还是通过异常,如果方法调用没有被中断,它将在完成后得到确认。由于它不关心是否抛出异常,因此它将从队列中消耗消息。 - alegria

-1
请使用以下代码,它会正常工作。
<bean id="{containerName}"  class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref={connectionFactoryBean} />
    <property name="destinationName" ref="{queue}" />
    <property name="messageListener" ref="{listner}" />
    <property name="sessionAcknowledgeModeName" value="CLIENT_ACKNOWLEDGE"/>
</bean>

-2
在您的消费者中调用消息的acknowledge()方法。
考虑以下情况:应用程序接收但未确认消息。应用程序接收后续消息并确认它。前一个消息会发生什么?前一个消息也被视为已确认。通常,确认特定消息会确认会话接收到的所有先前消息。在上面的输出中,只有消息5被明确确认。消息5之前的所有消息都是隐式确认的。消息5之后的消息没有被确认。
有关更多详细信息,请参阅this article 还要查看此文章Java客户端的Sun Java系统消息队列4.3开发人员指南

-3

在我的实践中,客户端确认很少被使用。典型的设置是自动确认,因此如果您的代码从onMessage()正常返回(没有异常),则消息会自动确认。如果您的代码从onMessage()抛出异常,则没有确认,并且消息通常会重新传递预配置的次数,之后通常会被丢弃或放入死信队列。

在您的情况下,从JMS服务器的角度来看,似乎客户端请求了消息但从未确认,因此仍然由客户端“正在处理”。在这种情况下,其他相同队列上的消费者将看不到该消息,因此您可能会认为消息已经“从队列中取走”,而实际上它仍然存在。显然,您也不会在死信队列中看到这样的消息。

我建议您阅读JMS规范,以清楚地了解不同的确认模式。


抱歉,在这种情况下,如果我在 message.acknowledge() 之后返回,那么就算成功了(我已经在代码示例中添加了这个)。 - Scotty OB
此外,如果我取消对 throw RuntimeException 的注释(它必须是运行时以满足 MessageListener 接口),我会收到一个错误:08:39:59,066 WARN org.springframework.jms.listener.DefaultMessageListenerContainer#0-2 listener.DefaultMessageListenerContainer:694 - Execution of JMS message listener failed, and no ErrorHandler has been set. java.lang.RuntimeException:Aww Noos我可以看到,在这种情况下,消息将与消费者一起保留,直到我终止它,此时它们将尝试在另一个消费者上进行消费,这使我认为 acknowledge="client" 没有任何作用。 - Scotty OB
2
实际上,如果我正确阅读了javadoc,“auto”将在调用监听器之前进行确认:“sessionAcknowledgeMode”设置为“AUTO_ACKNOWLEDGE”(默认值):在监听器执行之前自动确认消息;如果抛出异常,则不会重新传递。 - dstibbe

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接