如何编写Kafka消费者-单线程VS多线程

24

我已经编写了一个使用Spring Kafka的单个Kafka消费者,它从单个主题中读取数据,并且是消费者组的一部分。一旦消费了消息,它将执行所有下游操作并转到下一个消息偏移量。我将其打包为WAR文件,并且我的部署流程将其推送到单个实例。使用我的部署流程,我可以将此构件潜在地部署到部署池中的多个实例。

但是,当我想要多个消费者作为基础架构的一部分时,我无法理解以下内容:

  • 我实际上可以在我的部署池中定义多个实例,并使所有这些实例上运行此WAR。这意味着,它们都在监听相同的主题,都是同一消费者组的一部分,并且实际上将分区分配给它们自己。下游逻辑将像原来一样工作。对于我的用例,这很完美地解决了问题,但我不确定是否应该遵循这种最佳实践?

  • 在网上阅读时,我发现这里这里有资源,人们定义了一个单个的消费者线程,但在内部创建了多个工作线程。还有一些示例,我们可以定义多个执行下游逻辑的消费者线程。思考这些方法并将它们映射到部署环境,我们可以使用更少的机器实现与我的理论解决方案相同的结果。

就我个人而言,我认为我的解决方案简单且可扩展,但可能不是最优的,而第二种方法可能是最优的,但想知道您的经验、建议或任何其他应该考虑的度量标准/约束条件?此外,我认为使用我的理论解决方案,我实际上可以将基础机器设计为Kafka消费者。

虽然我知道,我没有发布任何代码,但请让我知道是否需要将此问题移动到另一个论坛。如果您需要特定的代码示例,我也可以提供,但在我的问题背景下,我认为它们并不重要。

2个回答

9

您现有的解决方案是最好的。将消息传递给另一个线程会导致偏移量管理问题。Spring Kafka 允许您在每个实例中运行多个线程,只要您有足够的分区。


您的建议实际上暗示了第二种解决方案。在我的现有解决方案中,我有一个单线程消费者,部署到多个实例。然而,使用Spring Kafka,如果我可以在单个WAR中轻松定义多个线程并将此WAR部署到多个实例,则可以优化我的现有解决方案。我相信您指的是使用ConcurrentKafkaListenerContainerFactory,并能够根据我的主题分区设置并发性。此外,由于Spring正在管理消费者线程,因此消费者线程管理的生命周期将更加清洁。 - user3842182
不,那是错误的解释;您的第二个要点说的是“人们正在定义单个消费者线程,但在内部创建多个工作线程”。使用并发容器时,每个消费者都有一个单独的Consumer实例(和线程)。 没有将任务交给“工作”线程的概念。 这相当于在每个WAR中有n个容器(实际上就是并发容器内发生的情况)。 - Gary Russell
@GaryRussell,我也正在使用执行器服务实现多线程消费者和线程池。我发现了一些奇怪的问题,例如在某些并行线程中,如果有一些失败和一些消息被成功消耗,则所有未成功解析的消息都会丢失(数据丢失),因为Kafka只理解并更新其自上次提交以来的偏移量,因此如果有任何新的提交,它将重置滞后,并视为如果直到该偏移量,所有内容都已处理(尽管有些被其他线程杀死),是否还有其他方法或情况仍然相同? - SOURAV KUMAR
这正是为什么你不应该这样做的原因。 - Gary Russell

2
如果你现在的方法有效,就坚持使用它。这是一种简单而优雅的方式。
只有在某些情况下不能增加分区数但需要更高级别的并行性时,才会采用第二种方法。但那样你就需要担心排序和竞争条件。如果你确实需要这样做,我建议使用 akka-stream-kafka 库,它提供了正确处理偏移量提交并以并行方式执行所需操作的工具,并将其合并回一个流中,保留原始顺序等功能。否则,自己去做这些事情容易出错。

谢谢提供信息!现在,由于我也能够定义主题分区,所以我可以谨慎地考虑容量方面的问题。但是,如果将来需要,我会记住这个工具的。 - user3842182

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接