我正在尝试使用 kafka-python 库创建一个简单的 Kafka 应用程序。我尝试了一些在线示例,但似乎无法使它们正常工作。我在 Docker 容器中运行了一个 Kafka 实例,并测试了 shell 工具,该实例绝对有效。我能够发送和接收消息。我怀疑生产者的消息超时了。这里有两个版本的代码,基本上具有相同的行为:
import time
from kafka import SimpleProducer, KafkaClient
# connect to Kafka
kafka = KafkaClient('localhost:9092')
producer = SimpleProducer(kafka)
# Assign a topic
topic = 'test'
producer.send_messages(topic, b'this is a message')
第二个版本:
from kafka import KafkaProducer
from kafka.errors import KafkaError
producer = KafkaProducer(bootstrap_servers=['0.0.0.0:9092'], api_version=(0,10))
topic = "test"
producer.send(topic, b'test message')