Apache Camel中Seda的并发消费者

3

我有一个如下所示的路由。此路由会定期轮询目录并读取一个大型的 .csv 文件。然后将文件拆分成每个包含 1000 行的块,并将其发送到 seda 队列(firstQueue)。我在此 seda 队列上有 15 个并发消费者。

route.split().tokenize("\n", 1000).streaming().to("seda:firstQueue?concurrentConsumers=15").process(myProcessor).to("seda:secondQueue?concurrentConsumers=15").process(anotherMyProcessor);

1) 15并发消费者是什么意思 - 它是否意味着有15个线程从seda读取数据并将其传递给一个myProcessor实例?或者会创建15个单独的myProcessor实例,每个都在作用于相同的数据副本上运行?请注意,myProcessor是一个单例,如果将其更改为原型,会发生什么。

2) 是否可能出现任何两个或多个线程选择相同的数据并将其传递给myProcessor?还是保证没有两个线程将有相同的数据?

感谢快速回复。谢谢!


我正在使用相当旧的camel版本-2.10.1。有人知道在这个版本中seda和concurrentConsumers是否存在问题,并且可能已经在后续版本中修复了吗? - prachi srivastava
https://stackoverflow.com/questions/58863298/how-apache-camel-seda-component-works - vks
1个回答

2

我的骆驼有点生锈,但我很确定:

  1. 有15个线程在运行。每个线程将从队列中读取一条消息并调用myProcessor。只有一个myProcessor实例,因此您需要确保它是线程安全的。我从未尝试过,但我不认为将其范围更改为原型会有任何区别。

  2. 两个线程不应从队列中选择相同的消息。在正常运行中,每条消息应仅处理一次。但是,存在可能导致同一消息被处理两次的错误条件,最明显的错误条件是在处理文件时部分重启应用程序。


谢谢马特的回复。所以,既然SEDA基本上是一个“阻塞队列”,那么可以确保没有两个线程会有相同的消息副本吗?另外,一旦一个线程从队列中取走一条消息,那么该消息会立即从队列中移除吗? - prachi srivastava
1
是的,阻塞队列就是这样的。一旦一个线程接收到一条消息,它会立即从队列中移除,所以其他线程不会再接收到相同的消息。 - matt helliwell
还有一个问题,虽然我定义了15个并发消费者(concurrentConsumer),但我只看到1个线程(由myProcessor)处理消息。你知道为什么会这样吗? - prachi srivastava
很抱歉,我脑袋里一时想不出来。 - matt helliwell

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接