Apache Camel是否支持ActiveMQ通配符消费者?

5

我需要一种从多个activemq jms队列中消费消息的方法。

根据activemq文档,它支持通配符消费者。

我正在使用camel作为消息总线。是否可以查看下面的命名队列?

aaa.processQueue
bbb.processQueue
ccc.processQueue

通过配置camel路由来查看activemq:*.processQueue端点?

如果有更干净的替代方案,请让我知道。


我不知道通配符是否有效,但如果无效,您可以定义多个路由,每个路由都使用相同的处理器从不同的队列中读取。不过这取决于您事先知道队列的名称和数量。 - matt helliwell
2个回答

6

是的。由于Camel使用OpenWire/JMS客户端,因此应该可行。

你的选择如下:

  1. from("activemq:*.processQueue")
  2. from("activemq:aaa.processQueue,bbb.processQueue,ccc.processQueue")
  3. Multiple routes with a sub route for logic:

    from("activemq:aaa.processQueue").to("direct:doProcess");
    from("activemq:bbb.processQueue").to("direct:doProcess");
    from("activemq:ccc.processQueue").to("direct:doProcess");
    
    from("direct:doProcess").whatever..
    

    This way, you can easily turn on/off routes as well as assigning more consumers to one, given you need to have more priority on aaa.processQueue messages than the rest.


如果有人发送消息到aaa.processQueue或bbb.processQueue,我们如何区分消息是发送到aaa.processQueue还是bbb.processQueue? - ujjwal singh
那么,如果你需要将它们分开,为什么要使用通配符消费者呢? - Petter Nordlander
在我的应用程序中,我使用通配符,像这样:*.processQueue;但是在发送消息时,我将发送到{uniqueId}.processQueue。我需要在bean或处理器中获取那个uniqueId。我能做到吗? - ujjwal singh
这本质上是另一个问题,但您可以始终查看消息的JMSDestination属性。 - Petter Nordlander
他们的 Github 上有示例。 - joecoder

0
他们在他们的github上有一个使用通配符的路由构建器的示例。
protected RouteBuilder createRouteBuilder() throws Exception {
    return new RouteBuilder() {
        public void configure() throws Exception {
            // use wildcard to consume from all sports
            from("activemq:queue:sport.>")
                .to("log:received?showHeaders=true")
                .choice()
                    // the JMSDestination contains from which queue the message was consumed from
                    .when(header("JMSDestination").isEqualTo("queue://sport.pl.chelsea"))
                        .to("mock:chelsea")
                    // we can use a reg exp to match any message from 1st division
                    .when(header("JMSDestination").regex("queue://sport.1st.*"))
                        .to("mock:1st")
                    .otherwise()
                        .to("mock:other")
                .end();
        }
    };
}

参考: https://github.com/apache/camel/blob/master/components/camel-jms/src/test/java/org/apache/camel/component/jms/activemq/ActiveMQConsumeWildcardQueuesTest.java


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接