使用队列的Camel线程DSL行为

3

以下是我期望的路由,我希望从 queue1 中同时处理10条消息,但事实上只有一条消息在同一时间被处理。

我是期望错了吗?还是我的做法有误?

context.addRoutes(new RouteBuilder() {
        public void configure() {                                       
            from("test-jms:queue:test.queue1").threads(10)
            .process(sleep(1)); // sleep id is 1                
        }

        private Processor sleep(final int sleepId) {
            return new Processor() {                    
                @Override
                public void process(Exchange exchange) throws Exception {                       
                    System.out.println(curTime() + " Going for sleep sleepid=" + sleepId );
                    Thread.sleep(5000l);                        
                    System.out.println(curTime() + " Done sleep sleepid=" + sleepId );
                }
            };
        }

使用以下方式调用上述路由:
   ExecutorService ec = Executors.newFixedThreadPool(5);

    ec.submit(new Task(context,template));
    ec.submit(new Task(context,template));
    ec.submit(new Task(context,template));
    ec.submit(new Task(context,template));
    ec.submit(new Task(context,template));

static class Task  implements Runnable{
    CamelContext context;
    ProducerTemplate template;
    public Task(CamelContext context, ProducerTemplate template) {
        super();
        this.context = context;
        this.template = template;
    }
    @Override
    public void run() {         
           Exchange exchange = new DefaultExchange(context);
           exchange.setPattern(ExchangePattern.InOnly);
           exchange.getIn().setBody("Test Message: " + Thread.currentThread().getName());
           System.out.println(Thread.currentThread().getName());
           Exchange send = template.send("test-jms:queue:test.queue1",exchange);
           System.out.println("completed");           
    }

}

代码输出结果:
10:24:11 Going for sleep sleepid=1
10:24:16 Done sleep sleepid=1

10:24:16 Going for sleep sleepid=1
10:24:21 Done sleep sleepid=1

10:24:21 Going for sleep sleepid=1
10:24:26 Done sleep sleepid=1

10:24:26 Going for sleep sleepid=1
10:24:31 Done sleep sleepid=1

10:24:31 Going for sleep sleepid=1
10:24:36 Done sleep sleepid=1

如果我们观察时间戳,我们会发现路由器一次只处理一个消息。

http://camel.apache.org/jms.html 提供了一个名为“concurrentConsumers”的设置...这是否表明JMS端点不支持线程DSL?即使我使用INONLY交换模式... - Bhuvan
如果我理解正确的话,您想让它同时处理所有10条消息并进入睡眠状态,对吗? - mirvine
最好在JMS端点中设置concurrentConsumers=10,这样就不需要线程了。 - Claus Ibsen
Claus Ibsen...上述情况是为了我学习Camel而设定的,所以可以说它取决于终端点支持哪些DSL吗?如果是的话,有没有相关文档记录...只是好奇为什么JMS终端点不支持线程DSL。 - Bhuvan
@ClausIbsen 请阅读上面的评论。 - Bhuvan
显示剩余2条评论
1个回答

4

您需要在JMS端点上启用asyncConsumer以使其异步化。这样做之后,可以无序处理从队列中消耗的消息,因此默认情况下消费者是有序的。

代码应该是:

 public void configure() {                                       
            from("test-jms:queue:test.queue1?asyncConsumer=true").threads(10)
            .process(sleep(1)); // sleep id is 1                
        }

但是JMS组件具有内置的并发性,通常最好使用它,因为它可以使用并发JMS消费者和并发网络。有关更多详细信息,请参见选项concurrentConsumersmaxConcurrentConsumers


这是否意味着可以说它取决于终端点是否支持特定的DSL... 如果是这样,那么文档没有清楚记录哪个终端点支持哪个DSL。 - Bhuvan

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接