Java,如何在Apache Kafka中获取主题中的消息数量

131
我正在使用Apache Kafka进行消息传递。我已经在Java中实现了生产者和消费者。我们如何获取特定主题中的消息数量?

我的回答给出了一种实际的方法,而不仅仅是消耗消息:https://dev59.com/e14b5IYBdhLWcg3w5VE9#61916983 - LeYAUable
18个回答

5

我有同样的问题,这是我用Kotlin从KafkaConsumer中解决的方法:

val messageCount = consumer.listTopics().entries.filter { it.key == topicName }
    .map {
        it.value.map { topicInfo -> TopicPartition(topicInfo.topic(), topicInfo.partition()) }
    }.map { consumer.endOffsets(it).values.sum() - consumer.beginningOffsets(it).values.sum()}
    .first()

这是一段非常初步的代码,因为我只是让它起作用了,但基本上您需要从主题的开始偏移量中减去结束偏移量,这将是主题的当前消息数。

您不能仅依赖于结束偏移量,因为其他配置(清理策略、保留时间等)可能会导致删除主题中旧的消息。偏移量只能“向前”移动,因此是开始偏移量向前接近结束偏移量(或最终到达相同值,如果主题当前不包含任何消息)。

基本上,结束偏移量表示通过该主题传递的所有消息的总数,两者之间的差异表示该主题当前包含的消息数。


3

Kafka文档摘要

0.9.0.0版本中的弃用功能

kafka-consumer-offset-checker.sh(kafka.tools.ConsumerOffsetChecker)已被弃用。从现在开始,请使用kafka-consumer-groups.sh(kafka.admin.ConsumerGroupCommand)来执行此功能。

我正在运行启用了SSL的Kafka代理服务器和客户端。我使用以下命令:

kafka-consumer-groups.sh --bootstrap-server Broker_IP:Port --list --command-config /tmp/ssl_config kafka-consumer-groups.sh --bootstrap-server Broker_IP:Port --command-config /tmp/ssl_config --describe --group group_name_x

其中,/tmp/ssl_config如下所示:

security.protocol=SSL
ssl.truststore.location=truststore_file_path.jks
ssl.truststore.password=truststore_password
ssl.keystore.location=keystore_file_path.jks
ssl.keystore.password=keystore_password
ssl.key.password=key_password

3
在最近的Kafka Manager版本中,有一个名为Summed Recent Offsets的列。

enter image description here


1
如果您可以访问服务器的JMX接口,则起始和结束偏移量位于:
kafka.log:type=Log,name=LogStartOffset,topic=TOPICNAME,partition=PARTITIONNUMBER
kafka.log:type=Log,name=LogEndOffset,topic=TOPICNAME,partition=PARTITIONNUMBER

您需要替换TOPICNAMEPARTITIONNUMBER。 请记住,您需要检查给定分区的每个副本,或者您需要找出哪个代理是给定分区的领导者(这可能会随时间而改变)。

另外,您可以使用Kafka Consumer方法beginningOffsetsendOffsets


让我看看我是否理解正确:启用JMX。获取所有指标。选择一个主题和一个分区。对于该主题/分区组合,获取LogEndOffset和LogStartOffset。做差。这就是队列中的消息数量。正确吗? - Florin Andrei
如果一个主题有多个分区,那么我需要为每个分区单独进行这个数学计算吗?然后将结果相加吗?(我是Kafka的新手,之前只用过RabbitMQ。) - Florin Andrei

0

获取准确数字的唯一方法是使用消费者读取消息。

或者,获取最接近的数字的方法(不太准确)是使用“kafka.tools.GetOffsetShell”类。在两个单独的 shell 命令中,使用时间参数(-1 表示最新,-2 表示最早)获取每个分区的最新和最早偏移量,然后使用一个简单的 shell 脚本来关联每个分区并相减得到数字。请参见下面的命令示例。但请注意,在某些罕见情况下,主题可能会缺少偏移量编号,这种情况下也无法使用此方法,例如压缩主题。

获取主题的最新/结束偏移量编号

bin % ./kafka-run-class kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic tpmqtt --time -1

tpmqtt:0:8

tpmqtt:1:0

tpmqtt:2:10

获取主题的最早偏移量编号:

bin % ./kafka-run-class kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic tpmqtt --time -2

tpmqtt:0:0

tpmqtt:1:0

tpmqtt:2:0

在这个例子中,对于值 "tpmqtt:2:10",tpmqtt 是主题,2 是分区号,10 是该分区中的最后偏移量。


0
如果您需要计算消费者组中所有消费者的结果(或不同消费者组的结果),另一个选项是使用管理客户端并从主题/分区偏移量减去消费者组偏移量,以下是 Kotlin 的代码示例:
val topicName = "someTopic"
val groupId = "theGroupId"
val admin = Admin.create(kafkaProps.buildAdminProperties()) // Spring KafkaProperties
val parts = admin.describeTopics(listOf(topicName)).values()[topicName]!!.get().partitions()
val topicPartitionOffsets = admin.listOffsets(parts.associate { TopicPartition(topicName, it.partition()) to OffsetSpec.latest() }).all().get()
val consumerGroupOffsets = admin.listConsumerGroupOffsets(groupId)
    .partitionsToOffsetAndMetadata().get()
val highWaterMark = topicPartitionOffsets.map { it.value.offset() }.sum()
val consumerPos = consumerGroupOffsets.map { it.value.offset() }.sum()
val unProcessedMessages = highWaterMark - consumerPos

这里也提供了一个可工作的版本,来自LeYAUable示例代码,仅使用一个普通(非管理员)客户端:
val partitions = consumer.partitionsFor("topicName")
        .map { TopicPartition(it.topic(), it.partition()) }
val highWaterMark = consumer.endOffsets(partitions).values.sum()
val consumerPosition = consumer.beginningOffsets(partitions).values.sum()
val msgCount = highWaterMark - consumerPosition

然而,这只会给您此特定消费者的偏移量!通常的警告适用于当主题被压缩时,此方法不够精确。


-1

我自己还没有尝试过this,但它似乎很有道理。

你也可以使用kafka.tools.ConsumerOffsetCheckersource)。


-1
我发现最简单的方法是使用Kafdrop REST API /topic/topicName,并在请求头中指定键:"Accept" / 值:"application/json",以便获取JSON响应。

这里有文档记录


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接