Kafka 消费者未正确提交偏移量

6

我有一个Kafka消费者,它定义了以下属性:

session.timeout.ms = 60000
heartbeat.interval.ms = 6000

我们注意到大约有2000个消息的延迟,并且通过我们的应用程序日志发现同一条消息被消费者多次消费。此外,注意到一些消息需要大约10秒才能完全处理。我们怀疑消费者没有正确提交偏移量(或重复提交相同的旧偏移量),因此消费者多次获取了相同的消息。
为了解决这个问题,我们引入了几个新属性:
auto.commit.interval.ms=20000 //To ensure that commit is happening only after processing of message is completed
max.poll.records=10 //To make the consumer pick only 10 messages in one go

And, we set the concurrency to 1.

这解决了我们的问题。延迟开始减少,最终降为0。
但是,我仍然不清楚为什么一开始会出现这个问题。 据我了解,默认情况下:
enable.auto.commit = true
auto.commit.interval.ms=5000

理想情况下,消费者应该每隔5秒提交一次。如果消息在这个时间内没有完全处理,会发生什么?消费者提交的偏移量是什么?问题是否由于大型轮询记录大小(默认为500)而发生?
另外,关于轮询方法poll(),我读到:
"调用poll()会在后台以设置的auto.commit.interval.ms间隔自动提交偏移量"
因此,如果最初的poll()每5秒钟就会发生一次(默认的auto.commit.interval),为什么它没有提交最新的偏移量?因为消费者还没有完成处理吗?那么,它应该在下一次5秒钟时提交该偏移量。
可以有人回答这些问题并解释原始问题为什么会出现吗?
1个回答

10
如果你正在使用Spring for Apache Kafka,我们建议将enable.auto.commit设置为false,以便容器以更确定的方式提交偏移量(在每个记录或每批记录之后-默认情况下)。

最可能的问题是max.poll.interval.ms,默认值为5分钟。如果您的一批消息花费的时间超过了此时间,您会看到这种行为。您可以增加max.poll.interval.ms或像您所做的那样减少max.poll.records

关键是您必须在小于max.poll.interval.ms的时间内处理poll返回的记录。

另外,关于poll()方法,我读到:

在设定的auto.commit.interval.ms后,会在后台发出poll()调用。

这是错误的;自从KIP-62以来,在后台发送心跳信号,但不会调用poll()。


1
谢谢。这很有道理,因为问题只在我们通过cron向主题推送了1700条消息后才开始出现。在此之前,该主题从未同时拥有超过100/200条消息。 - Isha Aggarwal
1
@Gary Russell,如果下一个poll()超过5秒(比如说,处理时间超过5秒),kafka消费者会等待还是会发出poll方法?(我怀疑它不应该,它会等待,从而不遵守poll()方法,在这种情况下,提交将仅在处理当前poll()方法检索到的所有记录后发生)-从而与此语句相矛盾-"在设置的auto.commit.interval.ms中后台发出poll()调用。" - Nag
1
请勿在评论中提出补充问题。是的,这与该语句相矛盾,因为poll()不会在后台调用。自KIP-62以来,只有心跳包会在后台发送。当使用自动提交(不建议与Spring一起使用)时,如果时间间隔已过,则提交将仅在下一次调用poll()时发生;如果时间间隔尚未过去,则提交将等到后续的轮询。对于Spring,使用BATCH(默认)或RECORD AckMode意味着提交将更具确定性,与时间无关。我们现在默认将enable.auto.commit设置为false。 - Gary Russell

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接