如何为消费者设置Kafka偏移量?

3
假设我的主题中已经有10个数据,现在我启动了 Flink 消费者,消费者将消费第11个数据。
因此,我有三个问题:
  1. 如何获取当前主题的分区数和每个分区的偏移量?
  2. 如何手动为每个分区设置消费者的起始位置?
  3. 如果 Flink 消费者崩溃,并且在几分钟后恢复,该如何知道从哪里重新开始消费?
任何帮助都将不胜感激。示例代码(我尝试过 FlinkKafkaConsumer08FlinkKafkaConsumer10但都会出现异常):
public class kafkaConsumer {
public static void main(String[] args) throws Exception {
    // create execution environment
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(5000);

    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "192.168.95.2:9092");
    properties.setProperty("group.id", "test");
    properties.setProperty("auto.offset.reset", "earliest");

    FlinkKafkaConsumer09<String> myConsumer = new FlinkKafkaConsumer09<>(
            "game_event", new SimpleStringSchema(), properties);


    DataStream<String> stream = env.addSource(myConsumer);

    stream.map(new MapFunction<String, String>() {
        private static final long serialVersionUID = -6867736771747690202L;

        @Override
        public String map(String value) throws Exception {
            return "Stream Value: " + value;
        }
    }).print();

    env.execute();
    }
}

以及pom.xml文件:

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.11</artifactId>
        <version>1.6.1</version>
    </dependency>

你能解释一下你所说的“如何获取每个分区的分区数和偏移量”是什么意思吗?另外,你要找的是哪个偏移量?是最新的那个吗? - Giorgos Myrianthous
@GiorgosMyrianthous “如何获取分区数量”意味着获取当前主题的分区数量,以及每个分区的最新偏移量。 - user2894829
1个回答

2
2. 为了从特定偏移量开始消费分区的消息,您可以参考Flink文档

You can also specify the exact offsets the consumer should start from for each partition:

Map<KafkaTopicPartition, Long> specificStartOffsets = new HashMap<>();
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 0), 23L);
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 1), 31L);
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 2), 43L);

myConsumer.setStartFromSpecificOffsets(specificStartOffsets);

The above example configures the consumer to start from the specified offsets for partitions 0, 1, and 2 of topic myTopic. The offset values should be the next record that the consumer should read for each partition. Note that if the consumer needs to read a partition which does not have a specified offset within the provided offsets map, it will fallback to the default group offsets behaviour (i.e. setStartFromGroupOffsets()) for that particular partition.

Note that these start position configuration methods do not affect the start position when the job is automatically restored from a failure or manually restored using a savepoint. On restore, the start position of each Kafka partition is determined by the offsets stored in the savepoint or checkpoint (please see the next section for information about checkpointing to enable fault tolerance for the consumer).

3. 如果其中一个消费者崩溃,恢复后Kafka将会参考`consumer_offsets`话题来从之前崩溃的点继续处理消息。`consumer_offsets`是一个用于存储有关每个三元组(主题、分区、组)的提交偏移量的元数据信息的话题。它还定期进行压缩,以便只保留最新的偏移量可用。您也可以参考Flink的Kafka Connectors指标
Flink的Kafka连接器通过Flink的度量系统提供一些指标,以分析连接器的行为。生产者通过Flink的度量系统导出所有支持版本的Kafka内部指标。消费者从Kafka版本0.9开始导出所有指标。Kafka文档列出了其文档中导出的所有指标。
除了这些指标之外,所有消费者都会公开每个主题分区的当前偏移量和已提交偏移量。当前偏移量是分区中的当前偏移量。这指的是我们成功检索并发射的最后一个元素的偏移量。已提交偏移量是最后提交的偏移量。
Flink中的Kafka消费者将偏移量提交回Zookeeper(Kafka 0.8)或Kafka代理(Kafka 0.9+)。如果禁用检查点,则定期提交偏移量。使用检查点时,只有在流式拓扑中的所有运算符都确认已创建其状态的检查点后,提交才会发生。这为用户提供了至少一次语义,以便将偏移量提交到Zookeeper或代理。对于提交到Flink的偏移量,系统提供了恰好一次的保证。
提交给ZK或代理的偏移量也可以用于跟踪Kafka消费者的读取进度。每个分区中已提交偏移量和最近偏移量之间的差称为消费者滞后。如果Flink拓扑从主题中消耗数据的速度比添加新数据的速度慢,则滞后会增加,消费者将落后。对于大型生产部署,我们建议监视该指标以避免增加延迟。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接