Spring DefaultMessageListenerContainer 和 ActiveMQ

6
我已经将Spring DefaultMessageListenerContainer配置为ActiveMQ消费者,从Test.Queue队列中获取消息。我有4台不同的机器上部署了这段代码,并且所有机器都配置到同一个ActiveMQ实例中,以处理来自同一个Test.Queue队列的消息。
当我将最大消费者数设置为20时,在所有4台机器启动并运行时,我看到针对该队列的消费者数量为80(4 * max consumer size = 80)。
当生产并发送到队列中的消息增多时,一切都很好。
当有数千条消息并且在80个消费者中,假设其中一个消费者被卡住,则会使Active MQ冻结,停止向其他消费者发送消息。
所有消息都永远被卡在ActiveMQ中。
由于我有4台机器,每台机器最多有80个消费者,因此我不知道要查看哪个消费者未能确认消费。
我关闭并重新启动了所有4台机器,当我停止具有错误消费者的机器时,消息开始再次流动。
我不知道如何配置DefaultMessageListenerContainer来放弃错误的消费者并立即通知ActiveMQ开始发送消息。
即使没有Spring,我也能创建以下场景:
1.我生成了多达5000条消息并将它们发送到“Test.Queue”队列。
2.我创建了两个消费者(Consumer A,B),并在一个消费者B的onMessage()方法中,将线程休眠很长时间(Thread.sleep(Long.MAX_VALUE)),其条件是当前时间%13为0时,则将线程放入睡眠状态。
3.运行这两个消费者。
4.进入Active MQ并发现队列有2个消费者。
5. A和B都在处理消息
6.在某个时刻,调用消费者B的onMessage()并在当前时间%13为0的条件下将线程置于休眠状态。
7.消费者B被卡住了,无法向代理确认
8.回到Active MQ Web控制台,仍然看到消费者为2,但没有消息被出队。
9.现在我创建了另一个消费者C并将其运行以消耗。
10.ActiveMQ中只有消费者计数从2增加到3。
11.但是消费者C没有消费任何内容,因为代理未能发送任何消息,并将它们全部保留,因为它仍在等待消费者B进行确认。
12.我还注意到消费者A没有消费任何内容
13.我去杀掉消费者B,现在所有消息都被排干。
假设A,B,C由Spring的DefaultMessageListenerContainer管理,我该如何调整Spring DefaultMessageListenerContainer,在消费者B在X秒内未确认时将其从连接池中删除,并立即确认代理,以免代理永远保留消息。
感谢您的时间。
如果我能得到解决此问题的解决方案,那就太感激了。

也许你应该更改问题的标题,以表明这是一个ActiveMQ问题。 - gkamal
你是否对队列进行了特定的配置,还是只使用了默认配置?例如,为确保消息顺序。 - gkamal
请看这个链接:https://dev59.com/H2kw5IYBdhLWcg3wQoam - gkamal
@gkamal 你认为这是ActiveMQ的问题吗?当其中一个消费者被卡住并未能确认代理时,它停止向其他消费者发送消息,我们该如何解决这个问题?是否有解决方法? - serverfaces
@gkamal <bean id="defaultMessageListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="myConnectionFactory" /> <property name="destination" ref="myDestination" /> <property name="messageListener" ref="messageConsumer" /> <property name="cacheLevelName" value="CACHE_CONNECTION" /> <property name="maxConcurrentConsumers" value="20" /> <property name="receiveTimeout" value="10000" /> <property name="maxMessagesPerTask" value="20" /> <property name="idleTaskExecutionLimit" value="5" /> </bean> - serverfaces
@gkamal 这是所有默认配置。您说的是经纪人端配置还是客户端配置?我无法访问经纪人以进行更新,它全部都是默认设置。 - serverfaces
1个回答

3
以下是一些尝试的选项...
  1. set the queue prefetch to 0 to promote better distribution across consumers and reduce 'stuck' messages on specific consumers. see http://activemq.apache.org/what-is-the-prefetch-limit-for.html

  2. set "?useKeepAlive=false&wireFormat.maxInactivityDuration=20000" on the connection to timeout the slow consumer after a specified inactive time

  3. set the queue policy "slowConsumerStrategy->abortSlowConsumer"...again to timeout a slow consumer

    <policyEntry ...
      ...
      <slowConsumerStrategy>
          <abortSlowConsumerStrategy />
      </slowConsumerStrategy>
      ...
    </policyEntry> 
    

即使将预取设置为1以获得更好的分布,消费者仍有可能被卡住。是否有一种方法可以配置DefaultMessageListenerContainer来清理被卡住的消费者并向代理服务器确认回复?在我的情况下,消费者在接收到消息后必须执行许多操作。只有在消费者需要在1-2小时内处理约400,000条消息时,我才会遇到这个问题。我不希望一个消费者控制其他消费者 - 换句话说 - 我不希望其他消费者无所事事,因为代理服务器停止发送消息。 - serverfaces
使用预取设置为1,它对我有所帮助。但是当我有3个消费者,生产并发送了10,000条消息到代理时,我总是发现1条消息被卡在代理中,其中1个消费者故意处于睡眠状态。使用默认的1000,我注意到有几千条消息被卡住了。但这仍然没有解决问题,因为仍然有1条消息被卡住了,可能是一条重要的消息。 - serverfaces
使用abortSlowConsumerStrategy的副作用是什么?假设在代理中有100k条消息,有50个消费者正在处理它们,其中一个消费者速度较慢,那么它会影响已经接收到的消息吗? - serverfaces
另一个想法是使用prefetch=0...这不应该将消息推送给消费者,而是应该拉取它们...至于您的另一个问题,我相信当它们被中止时,它们将被重新传递给其他消费者(因为它们尚未得到ACK确认)。 - Ben ODay

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接