Kafka消费者自动提交是如何工作的?

30

我正在阅读这个:

自动提交最简单的提交偏移量的方法是允许消费者代替你进行提交。如果配置了enable.auto.commit=true,那么每五秒钟,消费者将提交客户端从poll()接收到的最大偏移量。五秒间隔是默认值,并通过设置auto.commit.interval.ms来控制。就像消费者中的其他所有内容一样,自动提交由轮询循环驱动。每次轮询时,消费者会检查是否到了提交的时间,如果是,它将提交上次轮询返回的偏移量。

也许我的英语不好,但我没有完全理解这个描述。

假设我使用默认间隔-5秒自动提交,并且每7秒进行一次轮询。在这种情况下,提交将每5秒或每7秒发生一次?

如果每3秒进行一次轮询,您能澄清行为吗?提交每5秒还是每6秒发生一次?
我已经阅读了这个

自动提交:您可以将auto.commit设置为true,并使用以毫秒为单位的值设置auto.commit.interval.ms属性。启用此功能后,Kafka消费者将在其poll()调用响应中提交接收到的最后一条消息的偏移量。在设置的auto.commit.interval.ms后台发出poll()调用。

而且这与答案相矛盾。

你能详细解释一下吗?

假设我有这样的图表:

0秒 - 轮询
4秒 - 轮询
8秒 - 轮询

偏移量将在何时提交,在何时提交哪个偏移量?

4个回答

27

自动提交检查会在每次轮询时调用,并检查经过的时间是否大于配置的时间。如果是,就会提交偏移量。

如果提交间隔为5秒,而轮询发生在7秒后,则提交仅会在7秒后发生。


1
第二个情况呢? - gstackoverflow
1
第二个案例将遵循相同的逻辑,对于第一次轮询,它不会提交,因为3 < 5,但在下一次轮询时,它将提交,因为6 > 5,并在提交后重置计数器,然后遵循相同的模式。 - Liju John
但是在这种情况下如何提交最后一次投票?我需要手动完成吗? - gstackoverflow
1
当您关闭消费者并启用自动提交时,它将在关闭消费者之前提交偏移量。 - Liju John

16
它将尝试在轮询完成后尽快自动提交。您可以查看消费者协调器的源代码,该协调器在类级别上定义了一组本地字段,以了解是否启用了自动提交,间隔是多少,以及执行自动提交的下一个截止日期是什么。

https://github.com/apache/kafka/blob/10cd98cc894b88c5d1e24fc54c66361ad9914df2/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L625

在poll中有一个地方调用了存储,具体位置为https://github.com/apache/kafka/blob/10cd98cc894b88c5d1e24fc54c66361ad9914df2/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L279

话虽如此,例如每7秒执行一次poll,并且将自动提交设置为5:

0 - poll,+将截止时间设置为第5秒

7 - poll + 提交截止时间,将截止时间更新为7+5=12

14 - poll + 提交截止时间,将截止时间更新为12+5=17

但是,如果将轮询设置为每3秒,并且将自动提交设置为5:

0 - poll,+将截止时间设置为第5秒

3 - poll,没有提交

6 - poll + 提交截止时间,将截止时间更新为6+5=11


第一个截止日期只在第六秒发生吗? - gstackoverflow
@gstackoverflow 是的,根据kafka客户端的源代码。我知道这听起来很愚蠢,但从不同的角度来看,你想要实现什么?在关键任务系统中,最好在确信消息已处理后手动提交偏移量,在其他情况下,您可能更喜欢更少的偏移量提交以加快进程速度。然而,偏移提交并不是简单的,因为它涉及到zookeeper等方面。 - zubrabubra
@zubrabubra,这是否意味着它不一定会在每个通过“auto.commit.interval.ms”配置的间隔提交偏移量?我们还需要考虑经过的时间吗? - Nag
@Nag自动提交考虑了两个连续调用poll之间处理数据所需的时间和间隔。但是当它重新初始化截止日期时,正如您所看到的,它选择了来自poll请求开头的now。在这里,它选择了现在。 - zubrabubra

4
这是一个简单的代码,用于测试它的工作原理。
文档 -> https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html
public class KafkaTest {
    
    public static final String KAFKA_TOPIC_NAME = "kafka-xx-test-topic";
    public static final String CONSUMER_GROUP_ID = "test-consumer-xx";
    public static final String BOOTSTRAP_SERVERS = "localhost:9092";

    public static void main(String[] args) {
        final KafkaProducer<Object, Object> kafkaProducer = new KafkaProducer<>(getProps());
        for (int i = 0; i < 1000; i++) {
            kafkaProducer.send(new ProducerRecord<>(KAFKA_TOPIC_NAME, "Data_" + i));
        }
        final Consumer<Long, String> consumer = new KafkaConsumer<>(getProps());
        consumer.subscribe(Collections.singletonList(KAFKA_TOPIC_NAME));
        TopicPartition actualTopicPartition = new TopicPartition(KAFKA_TOPIC_NAME, 0);
        while (true) {
            final ConsumerRecords<Long, String> consumerRecords = consumer.poll(Duration.ofSeconds(60));
            consumerRecords.forEach(record -> {
                try {
                    TimeUnit.MILLISECONDS.sleep(200);
                } catch (InterruptedException e) {
                }
            });
            final long committedOffset = consumer.committed(Collections.singleton(actualTopicPartition)).get(actualTopicPartition).offset();
            final long consumerCurrentOffset = consumer.position(actualTopicPartition);
            System.out.println("Poll finish.. consumer-offset: " + consumerCurrentOffset + " - committed-offset: " + committedOffset + " " + LocalTime.now().format(DateTimeFormatter.ofPattern("HH:mm:ss")));
        }
    }

    private static Map<String, Object> getProps() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_ID);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); //  Default: latest
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); // Default: true
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10); // Default: 500
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000); // Default: 5000
        return props;
    }
}

每2秒轮询一次
每5秒自动提交
输出如下:
Poll finish.. consumer-offset: 1010 - committed-offset: 1000 17:07:05
Poll finish.. consumer-offset: 1020 - committed-offset: 1000 17:07:07
Poll finish.. consumer-offset: 1030 - committed-offset: 1000 17:07:09
Poll finish.. consumer-offset: 1040 - committed-offset: 1030 17:07:11 -> commit when poll finish because of elapsed time(6 sec) > commit interval(5 sec)
Poll finish.. consumer-offset: 1050 - committed-offset: 1030 17:07:13
Poll finish.. consumer-offset: 1060 - committed-offset: 1030 17:07:15
Poll finish.. consumer-offset: 1070 - committed-offset: 1060 17:07:17 -> auto commit 
Poll finish.. consumer-offset: 1080 - committed-offset: 1060 17:07:19
Poll finish.. consumer-offset: 1090 - committed-offset: 1060 17:07:21
Poll finish.. consumer-offset: 1100 - committed-offset: 1090 17:07:23 -> auto commit 

1
消费者提交的偏移量为1020、1030,而生产者只生产了1000条消息,这是怎么回事?[来自代码] 这个主题是否有超过1000条的消息? - Nag
1
@Nag 我刚刚重新开始了 :) 这个话题有些旧的消息,不是很重要。 - divilipir
1
有道理,是的,这个问题并不重要,只是想问一下我是否遗漏了什么。 - Nag

0
请查看以下配置,它为Kafka消费者调优提供了另一种视角: 对于来自生产者的30条记录,如果在20秒之前消费者崩溃,则由于max-poll-interval和auto-commit-interval都设置为20秒,消费者将再次读取整个30条记录集。
 auto-commit-interval: 20000
      auto-offset-reset: latest
      max-poll-records: 10
      max-poll-interval-ms: 20000

但对于以下配置,其中每隔2秒钟会发生自动提交,并且如果消费者在任何时间点崩溃的时间超过2秒,则已提交给Kafka生产者的记录将不会再次被消费者读取。
 auto-commit-interval: 2000
      auto-offset-reset: latest
      max-poll-records: 10
      max-poll-interval-ms: 20000

此外,自动提交间隔始终优先于最大轮询间隔。如果由于某种奇怪的原因未进行自动提交,则在经过20秒的最大轮询间隔后,Kafka代理将得出消费者已停止的结论。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接