Spring JMS ActiveMQ - 无法同时获取多个消费者进行处理

4
我刚开始使用JMS/ActiveMQ,我有一个Spring/Hibernate应用程序,可以从ActiveMQ的队列中获取消息并处理这些消息以进行持久化。由于消息需要一段时间来处理和持久化,因此我将我的DefaultMessageListenerContainer配置为具有多个消费者(例如5-10),以便可以同时处理多个消息。我查看了很多ActiveMQ和Spring API文档,我认为我所需要做的就是将maxConcurrentConsumers设置为10+concurrentConsumers设置为5,或者在DefaultMessageListenerContainer上将并发设置为5-10。一旦我这样做了,我可以从ActiveMQ的内置控制台中看到我的队列确实有5个消费者。但是,当我将10或100条消息放入队列时,处理似乎是单线程的,并且我添加了一行日志来打印线程ID,它似乎是同一个线程ID按顺序处理所有请求。从控制台的ActiveMQ队列页面上,我点击浏览活动消费者链接来查看正在发生什么,看起来一个消费者有100个待处理的消息,其他4个没有任何消息。

我做了一些研究,找到了这篇Spring的文章(http://forum.springsource.org/showthread.php?61170-Messages-missed-using-DefaultMessageListenerContainer),并添加了一个预取策略,值为2,认为每个消费者都会签收1000条消息。现在当我发送另一批消息时,一个消费者将有2-3条消息挂起,但其他4个消费者仍然空闲,最终由那个消费者顺序处理所有内容。此时我想可能是我在ActiveMQ代理上进行的错误配置。我在文档中读到默认的分派策略是循环策略,但我看到了一个名为constantPendingMessageLimitStrategy的设置,在我的activemq.xml中设置为1000,并尝试将其设置为非常低的数字(例如2),以为它控制代理一次发送给消费者多少条消息,但仍然没有任何作用。希望有人能指出我做错了什么,我已经发布了我的Spring配置,除了尝试那个设置(constantPendingMessageLimitStrategy)之外,我真的没有触及activemq.xml文件。我正在使用ActiveMQ 5.8。

<bean id="importRedeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy">
    <property name="initialRedeliveryDelay" value="15000" />
    <property name="maximumRedeliveries" value="-1" />
    <property name="useExponentialBackOff" value="true" />
    <property name="backOffMultiplier" value="2" />
</bean>

<bean id="importPrefetchPolicy" class="org.apache.activemq.ActiveMQPrefetchPolicy">
    <property name="all" value="2"></property>
</bean>

<bean id="importConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    <property name="brokerURL" value="${import.queue.url}"/>
    <property name="redeliveryPolicy" ref="importRedeliveryPolicy" />
    <property name="prefetchPolicy" ref="importPrefetchPolicy"></property>
</bean>

<bean id="importQueue" class="org.apache.activemq.command.ActiveMQQueue">
    <constructor-arg value="${import.queue.name}" />
</bean>

<bean id="importListener" class="com.mycompany.ImportQueueListener" >
    <property name="importService" ref="importService"></property>
    <property name="sessionFactory" ref="sessionFactory"/>
</bean>

<bean id="importJmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
   <property name="connectionFactory" ref="importConnectionFactory" />
    <property name="destination" ref="importQueue" />
    <property name="messageListener" ref="importListener" />
    <property name="sessionTransacted" value="true" />
    <property name="maxConcurrentConsumers" value="10"></property>
    <property name="concurrentConsumers" value="5"></property>
</bean>
2个回答

0
你应该尝试用一个连接池替换你的ActiveMQConnectionFactory
<bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy method="stop">
<property name="connectionFactory">
  <bean class="org.apache.activemq.ActiveMQConnectionFactory">
    <property name="brokerURL">
      <value>tcp://localhost:61616</value>
      </property>
   </bean>
  </property>
</bean>
<bean id="myJmsTemplate" class="org.springframework.jms.core.JmsTemplate">
   <property name="connectionFactory">
    <ref local="jmsFactory"/>
  </property>
</bean>

0
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:p="http://www.springframework.org/schema/p"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:jms="http://www.springframework.org/schema/jms"
xmlns:tx="http://www.springframework.org/schema/tx"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/tx
http://www.springframework.org/schema/tx/spring-tx.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/jms
http://www.springframework.org/schema/jms/spring-jms.xsd">




 <!--  <bean class="org.apache.activemq.command.ActiveMQQueue" id="destination">  
     <constructor-arg value="TEST.Q1"></constructor-arg>  
  </bean>-->

  <bean id="destination" class="org.apache.activemq.command.ActiveMQTopic">
    <constructor-arg value="TOPIC_NAME" />
  </bean>



 <bean class="org.springframework.jms.core.JmsTemplate" id="producerTemplate">
  <property name="connectionFactory" ref="connectionFactory"/>
   <property name="defaultDestination" ref="destination"/>
 </bean>  

 <!--ActiveMq broker URL configured here-->
   <bean class="org.apache.activemq.ActiveMQConnectionFactory" id="connectionFactory" >
        <property name="brokerURL">
          <value>tcp://localhost:61616</value>
        </property>
   </bean>  

    <!--producer configured here-->

   <bean class="Producer" id="simpleMessageProducer">  
       <property name="jmsTemplate" ref="producerTemplate"></property>  
   </bean> 


    <!--listeners configured here-->

     <bean class="Consumer" id="simpleMessageListener">  

      </bean>  
     <bean class="ConsumerSecond" id="simpleMessageListenerSecond">   </bean> 



     <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer" id="jmsContainer"> 
     <property name="connectionFactory" ref="connectionFactory"></property>  
     <property name="destination" ref="destination"></property>  
     <property name="messageListener" ref="simpleMessageListener"></property>  


    </bean> 
     <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer" id="jmsContainer1"> 
     <property name="connectionFactory" ref="connectionFactory"></property>  
     <property name="destination" ref="destination"></property>
     <property name="messageListener" ref="simpleMessageListenerSecond"></property>

    </bean>  





</beans>

与上述代码一起使用以配置多个监听器。 - Hari
欢迎来到[so]!那些类型的内容应该放在答案本身中,而不是评论中 - 它们只是用于澄清帖子。谢谢! - Qantas 94 Heavy

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接