如何使用Python编程检查Kafka Broker是否正在运行

8
我想从Kafka主题中消费信息。我正在使用一个包装confluent_kafka消费者的工具。在开始消费消息之前,我需要检查是否已建立连接。
我读到消费者是“懒惰”的,所以我需要执行某些操作来建立连接。但我想在不进行consumepoll操作的情况下检查连接是否已建立。
此外,我尝试提供一些错误配置,以查看轮询的响应,但是我得到的响应是:
b'Broker: No more messages'

那么,我该如何确定连接参数是否有误、连接是否断开或者主题中确实没有消息?

1个回答

19

很抱歉,目前没有一种直接的方法来测试Kafka Brokers是否正在运行。另外请注意,如果消费者已经消费了消息,并不意味着这是一种糟糕的行为,显然也不能表明Kafka Broker已经宕机。


一个可能的解决方案是执行某种快速操作并查看Broker是否响应。例如,列出主题:

使用 confluent-kafka-pythonAdminClient

# Example using confuent_kafka
from confluent_kafka.admin import AdminClient

kafka_broker = {'bootstrap.servers': 'localhost:9092'}
admin_client = AdminClient(kafka_broker)
topics = admin_client.list_topics().topics

if not topics: 
    raise RuntimeError()

使用 kafka-pythonKafkaConsumer

# example using kafka-python
import kafka


consumer = kafka.KafkaConsumer(group_id='test', bootstrap_servers=['localhost:9092'])
topics = consumer.topics()

if not topics: 
    raise RuntimeError()

请自行决定是否使用kafka-python,该库多年未更新,可能与亚马逊MSK或confluent容器不兼容。


当生产者向主题写入时,我如何获取其状态以检查它是否正在忙于写入? - y_159
或者在生产者本身宕机的情况下呢? - y_159

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接