MessageDrivenBean consumes JMS messages twice

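I am currently running an application in JBoss. The application needs to consume JMS messages from ActiveMQ (ActiveMQ is installed as a module of my JBoss; I followed this procedure: https://developer.jboss.org/wiki/IntegrationOfJBossAS7WithActiveMQ).
As you can see in the link, I use a MessageDrivenBean to consume the messages on my queue: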
import javax.ejb.ActivationConfigProperty;
import javax.ejb.MessageDriven;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

@MessageDriven(activationConfig = {
        @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue"),
        @ActivationConfigProperty(propertyName = "destination", propertyValue = "Mongo-DB")})
public class MongoConsumer implements MessageListener {

    // json, collectionId, uuid, queriesMongoDB, LOGGER and rollbackMdb(...)
    // are declared elsewhere (not shown in the question).

    @Override
    public void onMessage(Message message) {
        TextMessage textMessage = (TextMessage) message;

        try {
            json = textMessage.getText();
            collectionId = JsonUtils.extract(json, "_collectionId");
            uuid = JsonUtils.extract(json, "_uuid");

            queriesMongoDB.save(collectionId, json);
            LOGGER.info("Insert in mongo : {}", uuid);
        } catch (TechnicalException e) {
            // Pass the exception as the last argument so its stack trace is logged.
            LOGGER.error("Something went wrong while calling Mongo", e);
            this.rollbackMdb(json);
        } catch (JMSException e) {
            LOGGER.error("Something went wrong with the Mongo Consumer", e);
        }
    }
}

My session is currently created in auto-acknowledge mode:

connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

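Everything works well when I consume a small number of messages. But when I benchmark my application and send a large number of messages, the MDB sometimes consumes the same message twice and saves it to MongoDB, so the document is duplicated there as well.

The problem only occurs with a large volume of messages (around 200,000), of which roughly 10 end up duplicated.

It looks as if one thread picks up a message to process it and, in the meantime, another thread does exactly the same thing.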

I also switched the JMS session to CLIENT_ACKNOWLEDGE mode and added:

 message.acknowledge();

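at the beginning of my method, but that did not help.
PS: Sorry for my bad English.
Edit:
I just reproduced the bug and read server.log; the following error is logged when the duplicate occurs: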
16:41:35,376 WARN  [org.apache.activemq.TransactionContext] (default-threads - 39) commit of: XID:[131077,globalId=0:ffff0a48263f:-669b4574:57e296f4:23fc95,branchId=0:ffff0a48263f:-669b4574:57e296f4:23fc96] failed with: javax.jms.JMSException: Transaction 'XID:[131077,globalId=0:ffff0a48263f:-669b4574:57e296f4:23fc95,branchId=0:ffff0a48263f:-669b4574:57e296f4:23fc96]' has not been started. xaErrorCode:-4: javax.jms.JMSException: Transaction 'XID:[131077,globalId=0:ffff0a48263f:-669b4574:57e296f4:23fc95,branchId=0:ffff0a48263f:-669b4574:57e296f4:23fc96]' has not been started. xaErrorCode:-4
        at org.apache.activemq.util.JMSExceptionSupport.create(JMSExceptionSupport.java:54) [activemq-client-5.10.0.jar:5.10.0]
        at org.apache.activemq.ActiveMQConnection.syncSendPacket(ActiveMQConnection.java:1420) [activemq-client-5.10.0.jar:5.10.0]
        at org.apache.activemq.TransactionContext.syncSendPacketWithInterruptionHandling(TransactionContext.java:761) [activemq-client-5.10.0.jar:5.10.0]
        at org.apache.activemq.TransactionContext.commit(TransactionContext.java:562) [activemq-client-5.10.0.jar:5.10.0]
        at org.apache.activemq.ra.LocalAndXATransaction.commit(LocalAndXATransaction.java:92)
        at com.arjuna.ats.internal.jta.resources.arjunacore.XAResourceRecord.topLevelOnePhaseCommit(XAResourceRecord.java:682)
        at com.arjuna.ats.arjuna.coordinator.BasicAction.onePhaseCommit(BasicAction.java:2278)
        at com.arjuna.ats.arjuna.coordinator.BasicAction.End(BasicAction.java:1479)
        at com.arjuna.ats.arjuna.coordinator.TwoPhaseCoordinator.end(TwoPhaseCoordinator.java:98)
        at com.arjuna.ats.arjuna.AtomicAction.commit(AtomicAction.java:162)
        at com.arjuna.ats.internal.jta.transaction.arjunacore.TransactionImple.commitAndDisassociate(TransactionImple.java:1189)
        at com.arjuna.ats.internal.jta.transaction.arjunacore.BaseTransaction.commit(BaseTransaction.java:126)
        at com.arjuna.ats.jbossatx.BaseTransactionManagerDelegate.commit(BaseTransactionManagerDelegate.java:75)
        at org.jboss.as.ejb3.inflow.MessageEndpointInvocationHandler.afterDelivery(MessageEndpointInvocationHandler.java:72) [jboss-as-ejb3.jar:7.5.7.Final-redhat-3]
        at sun.reflect.GeneratedMethodAccessor67.invoke(Unknown Source) [:1.8.0_66]
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) [rt.jar:1.8.0_66]
        at java.lang.reflect.Method.invoke(Method.java:497) [rt.jar:1.8.0_66]
        at org.jboss.as.ejb3.inflow.AbstractInvocationHandler.handle(AbstractInvocationHandler.java:60) [jboss-as-ejb3.jar:7.5.7.Final-redhat-3]
        at org.jboss.as.ejb3.inflow.MessageEndpointInvocationHandler.doInvoke(MessageEndpointInvocationHandler.java:136) [jboss-as-ejb3.jar:7.5.7.Final-redhat-3]
        at org.jboss.as.ejb3.inflow.AbstractInvocationHandler.invoke(AbstractInvocationHandler.java:73) [jboss-as-ejb3.jar:7.5.7.Final-redhat-3]
        at com.sun.proxy.$Proxy94.afterDelivery(Unknown Source)
        at org.apache.activemq.ra.MessageEndpointProxy$MessageEndpointAlive.afterDelivery(MessageEndpointProxy.java:128)
        at org.apache.activemq.ra.MessageEndpointProxy.afterDelivery(MessageEndpointProxy.java:69)
        at org.apache.activemq.ra.ServerSessionImpl.afterDelivery(ServerSessionImpl.java:225)
        at org.apache.activemq.ActiveMQSession.run(ActiveMQSession.java:1016) [activemq-client-5.10.0.jar:5.10.0]
        at org.apache.activemq.ra.ServerSessionImpl.run(ServerSessionImpl.java:169)
        at org.jboss.jca.core.workmanager.WorkWrapper.run(WorkWrapper.java:215)
        at org.jboss.threads.SimpleDirectExecutor.execute(SimpleDirectExecutor.java:33)
        at org.jboss.threads.QueueExecutor.runTask(QueueExecutor.java:808)
        at org.jboss.threads.QueueExecutor.access$100(QueueExecutor.java:45)
        at org.jboss.threads.QueueExecutor$Worker.run(QueueExecutor.java:849)
        at java.lang.Thread.run(Thread.java:745) [rt.jar:1.8.0_66]
        at org.jboss.threads.JBossThread.run(JBossThread.java:122)
Caused by: javax.transaction.xa.XAException: Transaction 'XID:[131077,globalId=0:ffff0a48263f:-669b4574:57e296f4:23fc95,branchId=0:ffff0a48263f:-669b4574:57e296f4:23fc96]' has not been started. xaErrorCode:-4
        at org.apache.activemq.transaction.XATransaction.newXAException(XATransaction.java:174)
        at org.apache.activemq.broker.TransactionBroker.getTransaction(TransactionBroker.java:368)
        at org.apache.activemq.broker.TransactionBroker.commitTransaction(TransactionBroker.java:252)
        at org.apache.activemq.broker.MutableBrokerFilter.commitTransaction(MutableBrokerFilter.java:117)
        at org.apache.activemq.broker.TransportConnection.processCommitTransactionOnePhase(TransportConnection.java:498)
        at org.apache.activemq.command.TransactionInfo.visit(TransactionInfo.java:100) [activemq-client-5.10.0.jar:5.10.0]
        at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:334)
        at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:188)
        at org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:116) [activemq-client-5.10.0.jar:5.10.0]
        at org.apache.activemq.transport.MutexTransport.onCommand(MutexTransport.java:50) [activemq-client-5.10.0.jar:5.10.0]
        at org.apache.activemq.transport.vm.VMTransport.iterate(VMTransport.java:248)
        at org.apache.activemq.thread.PooledTaskRunner.runTask(PooledTaskRunner.java:133) [activemq-client-5.10.0.jar:5.10.0]
        at org.apache.activemq.thread.PooledTaskRunner$1.run(PooledTaskRunner.java:48) [activemq-client-5.10.0.jar:5.10.0]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [rt.jar:1.8.0_66]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [rt.jar:1.8.0_66]
        at java.lang.Thread.run(Thread.java:745) [rt.jar:1.8.0_66]

16:41:35,410 WARN  [com.arjuna.ats.jta] (default-threads - 39) ARJUNA016039: onePhaseCommit on < formatId=131077, gtrid_length=29, bqual_length=36, tx_uid=0:ffff0a48263f:-669b4574:57e296f4:23fc95, node_name=1, branch_uid=0:ffff0a48263f:-669b4574:57e296f4:23fc96, subordinatenodename=null, eis_name=unknown eis name > ([org.apache.activemq.ra.LocalAndXATransaction@7e5f0986,TransactionContext{transactionId=null,connection=ActiveMQConnection {id=ID:tsfla902v-34440-1474467574382-7:1,clientId=ID:tsfla902v-34440-1474467574382-6:1,started=true}}]) failed with exception XAException.XAER_NOTA: javax.transaction.xa.XAException: Transaction 'XID:[131077,globalId=0:ffff0a48263f:-669b4574:57e296f4:23fc95,branchId=0:ffff0a48263f:-669b4574:57e296f4:23fc96]' has not been started. xaErrorCode:-4
        at org.apache.activemq.TransactionContext.toXAException(TransactionContext.java:786) [activemq-client-5.10.0.jar:5.10.0]
        at org.apache.activemq.TransactionContext.commit(TransactionContext.java:595) [activemq-client-5.10.0.jar:5.10.0]
        at org.apache.activemq.ra.LocalAndXATransaction.commit(LocalAndXATransaction.java:92)
        at com.arjuna.ats.internal.jta.resources.arjunacore.XAResourceRecord.topLevelOnePhaseCommit(XAResourceRecord.java:682)
        at com.arjuna.ats.arjuna.coordinator.BasicAction.onePhaseCommit(BasicAction.java:2278)
        at com.arjuna.ats.arjuna.coordinator.BasicAction.End(BasicAction.java:1479)
        at com.arjuna.ats.arjuna.coordinator.TwoPhaseCoordinator.end(TwoPhaseCoordinator.java:98)
        at com.arjuna.ats.arjuna.AtomicAction.commit(AtomicAction.java:162)
        at com.arjuna.ats.internal.jta.transaction.arjunacore.TransactionImple.commitAndDisassociate(TransactionImple.java:1189)
        at com.arjuna.ats.internal.jta.transaction.arjunacore.BaseTransaction.commit(BaseTransaction.java:126)
        at com.arjuna.ats.jbossatx.BaseTransactionManagerDelegate.commit(BaseTransactionManagerDelegate.java:75)
        at org.jboss.as.ejb3.inflow.MessageEndpointInvocationHandler.afterDelivery(MessageEndpointInvocationHandler.java:72) [jboss-as-ejb3.jar:7.5.7.Final-redhat-3]
        at sun.reflect.GeneratedMethodAccessor67.invoke(Unknown Source) [:1.8.0_66]
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) [rt.jar:1.8.0_66]
        at java.lang.reflect.Method.invoke(Method.java:497) [rt.jar:1.8.0_66]
        at org.jboss.as.ejb3.inflow.AbstractInvocationHandler.handle(AbstractInvocationHandler.java:60) [jboss-as-ejb3.jar:7.5.7.Final-redhat-3]
        at org.jboss.as.ejb3.inflow.MessageEndpointInvocationHandler.doInvoke(MessageEndpointInvocationHandler.java:136) [jboss-as-ejb3.jar:7.5.7.Final-redhat-3]
        at org.jboss.as.ejb3.inflow.AbstractInvocationHandler.invoke(AbstractInvocationHandler.java:73) [jboss-as-ejb3.jar:7.5.7.Final-redhat-3]
        at com.sun.proxy.$Proxy94.afterDelivery(Unknown Source)
        at org.apache.activemq.ra.MessageEndpointProxy$MessageEndpointAlive.afterDelivery(MessageEndpointProxy.java:128)
        at org.apache.activemq.ra.MessageEndpointProxy.afterDelivery(MessageEndpointProxy.java:69)
        at org.apache.activemq.ra.ServerSessionImpl.afterDelivery(ServerSessionImpl.java:225)
        at org.apache.activemq.ActiveMQSession.run(ActiveMQSession.java:1016) [activemq-client-5.10.0.jar:5.10.0]
        at org.apache.activemq.ra.ServerSessionImpl.run(ServerSessionImpl.java:169)
        at org.jboss.jca.core.workmanager.WorkWrapper.run(WorkWrapper.java:215)
        at org.jboss.threads.SimpleDirectExecutor.execute(SimpleDirectExecutor.java:33)
        at org.jboss.threads.QueueExecutor.runTask(QueueExecutor.java:808)
        at org.jboss.threads.QueueExecutor.access$100(QueueExecutor.java:45)
        at org.jboss.threads.QueueExecutor$Worker.run(QueueExecutor.java:849)
        at java.lang.Thread.run(Thread.java:745) [rt.jar:1.8.0_66]
        at org.jboss.threads.JBossThread.run(JBossThread.java:122)
Caused by: javax.transaction.xa.XAException: Transaction 'XID:[131077,globalId=0:ffff0a48263f:-669b4574:57e296f4:23fc95,branchId=0:ffff0a48263f:-669b4574:57e296f4:23fc96]' has not been started. xaErrorCode:-4
        at org.apache.activemq.transaction.XATransaction.newXAException(XATransaction.java:174)
        at org.apache.activemq.broker.TransactionBroker.getTransaction(TransactionBroker.java:368)
        at org.apache.activemq.broker.TransactionBroker.commitTransaction(TransactionBroker.java:252)
        at org.apache.activemq.broker.MutableBrokerFilter.commitTransaction(MutableBrokerFilter.java:117)
        at org.apache.activemq.broker.TransportConnection.processCommitTransactionOnePhase(TransportConnection.java:498)
        at org.apache.activemq.command.TransactionInfo.visit(TransactionInfo.java:100) [activemq-client-5.10.0.jar:5.10.0]
        at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:334)
        at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:188)
        at org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:116) [activemq-client-5.10.0.jar:5.10.0]
        at org.apache.activemq.transport.MutexTransport.onCommand(MutexTransport.java:50) [activemq-client-5.10.0.jar:5.10.0]
        at org.apache.activemq.transport.vm.VMTransport.iterate(VMTransport.java:248)
        at org.apache.activemq.thread.PooledTaskRunner.runTask(PooledTaskRunner.java:133) [activemq-client-5.10.0.jar:5.10.0]
        at org.apache.activemq.thread.PooledTaskRunner$1.run(PooledTaskRunner.java:48) [activemq-client-5.10.0.jar:5.10.0]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [rt.jar:1.8.0_66]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [rt.jar:1.8.0_66]
        at java.lang.Thread.run(Thread.java:745) [rt.jar:1.8.0_66]

16:41:35,418 WARN  [com.arjuna.ats.arjuna] (default-threads - 39) ARJUNA012084: One-phase commit of action 0:ffff0a48263f:-669b4574:57e296f4:23fc95 received heuristic decision: TwoPhaseOutcome.HEURISTIC_HAZARD

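Edit 2:

It looks like someone ran into the same problem here: https://issues.apache.org/jira/browse/AMQ-5953

My ActiveMQ module is version 5.11. I will try installing 5.13 and see whether that fixes it. I will keep you posted.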


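How do you know the messages are duplicates? Are you sure they have the same JMSMessageID, rather than the producer publishing them twice by mistake? - Matt Pavlovich
That was my first guess, but I logged every message consumed by the MDB, and when it happens the two are exactly identical (same JMSMessageID, same content, and so on). I read in the ActiveMQ specification that the same JMSMessageID cannot be in a queue twice at the same time, so I am sure the fault lies with my consumer. - PurplePanda
What is .rollbackMdb? It may need to be called in both catch blocks. You could consider using SESSION_TRANSACTED, so that you acknowledge in the try {} and roll back in every catch {}. - Matt Pavlovich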
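Edit: I see you used CLIENT_ACK.. don't use it.. it is clunky.. it acknowledges the current message and all previous messages in the session. Use Transacted or ActiveMQSession.INDIVIDUAL_MESSAGE... the latter gives you per-message acknowledgement. - Matt Pavlovich
Thanks for your reply. I tried INDIVIDUAL_MESSAGE, but it did not help. I think the problem is related to XA, but I don't fully understand what is happening. You can find the stack trace in the edit to my post. Thanks again for your help. - PurplePanda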
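The difference between the two acknowledgement styles discussed in the comments above can be illustrated with a toy model. This is only a sketch in plain Java, not the JMS or ActiveMQ API: the class `AckModeSketch` and its methods are hypothetical names made up for illustration. It assumes the behaviour described in the comment, namely that in CLIENT_ACKNOWLEDGE mode acknowledging one message acknowledges everything the session has consumed, while per-message acknowledgement (exposed by ActiveMQ as `ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE`) only acknowledges the message it is called on.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of a session's delivered-but-unacknowledged messages (hypothetical, not a JMS class). */
final class AckModeSketch {
    private final List<String> unacked = new ArrayList<>();

    /** The broker hands a message to the session. */
    void deliver(String messageId) {
        unacked.add(messageId);
    }

    /** CLIENT_ACKNOWLEDGE style: acknowledging any message acknowledges everything consumed so far. */
    void clientAcknowledge(String messageId) {
        unacked.clear();
    }

    /** Per-message style: only the named message is acknowledged. */
    void individualAcknowledge(String messageId) {
        unacked.remove(messageId);
    }

    /** Messages that would be redelivered if the session failed now. */
    List<String> pending() {
        return new ArrayList<>(unacked);
    }

    public static void main(String[] args) {
        AckModeSketch client = new AckModeSketch();
        client.deliver("m1");
        client.deliver("m2");
        client.deliver("m3");
        client.clientAcknowledge("m2");
        System.out.println("client ack, pending: " + client.pending());

        AckModeSketch individual = new AckModeSketch();
        individual.deliver("m1");
        individual.deliver("m2");
        individual.deliver("m3");
        individual.individualAcknowledge("m2");
        System.out.println("individual ack, pending: " + individual.pending());
    }
}
```

In the first case nothing is left pending, even for messages whose processing may not have finished; in the second, only `m2` is removed. Neither mode addresses the XA commit failure shown in the stack trace, which is consistent with the asker's report that switching modes did not help.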
1 Answer


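I finally found the solution! As I explained in the edits to my original post, the problem appears to be related to the XA transaction being destroyed while my code was processing the message. The transaction was recreated on the queue, and my code processed it afterwards.

In my server.log I saw the following stack trace: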

16:41:35,376 WARN  [org.apache.activemq.TransactionContext] (default-threads - 39) commit of: XID:[131077,globalId=0:ffff0a48263f:-669b4574:57e296f4:23fc95,branchId=0:ffff0a48263f:-669b4574:57e296f4:23fc96] failed with: javax.jms.JMSException: Transaction 'XID:[131077,globalId=0:ffff0a48263f:-669b4574:57e296f4:23fc95,branchId=0:ffff0a48263f:-669b4574:57e296f4:23fc96]' has not been started. xaErrorCode:-4: javax.jms.JMSException: Transaction 'XID:[131077,globalId=0:ffff0a48263f:-669b4574:57e296f4:23fc95,branchId=0:ffff0a48263f:-669b4574:57e296f4:23fc96]' has not been started. xaErrorCode:-4

Then I found this link: https://issues.apache.org/jira/browse/AMQ-5953. The ActiveMQ module inside my JBoss was version 5.11, and I successfully installed 5.14. Since then I have not seen any more duplicates.
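To check during a benchmark run whether any message is still delivered twice, one option is to count deliveries per JMSMessageID, much like the logging described in the comments on the question. The following is a minimal sketch in plain Java; `DeliveryCounter`, `record` and `duplicatedIds` are hypothetical names, and it assumes all MDB instances run in the same JVM and share one counter (for example through a static field), with `onMessage` calling `record(message.getJMSMessageID())`.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/** Thread-safe delivery counter keyed by JMSMessageID (hypothetical helper for benchmarking). */
final class DeliveryCounter {
    private final Map<String, AtomicInteger> deliveries = new ConcurrentHashMap<>();

    /** Records one delivery and returns how many times this ID has been seen so far. */
    int record(String jmsMessageId) {
        return deliveries
                .computeIfAbsent(jmsMessageId, id -> new AtomicInteger())
                .incrementAndGet();
    }

    /** Number of distinct message IDs that were delivered more than once. */
    long duplicatedIds() {
        return deliveries.values().stream().filter(count -> count.get() > 1).count();
    }

    public static void main(String[] args) {
        DeliveryCounter counter = new DeliveryCounter();
        counter.record("ID:example-1");
        counter.record("ID:example-2");
        int seen = counter.record("ID:example-1"); // second delivery of the same ID
        System.out.println("deliveries of ID:example-1: " + seen);
        System.out.println("duplicated IDs: " + counter.duplicatedIds());
    }
}
```

A return value greater than 1 from `record` marks a redelivery, which can be logged alongside the message content. Because the map grows with every message, this is meant for test runs rather than production use.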


Thanks PurplePanda! I had the same problem and your tip really solved it. - Vincent Vieira
