Python Kafka:有没有一种方法可以阻止消费者在kafka主题上,直到发布新消息?

3
我有一个订阅测试主题的消费者,其中一个生产者线程定期发布消息。我想能够阻塞消费者线程,直到有新消息到达 - 然后处理该消息并再次开始等待。最接近的方法是:
consumer = KafkaConsumer(topic_name, auto_offset_reset='latest',
                         bootstrap_servers=[localhost_],
                         api_version=(0, 10), consumer_timeout_ms=1000)
while True:
    print(consumer.poll(timeout_ms=5000))

有没有更加地道的方式(或者我看不到的严重问题)?

我是kafka的新手,所以非常欢迎对这个设计提出一般性建议。下面是一个完整的(可运行的)示例:

import time
from threading import Thread

import kafka
from kafka import KafkaProducer, KafkaConsumer

print('python-kafka:', kafka.__version__)

def publish_message(producer_instance, topic_name, key, value):
    try:
        key_bytes = bytes(str(key), encoding='utf-8')
        value_bytes = bytes(str(value), encoding='utf-8')
        producer_instance.send(topic_name, key=key_bytes, value=value_bytes)
        producer_instance.flush()
    except Exception as ex:
        print('Exception in publishing message\n', ex)

localhost_ = 'localhost:9092'

def kafka_producer():
    _producer = None
    try:
        _producer = KafkaProducer(bootstrap_servers=[localhost_],
                                  api_version=(0, 10))
    except Exception as ex:
        print('Exception while connecting Kafka')
        print(str(ex))
    j = 0
    while True:
        publish_message(_producer, topic_name, value=j, key=j)
        j += 1
        time.sleep(5)

if __name__ == '__main__':
    print('Running Producer..')
    topic_name = 'test'
    prod_thread = Thread(target=kafka_producer)
    prod_thread.start()
    consumer = KafkaConsumer(topic_name, auto_offset_reset='latest',
                             bootstrap_servers=[localhost_],
                             api_version=(0, 10), consumer_timeout_ms=1000)
    # consumer.subscribe([topic_name])
    while True:
        print(consumer.poll(timeout_ms=5000))

python-kafka: 1.3.5

1个回答

5

Kafka权威指南中也建议使用无限循环轮询的方式。以下是使用相同思路的Java代码摘自第4章 Kafka消费者:从Kafka读取数据

try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        ...
    }
}

这很好地体现了Python中推荐使用库的方式。 kafka-python(详见两个Kafka客户端的故事的完整示例)
from kafka import KafkaConsumer
...
kafka_consumer = Consumer(
...
)
consumer.subscribe([topic])

running = True
while running:
    message = kafka_consumer.poll()
...
confluent-kafka-python(在Python程序员的Apache Kafka入门中查看完整示例)
from confluent_kafka import Consumer, KafkaError
...
c = Consumer(settings)

c.subscribe(['mytopic'])

try:
    while True:
        msg = c.poll(0.1)
...

另一个紧密相关的问题可能是如何处理消息。

您的代码的这部分可能依赖于外部依赖项(数据库、远程服务、网络文件系统等),这可能导致处理尝试失败。

因此,实现重试逻辑可能是一个好主意,您可以在博客文章Apache Kafka中的消费者重试架构中找到关于如何实现的良好描述。


注意:这个问题是使用Python编程的。 - undefined
@cricket_007 同样的思路也适用于Python库,我也添加了对它们的引用。 - undefined
1
更多完整的示例可以在https://docs.confluent.io/current/clients/consumer.html#advanced-examples找到。 - undefined
1
@OneCricketeer 这行代码:for msg in consumer: 是否会像这里所写的那样无限消费消息?如果不是,将其放在一个 while True 循环中是否可行? - undefined
我只想知道是否可以在没有轮询的情况下完成。 - undefined
1
@y_159 是的,但你提供的库不是 confluent_kafka,而是使用 poll() 方法的其他库。 - undefined

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接