监控Kafka主题的消费者数量。

3
我们正在使用Prometheus和Grafana监控我们的Kafka集群。
在我们的应用程序中,我们使用Kafka streams,由于异常的原因,可能会出现Kafka stream停止的情况。我们记录了事件setUnCaughtExceptionHandler,但是当流停止时,我们还需要某种类型的警报。
目前我们有一个运行为代理的jmx_exporter,并通过一个端点公开Kafka指标,Prometheus从该端点获取指标。
我们没有看到任何给出每个主题活动消费者数量的指标。我们是否遗漏了什么?有关如何获取活动消费者数量并在消费者停止时发送警报的建议。

你的消费者作为操作系统服务运行吗? - Giorgos Myrianthous
不,这是一个使用Kafka Streams的Java应用程序。 - Thiru
1
您可以将Java应用程序作为systemd服务运行,这样可以轻松监控并在失败时重新启动。 - Giorgos Myrianthous
我的服务不仅处理流,还处理其他事情。在出现异常的情况下,只有流会停止,其他部分会继续运行。 - Thiru
2
也许添加自定义度量指标可以有所帮助?https://docs.confluent.io/current/streams/monitoring.html#adding-your-own-metrics - Matthias J. Sax
谢谢Matthias的输入。有没有一种解决方案是在不编写代码的情况下实现呢? - Thiru
1个回答

3
我们有类似的需求,将Kafka消费者每个分区的滞后情况添加到Grafana中,并在滞后超过指定阈值时添加警报(每个主题的阈值应不同,取决于负载,例如对于某些主题可能为10,对于高负载可能为100000)。因此,如果您有超过1000条未处理的消息,您将收到警报。
您可以为每个Kafka流添加状态监听器,在流处于错误状态时记录错误或发送电子邮件:
kafkaStream.setStateListener((newState, oldState) -> {
    log.info("Kafka stream state changed [{}] >>>>> [{}]", oldState, newState);
    if (newState == KafkaStreams.State.ERROR || newState == KafkaStreams.State.PENDING_SHUTDOWN) {
        log.error("Kafka Stream is in [{}] state. Application should be restarted", newState);
    }
});

您还可以添加健康检查指示器(例如,通过REST端点或通过 spring-boot HealthIndicator),提供有关流是否正在运行的信息:

KafkaStreams.State streamState = kafkaStream.state(); state.isRunning();

我也没有找到任何Kafka Streams度量标准,提供有关活动消费者或可用连接分区的信息,但对我而言,如果Kafka Streams提供这些数据将会很好(并希望它们在未来版本中可用)。


谢谢你的回答,Vasiliy。我认为健康检查指示器对我来说是最好的选择。但仍然想知道,为什么这不是一个开箱即用的解决方案。 - Thiru
1
我猜这不是开箱即用的,因为Kafka Streams提供了当前状态信息和状态监听器,我们可以决定如何处理它,例如添加健康检查指示器(使用Spring Boot很容易实现),或在流未运行时发送电子邮件。我同意将来可以在spring-kafka中开箱即用地实现它 :) - Vasyl Sarzhynskyi
再次感谢Vasiliy。希望在spring-kafka中也能得到同样的支持。 - Thiru
如果您使用Spring Cloud Kafka Streams,则可以在健康状态页面上显示所有监听器KStream、KTable和GlobalKTable的健康状态。对于一些Kafka Streams出现故障的情况,它会显示状态为Down。 - R K

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接