JMS消费者阻塞其他JMSX组

3
我正在尝试找出是否有任何方法可以在消息回滚到队列时影响消费者的消息处理顺序。以下是一些简单的代码,可帮助我复制该问题。我只是按特定顺序将具有不同JMSXGroupIds的消息推送到队列中:
- "A1" (JMSXGroupId: 1) - "B1" (JMSXGroupId: 2) - "A2" (JMSXGroupId: 1) - "C1" (JMSXGroupId: 3) - "B2" (JMSXGroupId: 2)
代码使A1回滚(它最初重试了3次消息),并以延迟的方式返回队列。然而,消费者会等待直到可以再次获取A1(等待延迟的时间后),这意味着B1和C1组被阻塞在A1后面,无法被处理。
理想情况下,我希望当A1放回队列并告知其等待时,消费者会选择B1和C1... 我最终要做的就是防止一个JMSXGroup阻塞其他JMSXGroup。此外,值得注意的是,我需要保留A的消息顺序(A1、A2、A3...),并希望通过将它们保留在队列上来实现,而不是构建某些异常管理解决方案。
onException(Exception.class)
            .log("Exception Caught !! ")
            .redeliveryDelay("1000")
            .maximumRedeliveries(3)
            .handled(false)
            .markRollbackOnly()
            .log("log:output");

    from("amq:queue:mailbox?concurrentConsumers=1")
            .to(logEndpoint)
            .process(exchange -> {
                if(exchange.getIn().getBody(String.class).contains("A")) {
                    throw new Exception("Found A");
                }
            });

我正在使用基于Java的Apache Camel微服务和事务路由。没有什么特别的,但如果需要,我可以提供更多详细配置信息。
提前感谢。

嗨 - 我正在希望Camel和/或ActiveMQ是否可以为仅一个JMSXGroupId提供阻塞消息,而且也“希望通过将它们留在队列中来实现这一点,而不是必须构建某些异常管理解决方案。”想知道您是否已经解决了这个问题?对于我希望做的事情:按用户UID分组,为每个JMSXGroupId拥有一个消费者并不可扩展。感谢任何建议。 - n99
3个回答

2

在队列上严格排序注定会遇到这样的问题,因为队列必须遵循其基本的先进先出(即FIFO)语义。即使在A1失败后立即选择B1,您也必须等待消耗A2直到A1被消耗才能保持顺序,而由于队列必须按照FIFO方式消耗,这将阻止C1和其后面的任何其他消息的消耗。


1
您正在使用Camel的重发机制,它只会重新调用处理器方法(例如失败的地方),而不是整个路由。您可能希望考虑使用JMS事务,并让消息回滚到JMS代理,然后在消息代理上配置重发设置。
如果您有《Camel实战》一书的副本,则建议您阅读事务章节和错误处理章节。

1
你的问题是由于消费者端点上的concurrentConsumers=1引起的。如果只有一个消费者,就不可能并行处理JMS组
在这种限制下,你实际上拥有一个排他消费者,因为只有一个消费者来处理消息,所以JMS组头没有效果JMSXGroupId标头确保具有相同组ID的所有消息都由同一消费者处理。因此,如果你有3个消费者,则可以并行处理示例中的3个组,即使消息A1“阻塞”组A的消费者一段时间。
然而,当你的消费者少于组数时,当然会出现一个消息可以阻塞其他组的情况,因为一个消费者处理多个组。

嗨burki,我应该说明我已经简化了这篇文章的解决方案,但我一直在运行2个消费者和6个组(A,B,C,D,E,F - 所有都有三条消息 - A1,A2,A3等),仍然看到了相同的问题,这符合你最后一句话所说的。 - Craig

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接