kafka最早和最新的偏移值有什么区别?

58

生产者发送了消息1、2、3、4

消费者接收到了消息1、2、3、4

消费者崩溃/断开连接

生产者发送了消息5、6、7

消费者重新运行后,应该从5开始接收消息,而不是从7开始

对于这种情况,我需要使用哪个offset值以及需要进行哪些其他更改/配置?

2个回答

85
当消费者加入一个消费者组时,它会获取最后提交的偏移量,因此如果在崩溃之前提交了最新的偏移量(如4),它将重新从5、6、7开始读取。
当消费者启动但没有为分配的分区提交偏移量时,auto.offset.reset属性的earliestlatest值被使用。 在这种情况下,您可以选择是从头开始重新读取所有消息(earliest),还是从最后一条消息之后开始读取(latest)。

“生产者”不断发送消息……在停止“消费者”之前,我检查了偏移值,它是8023。10分钟后,我启动了“消费者”,然后第一个偏移值是8020。过了一段时间,我再次停止了“消费者”,此时偏移值为9239,在一个小时后,我启动了“消费者”,然后第一个消息的偏移值是9299。 我设置了“groupId”,并且“auto.offset.reset”是“latest”。 我还记录了“分区”值,它只是“0”。 - Sat
1
所以如果将其设置为“latest”,它将读取7。在提交了7之后,它是否会接着读取6和5呢?或者如果有一连串具有更高优先级的新记录流进来,它们是否存在无法被处理的情况? - Yoker
1
当您提交偏移量时,这意味着您已经读取了所有先前的消息。因此,提交7意味着接下来您将不会读取6和5,而是由生产者发送的新到来的消息8。 - ppatierno
2
我认为@ppatierno没有回答这个问题。对于Sat的问题,auto.offset.reset的值应该是latest。当auto.offset.reset设置为latest时,会发生两种情况:当消费者第一次订阅主题时,它只会接收到订阅后到达的消息。另一种情况是当消费者重新连接到主题(在崩溃或其他情况下),消费者将接收到消息5、6、7,因为最新提交是4。 - Esca Tran
对于@Yoker的问题:消息的顺序是不可变的。消费者将按照这个顺序接收消息:5、6、7。 - Esca Tran
1
@EscaTran 答案是正确的。https://docs.confluent.io/current/clients/consumer.html#:~:text=Second%2C%20use%20auto.,%E2%80%9D%20offset%20(the%20default). "消费者从协调器接收到分配后,必须确定每个分配的分区的初始位置。当组首次创建时,在消费任何消息之前,根据可配置的偏移重置策略(auto.offset.reset)设置位置。通常,消费要么从最早的偏移开始,要么从最新的偏移开始。" - Kumar Sambhav

-1
为了清楚地了解这种情况,我们需要了解消费者加入同一消费者组时会发生什么。
  1. 加入消费者组,触发重新平衡并将分区分配给新的消费者。
  2. 查找已分配给消费者的分区的提交偏移量。
  3. 检查auto.offset.reset配置参数,以决定从哪里开始消费消息。
我们可以为auto.offset.reset配置设置两个值。
i. earliest - 从它之前停止消费的点开始消费。(根据您的示例从5开始)
ii. latest - 从分配分区中最新的偏移量开始消费。 (根据您的示例从7开始)

2
如果下面的被接受的答案是正确的,那么这个答案不完全错误吗? - kaashmonee
2
这是不正确的。只有在没有有效的偏移量可用时,此设置才会生效。 - rosejn

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接