如何检测处于僵尸状态的Kafka Streams应用程序

3
我们的Kafka Streams应用程序中,StreamThread消费者之一在生成以下日志消息后进入了僵尸状态:
[Consumer clientId=notification-processor-db9aa8a3-6c3b-453b-b8c8-106bf2fa257d-StreamThread-1-consumer, groupId=notification-processor] Member notification-processor-db9aa8a3-6c3b-453b-b8c8-106bf2fa257d-StreamThread-1-consumer-b2b9eac3-c374-43e2-bbc3-d9ee514a3c16 sending LeaveGroup request to coordinator****:9092 (id: 2147483646 rack: null) due to consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
看起来StreamThread的Kafka Consumer已经离开了Consumer Group,但是Kafka Streams应用程序仍然处于RUNNING状态而没有消费任何新记录。
我想检测到Kafka Streams应用程序进入这种僵尸状态,以便可以关闭它并使用新实例代替。通常,我们通过Kubernetes健康检查来验证Kafka Streams应用程序处于RUNNING或REPARTITIONING状态,但对于此情况无效。
因此,我有两个问题: 1. 当Kafka Streams应用程序没有活动的消费者时,它仍然处于RUNNING状态是否合理?如果是,为什么? 2. 我们如何检测(通过编程/指标)Kafka Streams应用程序进入了没有活动消费者的僵尸状态?
2个回答

1

如果没有活动的消费者,Kafka Streams应用程序是否会保持运行状态?如果是:为什么?

这取决于版本。在旧版本(2.1.x及更早版本)中,即使所有线程都死亡,Kafka Streams也确实会保持运行状态。通过https://issues.apache.org/jira/browse/KAFKA-7657v2.2.0中修复了此问题。

我们如何检测(通过编程/指标)Kafka Streams应用程序进入没有活动消费者的僵尸状态?

即使在旧版本中,您也可以在KafkaStreams客户端上注册未捕获的异常处理程序。每次StreamThreads死亡时都会调用此处理程序。

顺便说一下:在即将发布的2.6.0版本中,添加了一个新的度量标准alive-stream-threads以跟踪正在运行的线程数:https://issues.apache.org/jira/browse/KAFKA-9753


我们目前在我们的代理商和客户端中使用2.4.0版本,所以这可能是一个错误吗?不幸的是,我们还没有能够复制/找出是什么导致了消费者轮询超时,因为它非常非常罕见。感谢您提供有关跟踪死亡流线程的指针。我们期待着2.6.0版本的发布,并查看是否可以在此之前使用未捕获的异常处理程序。 - Pieter Hameete
还有一个后续问题:从我的日志中,我只知道属于流线程的消费者不再是消费者组的一部分。没有日志消息说明StreamThread已经死亡。这可能是它仍被视为运行的原因吗? - Pieter Hameete
我们目前在我们的代理和客户端中使用2.4.0版本,所以这可能是一个bug吗?——听起来象是。没有日志消息表明StreamThread已经死了。这可能可以解释;只要线程没有死亡,它应该尝试重新加入消费者组。所以也许问题不在“客户端状态跟踪”中,而是在StreamThread上,由于某种原因被卡住了…… - Matthias J. Sax
只是为了澄清,僵尸状态只意味着消费者已被踢出组,并且仍在等待“注意”这一事实,它并不意味着致命错误,因此不会触发未捕获的异常处理程序(也不应该)。同样,它仍然被认为是“活着的”,因此这不会,也不应该反映在alive-stream-threads指标中。有关详细信息,请参见我上面的回复。 - S Blee-G
哦,糟糕,我没有注意到这个问题是两年前的,我刚从用户邮件列表中跳转到这里,因为有人遇到了同样的问题。抱歉让这个问题又活过来了哈哈。 - S Blee-G

0

提醒一下,用户邮件列表上正在进行一场类似的讨论——主题是“kafka stream zombie state”。

我先告诉你们我在那里说了什么,因为目前的对话中似乎存在一些误解:基本上,错误消息有点误导人,因为它暗示这是由消费者本身记录并且它当前正在发送此LeaveGroup/已经注意到错过了轮询间隔。但实际上,当心跳线程注意到主要的消费者线程没有在最大轮询超时内轮询时,该消息实际上是由心跳线程记录的,并且技术上只是将其标记为“需要重新加入”,以便消费者知道在下一次轮询时发送此LeaveGroup。然而,如果消费者线程实际上被卡在用户/应用程序代码的某个地方,无法打破继续轮询循环,则消费者将永远不会触发重新平衡、尝试重新加入、发送LeaveGroup请求等。这就是为什么状态继续保持为RUNNING而不是REBALANCING的原因。

由于上述原因,像num-alive-stream-threads这样的指标也无法帮助,因为线程并没有死亡——它只是卡住了。实际上,即使线程变得不卡了,它也只会重新加入并像往常一样继续运行,它不会“死亡”(因为只有在遇到致命异常时才会发生这种情况)。
长话短说:代理和心跳线程已经注意到消费者不再在组内,但StreamThread可能卡在拓扑结构中的某个地方,因此消费者本身实际上并不知道自己已经被踢出消费者组。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接