有没有一种方法可以在特定的偏移量停止Kafka消费者?

5

我可以跳转到特定的偏移量。是否有一种方法可以在特定偏移处停止消费者?换句话说,只消费我的给定偏移量之前的消息。据我所知,Kafka没有提供这样的功能。如果我错了,请纠正我。

例如,分区具有偏移量1-10。我只想从3-8消费。在消费第八条消息后,程序应该退出。

3个回答

2
是的,kafka本身并不提供此功能,但您可以在消费者代码中实现此功能。您可以尝试使用commitSync()来控制此操作。

public void commitSync(Map offsets)

提交指定主题和分区列表的指定偏移量。这将把偏移量提交到Kafka。使用此API提交的偏移量将在每次重新平衡后的第一次获取以及启动时使用。因此,如果您需要将偏移量存储在除Kafka之外的任何地方,则不应使用此API。提交的偏移量应是您的应用程序将要消耗的下一条消息,即lastProcessedMessageOffset + 1。

这是同步提交,将阻塞直到提交成功或遇到无法恢复的错误(在这种情况下,它将被抛出给调用者)。

类似于这样:
 while (goAhead) {
     ConsumerRecords<String, String> records = consumer.poll(100);
     for (ConsumerRecord<String, String> record : records) {
         if (record.offset() > OFFSET_BOUND) {
            consumer.commitSync(Collections.singletonMap(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset())));
            goAhead = false;
            break;           
         }
         process(record);
     }
 }

在上面的代码中,你应该将"enable.auto.commit"设置为false。在你的情况下,OFFSET_BOUND可以设置为8。因为在你的例子中,提交的偏移量仅为9,所以下一次消费者将从这个位置开始获取。

1
它会表现不良。一旦分区达到边界,您就停止处理记录。但是可能会有其他分区 - 尚未达到边界 - 等待处理的记录。 - beatrice

0

0
假设分区偏移量是连续的(即未经日志压缩),您可以配置您的消费者(使用max.poll.records配置)以便在每次轮询中读取一定数量的记录。这将使您能够停止在您想要的偏移量处。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接