我有同样的问题,这是我用Kotlin从KafkaConsumer中解决的方法:
val messageCount = consumer.listTopics().entries.filter { it.key == topicName }
.map {
it.value.map { topicInfo -> TopicPartition(topicInfo.topic(), topicInfo.partition()) }
}.map { consumer.endOffsets(it).values.sum() - consumer.beginningOffsets(it).values.sum()}
.first()
这是一段非常初步的代码,因为我只是让它起作用了,但基本上您需要从主题的开始偏移量中减去结束偏移量,这将是主题的当前消息数。
您不能仅依赖于结束偏移量,因为其他配置(清理策略、保留时间等)可能会导致删除主题中旧的消息。偏移量只能“向前”移动,因此是开始偏移量向前接近结束偏移量(或最终到达相同值,如果主题当前不包含任何消息)。
基本上,结束偏移量表示通过该主题传递的所有消息的总数,两者之间的差异表示该主题当前包含的消息数。
Kafka文档摘要
0.9.0.0版本中的弃用功能
kafka-consumer-offset-checker.sh(kafka.tools.ConsumerOffsetChecker)已被弃用。从现在开始,请使用kafka-consumer-groups.sh(kafka.admin.ConsumerGroupCommand)来执行此功能。
我正在运行启用了SSL的Kafka代理服务器和客户端。我使用以下命令:
kafka-consumer-groups.sh --bootstrap-server Broker_IP:Port --list --command-config /tmp/ssl_config
kafka-consumer-groups.sh --bootstrap-server Broker_IP:Port --command-config /tmp/ssl_config --describe --group group_name_x
其中,/tmp/ssl_config如下所示:
security.protocol=SSL
ssl.truststore.location=truststore_file_path.jks
ssl.truststore.password=truststore_password
ssl.keystore.location=keystore_file_path.jks
ssl.keystore.password=keystore_password
ssl.key.password=key_password
kafka.log:type=Log,name=LogStartOffset,topic=TOPICNAME,partition=PARTITIONNUMBER
kafka.log:type=Log,name=LogEndOffset,topic=TOPICNAME,partition=PARTITIONNUMBER
您需要替换TOPICNAME
和PARTITIONNUMBER
。
请记住,您需要检查给定分区的每个副本,或者您需要找出哪个代理是给定分区的领导者(这可能会随时间而改变)。
另外,您可以使用Kafka Consumer方法beginningOffsets
和endOffsets
。
获取准确数字的唯一方法是使用消费者读取消息。
或者,获取最接近的数字的方法(不太准确)是使用“kafka.tools.GetOffsetShell”类。在两个单独的 shell 命令中,使用时间参数(-1 表示最新,-2 表示最早)获取每个分区的最新和最早偏移量,然后使用一个简单的 shell 脚本来关联每个分区并相减得到数字。请参见下面的命令示例。但请注意,在某些罕见情况下,主题可能会缺少偏移量编号,这种情况下也无法使用此方法,例如压缩主题。
获取主题的最新/结束偏移量编号
bin % ./kafka-run-class kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic tpmqtt --time -1
tpmqtt:0:8
tpmqtt:1:0
tpmqtt:2:10
获取主题的最早偏移量编号:
bin % ./kafka-run-class kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic tpmqtt --time -2
tpmqtt:0:0
tpmqtt:1:0
tpmqtt:2:0
在这个例子中,对于值 "tpmqtt:2:10",tpmqtt 是主题,2 是分区号,10 是该分区中的最后偏移量。
val topicName = "someTopic"
val groupId = "theGroupId"
val admin = Admin.create(kafkaProps.buildAdminProperties()) // Spring KafkaProperties
val parts = admin.describeTopics(listOf(topicName)).values()[topicName]!!.get().partitions()
val topicPartitionOffsets = admin.listOffsets(parts.associate { TopicPartition(topicName, it.partition()) to OffsetSpec.latest() }).all().get()
val consumerGroupOffsets = admin.listConsumerGroupOffsets(groupId)
.partitionsToOffsetAndMetadata().get()
val highWaterMark = topicPartitionOffsets.map { it.value.offset() }.sum()
val consumerPos = consumerGroupOffsets.map { it.value.offset() }.sum()
val unProcessedMessages = highWaterMark - consumerPos
val partitions = consumer.partitionsFor("topicName")
.map { TopicPartition(it.topic(), it.partition()) }
val highWaterMark = consumer.endOffsets(partitions).values.sum()
val consumerPosition = consumer.beginningOffsets(partitions).values.sum()
val msgCount = highWaterMark - consumerPosition
然而,这只会给您此特定消费者的偏移量!通常的警告适用于当主题被压缩时,此方法不够精确。
/topic/topicName
,并在请求头中指定键:"Accept"
/ 值:"application/json"
,以便获取JSON响应。