Spring Kafka - 在幕后,Consumer.poll() 方法什么时候被调用?

5

我有一个Spring Boot应用程序,其中只有一个Kafka消费者。

我使用了DefaultKafkaConsumerFactory和默认的消费者配置。我有一个ConcurrentListenerContainerFactory,其中并发设置为1,并且我有一个使用@KafkaListener注解的方法。

我正在监听一个有3个分区的主题,并且我有3个这样的消费者部署在不同的应用程序中。因此,每个消费者都在监听一个分区。

假设在幕后调用了消费者的轮询,并获取了40条记录。那么每个记录是否按顺序提供给使用@KafkaListener注释的方法,即提供记录1,等待方法完成处理,提供记录2,等待方法完成处理,依此类推。

上述情况是否发生,还是对于每个获得的记录,都会创建一个单独的线程,并在单独的线程上调用方法,以便主线程不被阻塞,可以更快地轮询记录。

我还想更清楚地了解消息监听器容器和最终消息监听器是什么。

先行致谢。

2个回答

6

在1.3及以上版本中,每个消费者只有一个线程;下一个poll()是在上一次消息被监听器处理后执行的。

在早期版本中,存在两个线程,当监听器线程正在处理第一批消息时,会执行第二个(可能是第三个)轮询。这是为了避免由于慢速监听器而进行重新平衡。该线程模型非常复杂,我们必须在必要时暂停/恢复消费者。KIP-62修复了重新平衡问题,因此我们能够使用当前更简单的线程模型。


感谢您的回复。正如我所想的那样,但我只是想确认一下。当 Kafka 获取的记录需要大量处理时间时,我们遇到了问题。正如您所指出的 kip-62,在后台线程中发送心跳信号,但即使调用 poll() 也会有超时,如果默认情况下不进行一段时间(300000 毫秒)的调用,则消费者将死亡,并且由于我们使用自动提交,offset 不会被提交,同样的记录会再次被处理。 - Indraneel Bende
1
你应该调整 max.poll.recordsmax.poll.interval.ms,以便在处理轮询结果时监听器不会超过后者。 - Gary Russell

3
很好,这正是Apache Kafka的一个要点 - 确保在同一线程中按顺序处理来自同一分区的记录。因此,当您将您的主题分配给3个实例的3个分区时,每个实例都会获得自己的分区,并在单个线程中进行轮询。 KafkaMessageListenerContainer是一个基于事件驱动、自我控制的KafkaConsumer包装器。它确实在一个while (isRunning()) {循环中调用poll(),该循环被安排在一个TaskExecutor中。
this.listenerConsumerFuture = containerProperties
            .getConsumerTaskExecutor()
            .submitListenable(this.listenerConsumer);

它处理ConsumerRecords并调用监听器:

private void invokeListener(final ConsumerRecords<K, V> records) {
        if (this.isBatchListener) {
            invokeBatchListener(records);
        }
        else {
            invokeRecordListener(records);
        }
    }

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接