Java,如何在Apache Kafka中获取主题中的消息数量

131
我正在使用Apache Kafka进行消息传递。我已经在Java中实现了生产者和消费者。我们如何获取特定主题中的消息数量?

我的回答给出了一种实际的方法,而不仅仅是消耗消息:https://dev59.com/e14b5IYBdhLWcg3w5VE9#61916983 - LeYAUable
18个回答

128

这不是Java,但可能很有用

./bin/kafka-run-class.sh kafka.tools.GetOffsetShell \
  --broker-list <broker>:<port> \
  --topic <topic-name> \
  | awk -F  ":" '{sum += $3} END {print sum}'

17
这句话的意思是“这不应该是每个分区最早和最晚偏移之差的总和吗?” “bash-4.3# $KAFKA_HOME/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 10.35.25.95:32774 --topic test-topic --time -1 | awk -F ":" '{sum += $3} END {print sum}'13818663 bash-4.3# $KAFKA_HOME/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 10.35.25.95:32774 --topic test-topic --time -2 | awk -F ":" '{sum += $3} END {print sum}'12434609" 是获取 Kafka 主题中消息偏移量的命令。 而后面的问题是在询问:“然后差异返回主题中实际待处理的消息数?我理解的对吗?” - kisna
1
是的,没错。如果最早的偏移量不等于零,你必须计算差值。 - ssemichev
1
这正是我所想的 :)。 - kisna
1
有没有任何方法可以将其作为API使用并嵌入到代码(JAVA、Scala或Python)中? - salvob
1
简化@kisna的答案以获取确切的记录计数: brokers="broker1:port" topic=<topic-name> sum_2=$(/usr/hdp/current/kafka-broker/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list $brokers --topic $topic --time -2 | grep -e ':[[:digit:]]*:' | awk -F ":" '{sum += $3} END {print sum}') echo "主题${topic}中的记录数量为:"$((sum_1 - sum_2)) - spats
显示剩余4条评论

42

从消费者的角度来看,唯一想到的方法是实际消费消息并对其进行计数。

Kafka代理公开了自启动以来接收到的消息数量的JMX计数器,但您无法知道其中有多少已被清除。

在大多数常见情况下,Kafka中的消息最好视为无限流,并且获取当前保存在磁盘上的消息总数的离散值并不相关。此外,当处理一个主题中所有代理服务器的消息子集时,事情变得更加复杂。


4
请看我的答案 https://dev59.com/e14b5IYBdhLWcg3w5VE9#47313863 。Java Kafka客户端允许获取该信息。 - Christophe Quintard

35

由于不再支持ConsumerOffsetChecker,您可以使用此命令来检查主题中的所有消息:

bin/kafka-run-class.sh kafka.admin.ConsumerGroupCommand \
    --group my-group \
    --bootstrap-server localhost:9092 \
    --describe

其中LAG是主题分区中消息的计数:

enter image description here

您还可以尝试使用 kafkacat 。 这是一个开源项目,可以帮助您从主题和分区中读取消息,并将其打印到标准输出。以下是一个示例,它从sample-kafka-topic主题读取最后10条消息,然后退出:

kafkacat -b localhost:9092 -t sample-kafka-topic -p 0 -o -10 -e

3
这个答案有点不够精确。LAG 是指等待消费者消费的消息数量,而不是分区中所有消息的总数。对于分区中消息总数来说,一个更准确但仍然有些误导的值是 LOG-END-OFFSET。 - Felipe Correa

23

我实际上用这个来测试我的 POC。你想要使用的项是 ConsumerOffsetChecker。你可以像下面这样使用 bash 脚本运行它。

bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker  --topic test --zookeeper localhost:2181 --group testgroup

以下是结果: 在此输入图片描述 如红框所示,999是当前主题中的消息数量。

更新:ConsumerOffsetChecker从0.10.0开始已被废弃,您可能想要开始使用ConsumerGroupCommand。


2
请注意,ConsumerOffsetChecker已被弃用,并将在0.9.0之后的版本中删除。请改用ConsumerGroupCommand。(kafka.tools.ConsumerOffsetChecker$) - Szymon Sadło
2
是的,就是我说的。 - Rudy
你最后一句话不准确。上述命令在0.10.0.1版本中仍然有效,警告与我之前的评论相同。 - Szymon Sadło

19

有时候我们想要知道每个分区中消息的数量,比如在测试自定义分区器时。以下步骤已经过测试并适用于从Confluent 3.2获取的Kafka 0.10.2.1-2版本。假设有一个Kafka主题kt和以下命令行:

$ kafka-run-class kafka.tools.GetOffsetShell \
  --broker-list host01:9092,host02:9092,host02:9092 --topic kt

打印样例输出,显示三个分区中消息的计数:

kt:2:6138
kt:1:6123
kt:0:6137

根据主题的分区数量,行数可能会增加或减少。


6
如果启用了日志压缩,那么将分区的偏移量相加可能无法精确计算主题中的消息数量。 - user7652554
这不是显示总消息数,而是显示最后的偏移量。初始值可以是任何数字。 - Anurag

11

请使用Facebook提供的超级SQL引擎PrestoDB,它可以连接多个数据源(如Cassandra、Kafka、JMX和Redis等)。

PrestoDB作为服务器运行,可选择使用工作节点(也有单独模式),然后您可以使用一个小的可执行JAR文件(称为presto CLI)进行查询。

在成功配置Presto服务器之后,您可以使用传统的SQL语句进行查询:

请访问https://prestodb.io/docs/current/connector/kafka-tutorial.html

SELECT count(*) FROM TOPIC_NAME;

这个工具很不错,但如果您的主题有超过2个点,则它将无法工作。 - armandfp

9

获取主题所有分区未处理消息的Apache Kafka命令:

kafka-run-class kafka.tools.ConsumerOffsetChecker 
    --topic test --zookeeper localhost:2181 
    --group test_group

输出:

Group      Topic        Pid Offset          logSize         Lag             Owner
test_group test         0   11051           11053           2               none
test_group test         1   10810           10812           2               none
test_group test         2   11027           11028           1               none

第6列是未处理的消息。请像这样将它们相加:

kafka-run-class kafka.tools.ConsumerOffsetChecker 
    --topic test --zookeeper localhost:2181 
    --group test_group 2>/dev/null | awk 'NR>1 {sum += $6} 
    END {print sum}'

awk读取行,跳过表头并累加第6列,在结束时打印总和。

打印

5

8
运行以下命令(假设kafka-console-consumer.sh在路径上):
kafka-console-consumer.sh  --from-beginning \
--bootstrap-server yourbroker:9092 --property print.key=true  \
--property print.value=false --property print.partition \
--topic yourtopic --timeout-ms 5000 | tail -n 10|grep "Processed a total of"

注意:我已删除 --new-consumer 选项,因为该选项不再可用(或明显不必要)。 - WestCoastProjects

7
使用Kafka 2.11-1.0.0的Java客户端,您可以执行以下操作:
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Collections.singletonList("test"));
    while(true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());

            // after each message, query the number of messages of the topic
            Set<TopicPartition> partitions = consumer.assignment();
            Map<TopicPartition, Long> offsets = consumer.endOffsets(partitions);
            for(TopicPartition partition : offsets.keySet()) {
                System.out.printf("partition %s is at %d\n", partition.topic(), offsets.get(partition));
            }
        }
    }

输出结果大致如下:
offset = 10, key = null, value = un
partition test is at 13
offset = 11, key = null, value = deux
partition test is at 13
offset = 12, key = null, value = trois
partition test is at 13

我更喜欢你的回答,因为与@AutomatedMike的回答不同,你的回答不会干扰seekToEnd(..)seekToBeginning(..)方法,这些方法会改变consumer的状态。 - adaslaw

5

要获取存储在主题中的所有消息,您可以将消费者定位到每个分区流的开头和结尾,并对结果进行求和。

List<TopicPartition> partitions = consumer.partitionsFor(topic).stream()
        .map(p -> new TopicPartition(topic, p.partition()))
        .collect(Collectors.toList());
    consumer.assign(partitions); 
    consumer.seekToEnd(Collections.emptySet());
Map<TopicPartition, Long> endPartitions = partitions.stream()
        .collect(Collectors.toMap(Function.identity(), consumer::position));
    consumer.seekToBeginning(Collections.emptySet());
System.out.println(partitions.stream().mapToLong(p -> endPartitions.get(p) - consumer.position(p)).sum());

4
顺便提一下,如果你开启了压缩功能,那么流中可能会存在间隙,因此实际的消息数量可能会比这里计算的总数要少。为了获得准确的总数,你需要重新播放消息并进行计数。 - AutomatedMike

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接