这不是Java,但可能很有用
./bin/kafka-run-class.sh kafka.tools.GetOffsetShell \
--broker-list <broker>:<port> \
--topic <topic-name> \
| awk -F ":" '{sum += $3} END {print sum}'
从消费者的角度来看,唯一想到的方法是实际消费消息并对其进行计数。
Kafka代理公开了自启动以来接收到的消息数量的JMX计数器,但您无法知道其中有多少已被清除。
在大多数常见情况下,Kafka中的消息最好视为无限流,并且获取当前保存在磁盘上的消息总数的离散值并不相关。此外,当处理一个主题中所有代理服务器的消息子集时,事情变得更加复杂。
由于不再支持ConsumerOffsetChecker
,您可以使用此命令来检查主题中的所有消息:
bin/kafka-run-class.sh kafka.admin.ConsumerGroupCommand \
--group my-group \
--bootstrap-server localhost:9092 \
--describe
其中LAG
是主题分区中消息的计数:
您还可以尝试使用 kafkacat 。 这是一个开源项目,可以帮助您从主题和分区中读取消息,并将其打印到标准输出。以下是一个示例,它从sample-kafka-topic
主题读取最后10条消息,然后退出:
kafkacat -b localhost:9092 -t sample-kafka-topic -p 0 -o -10 -e
有时候我们想要知道每个分区中消息的数量,比如在测试自定义分区器时。以下步骤已经过测试并适用于从Confluent 3.2获取的Kafka 0.10.2.1-2版本。假设有一个Kafka主题kt
和以下命令行:
$ kafka-run-class kafka.tools.GetOffsetShell \
--broker-list host01:9092,host02:9092,host02:9092 --topic kt
打印样例输出,显示三个分区中消息的计数:
kt:2:6138
kt:1:6123
kt:0:6137
根据主题的分区数量,行数可能会增加或减少。
请使用Facebook提供的超级SQL引擎PrestoDB,它可以连接多个数据源(如Cassandra、Kafka、JMX和Redis等)。
PrestoDB作为服务器运行,可选择使用工作节点(也有单独模式),然后您可以使用一个小的可执行JAR文件(称为presto CLI)进行查询。
在成功配置Presto服务器之后,您可以使用传统的SQL语句进行查询:
请访问https://prestodb.io/docs/current/connector/kafka-tutorial.html
SELECT count(*) FROM TOPIC_NAME;
获取主题所有分区未处理消息的Apache Kafka命令:
kafka-run-class kafka.tools.ConsumerOffsetChecker
--topic test --zookeeper localhost:2181
--group test_group
输出:
Group Topic Pid Offset logSize Lag Owner
test_group test 0 11051 11053 2 none
test_group test 1 10810 10812 2 none
test_group test 2 11027 11028 1 none
第6列是未处理的消息。请像这样将它们相加:
kafka-run-class kafka.tools.ConsumerOffsetChecker
--topic test --zookeeper localhost:2181
--group test_group 2>/dev/null | awk 'NR>1 {sum += $6}
END {print sum}'
awk读取行,跳过表头并累加第6列,在结束时打印总和。
打印
5
kafka-console-consumer.sh
在路径上):kafka-console-consumer.sh --from-beginning \
--bootstrap-server yourbroker:9092 --property print.key=true \
--property print.value=false --property print.partition \
--topic yourtopic --timeout-ms 5000 | tail -n 10|grep "Processed a total of"
--new-consumer
选项,因为该选项不再可用(或明显不必要)。 - WestCoastProjects KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("test"));
while(true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
// after each message, query the number of messages of the topic
Set<TopicPartition> partitions = consumer.assignment();
Map<TopicPartition, Long> offsets = consumer.endOffsets(partitions);
for(TopicPartition partition : offsets.keySet()) {
System.out.printf("partition %s is at %d\n", partition.topic(), offsets.get(partition));
}
}
}
offset = 10, key = null, value = un
partition test is at 13
offset = 11, key = null, value = deux
partition test is at 13
offset = 12, key = null, value = trois
partition test is at 13
seekToEnd(..)
和seekToBeginning(..)
方法,这些方法会改变consumer
的状态。 - adaslaw要获取存储在主题中的所有消息,您可以将消费者定位到每个分区流的开头和结尾,并对结果进行求和。
List<TopicPartition> partitions = consumer.partitionsFor(topic).stream()
.map(p -> new TopicPartition(topic, p.partition()))
.collect(Collectors.toList());
consumer.assign(partitions);
consumer.seekToEnd(Collections.emptySet());
Map<TopicPartition, Long> endPartitions = partitions.stream()
.collect(Collectors.toMap(Function.identity(), consumer::position));
consumer.seekToBeginning(Collections.emptySet());
System.out.println(partitions.stream().mapToLong(p -> endPartitions.get(p) - consumer.position(p)).sum());