初次接触Kafka
我想知道是否有一种使用Java API获取kafka-metrics的方法。这里所说的kafka-metrics是指:
- 主题列表
- 每个主题分区中的消息数量(包括起始偏移量和结束偏移量)
- 代理列表
- 假设我使用0.9消费者API并让kafka管理我的消费者偏移量,每个消费者的偏移量
我意识到可能只有其中一些可用,并且它们可能通过不同的类或方法可用。这就是为什么我对每个要点进行了编号。
初次接触Kafka
我想知道是否有一种使用Java API获取kafka-metrics的方法。这里所说的kafka-metrics是指:
我意识到可能只有其中一些可用,并且它们可能通过不同的类或方法可用。这就是为什么我对每个要点进行了编号。
我正在使用Spring框架开发我的API。使用以下代码,您可以通过Java获取指标。
@Component
public class Receiver {
private static final Logger LOGGER =
LoggerFactory.getLogger(Receiver.class);
@Autowired
private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
public void testlag() {
for (MessageListenerContainer messageListenerContainer : kafkaListenerEndpointRegistry
.getListenerContainers()) {
Map<String, Map<MetricName, ? extends Metric>> metrics = messageListenerContainer.metrics();
metrics.forEach( (clientid, metricMap) ->{
System.out.println("------------------------For client id : "+clientid);
metricMap.forEach((metricName,metricValue)->{
//if(metricName.name().contains("lag"))
System.out.println("------------Metric name: "+metricName.name()+"-----------Metric value: "+metricValue.metricValue());
});
});
}
}
}
listTopics()
方法可以解决第一个问题。
KafkaConsumer的metrics()
方法可以为您提供该消费者的指标。
KafkaProducer的metrics()
方法可以为您提供该生产者的指标。
希望对您有所帮助!