使用kafka python时无法传递消息

3

我正在尝试使用 kafka-python 库创建一个简单的 Kafka 应用程序。我尝试了一些在线示例,但似乎无法使它们正常工作。我在 Docker 容器中运行了一个 Kafka 实例,并测试了 shell 工具,该实例绝对有效。我能够发送和接收消息。我怀疑生产者的消息超时了。这里有两个版本的代码,基本上具有相同的行为:

import time
from kafka import SimpleProducer, KafkaClient
#  connect to Kafka
kafka = KafkaClient('localhost:9092')
producer = SimpleProducer(kafka)
# Assign a topic
topic = 'test'
producer.send_messages(topic, b'this is a message')

第二个版本:

from kafka import KafkaProducer
from kafka.errors import KafkaError

producer = KafkaProducer(bootstrap_servers=['0.0.0.0:9092'], api_version=(0,10))
topic = "test"

producer.send(topic, b'test message')

生产失败后,您看到了哪些日志? - William Hammond
2个回答

3

将以下代码行更改为: producer.send(topic, b'test message').get(timeout=30)(或任何您认为合适的值)

问题是在消息发送之前,生产者被终止,因为该方法是异步的。 如果添加以下内容,则可以自行查看:

import logging
logging.basicConfig(level=logging.INFO)

同时确保超时时间为0。


2
这取决于您运行docker的方式,但我认为您遇到的问题与您尝试连接的主机名有关。您需要指向在ADVERTISED_HOST环境变量中设置的主机。例如,当我像这样运行kafka-docker:docker run --hostname kafka-1 -p 2181:2181 -p 9092:9092 --env ADVERTISED_HOST='kafka-1' --env ADVERTISED_PORT=9092 spotify/kafka时,我就可以像这样发送消息到kafka。
from kafka import SimpleProducer, KafkaClient

kafka = KafkaClient('kafka-1:9092')
producer = SimpleProducer(kafka)
topic = 'test'
for i in range(100):
    producer.send_messages(topic, 'hullo-' + str(i))

此外,我需要将127.0.0.1 kafka-1添加到我的/etc/hosts文件中。完成后,我可以使用bin/kafka-console-consumer.sh --bootstrap-server kafka-1:9092 --topic test --from-beginning命令消费由Python客户端生成的消息。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接