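I have a consumer subscribed to a test topic, to which a producer thread periodically publishes messages. I'd like to block the consumer thread until a new message arrives, then process that message and start waiting again. The closest I've got is: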
consumer = KafkaConsumer(topic_name, auto_offset_reset='latest',
                         bootstrap_servers=[localhost_],
                         api_version=(0, 10), consumer_timeout_ms=1000)
while True:
    print(consumer.poll(timeout_ms=5000))
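Is there a more idiomatic way to do this (or a serious problem I'm not seeing)?
I'm new to Kafka, so general advice on this design is very welcome. Here is a complete (runnable) example: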
import time
from threading import Thread

import kafka
from kafka import KafkaProducer, KafkaConsumer

print('python-kafka:', kafka.__version__)


def publish_message(producer_instance, topic_name, key, value):
    try:
        key_bytes = bytes(str(key), encoding='utf-8')
        value_bytes = bytes(str(value), encoding='utf-8')
        producer_instance.send(topic_name, key=key_bytes, value=value_bytes)
        producer_instance.flush()
    except Exception as ex:
        print('Exception in publishing message\n', ex)


localhost_ = 'localhost:9092'


def kafka_producer():
    _producer = None
    try:
        _producer = KafkaProducer(bootstrap_servers=[localhost_],
                                  api_version=(0, 10))
    except Exception as ex:
        print('Exception while connecting Kafka')
        print(str(ex))
    j = 0
    while True:
        publish_message(_producer, topic_name, value=j, key=j)
        j += 1
        time.sleep(5)


if __name__ == '__main__':
    print('Running Producer..')
    topic_name = 'test'
    prod_thread = Thread(target=kafka_producer)
    prod_thread.start()
    consumer = KafkaConsumer(topic_name, auto_offset_reset='latest',
                             bootstrap_servers=[localhost_],
                             api_version=(0, 10), consumer_timeout_ms=1000)
    # consumer.subscribe([topic_name])
    while True:
        print(consumer.poll(timeout_ms=5000))
python-kafka: 1.3.5
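For reference, here is a minimal sketch of how the poll-based loop in the question could handle records rather than print the raw return value. It relies on kafka-python's `poll()` returning a dict that maps each topic partition to a list of records, and an empty dict when the timeout expires with nothing new. The names `poll_loop`, `handle`, `stop_event` and `run_against_broker` are made up for this illustration and do not appear in the question; `run_against_broker` is only hypothetical wiring and assumes a broker on localhost:9092.

```python
import threading


def poll_loop(consumer, handle, stop_event, poll_timeout_ms=5000):
    """Call handle(record) for every record until stop_event is set.

    consumer is any object with a kafka-python style poll(timeout_ms=...)
    method that returns {TopicPartition: [records]}, or an empty dict
    when the timeout expires without new messages.
    """
    handled = 0
    while not stop_event.is_set():
        batches = consumer.poll(timeout_ms=poll_timeout_ms)
        if not batches:
            continue  # timed out with nothing new; go back to waiting
        for records in batches.values():
            for record in records:
                handle(record)
                handled += 1
    return handled


def run_against_broker(topic_name="test", servers=("localhost:9092",)):
    """Hypothetical wiring; needs kafka-python and a running broker."""
    from kafka import KafkaConsumer  # imported here so poll_loop stays broker-free

    consumer = KafkaConsumer(topic_name, auto_offset_reset="latest",
                             bootstrap_servers=list(servers))
    stop_event = threading.Event()
    try:
        poll_loop(consumer, lambda r: print(r.key, r.value), stop_event)
    except KeyboardInterrupt:
        pass  # Ctrl+C ends the loop
    finally:
        consumer.close()
```

The stop event is one possible way to let another thread end the loop cleanly; the poll timeout then bounds how long shutdown can take.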
Would `for msg in consumer:` consume messages indefinitely as written here? If not, would wrapping it in a `while True` loop work?
... `confluent_kafka`, but rather other libraries that use the `poll()` method.
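On the iteration question, a small sketch may help. As I understand the kafka-python documentation, `consumer_timeout_ms` controls how long message iteration blocks before raising StopIteration, and its default is to block forever. With `consumer_timeout_ms=1000`, as in the question, a `for` loop over the consumer would therefore end after about a second without messages. If the setting is left out, the loop should keep waiting. The helper names `consume_all` and `build_blocking_consumer` are invented for this example, and the broker address is an assumption.

```python
def consume_all(consumer, handle):
    """Pass every record from the consumer's iterator to handle().

    Returns the number of records handled once the iterator ends
    (which, for a consumer without a timeout, may be never).
    """
    count = 0
    for record in consumer:
        handle(record)
        count += 1
    return count


def build_blocking_consumer(topic_name="test", servers=("localhost:9092",)):
    """Hypothetical helper; needs kafka-python and a running broker."""
    from kafka import KafkaConsumer

    # consumer_timeout_ms is deliberately not set, so iteration is
    # expected to block until a message arrives instead of stopping.
    return KafkaConsumer(topic_name, auto_offset_reset="latest",
                         bootstrap_servers=list(servers))
```

Usage would then look like `consume_all(build_blocking_consumer(), print)`, with no outer `while True` needed as long as the timeout is left unset.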