Kafka的produce.send从未发送消息

8

我正在使用 Kafka 2.12 和 kafka-python 模块作为 Kafka 客户端。我尝试测试一个简单的生产者:

class Producer(Process):
daemon = True
def run(self):
    producer = KafkaProducer(bootstrap_servers='kafka:9092')
    print("Sending messages...")
    producer.send('topic', json.dumps(message).encode('utf-8'))

当此进程被实例化时,消息永远不会被消费者接收到。

如果我刷新生产者并更改linger_ms参数(使其同步),则消息将被发送并被消费者读取:

class Producer(Process):
daemon = True
def run(self):
    producer = KafkaProducer(bootstrap_servers='kafka:9092', linger_ms=10)
    print("Sending messages...")
    producer.send('topic', json.dumps(message).encode('utf-8'))
    producer.flush()

在之前的Kafka版本中,有一个参数queue.buffering.max.ms用于指定生产者等待将消息发送到队列的时间,但在最新版本(kafka-python 1.3.3)中已经不存在了。我该如何在更新的Kafka版本中指定此参数以保持我的通信异步呢?
谢谢!
3个回答

16
如你所观察到的,消息被排队以进行异步发送,并且不能保证立即发送。因此,如果您想强制将消息发送到代理,则需要显式调用producer.flush(),它会一直阻塞直到消息被发送(尽管flush()不能保证acks)。
注意:由于flush()是一个阻塞调用,因此通常仅建议在低吞吐量系统或应用程序关闭时使用。同步发送与异步发送的吞吐量冲击通常对于高容量系统来说是不可行的。我的经验是,生产者通常很快地发送而不需要调用flush(),除非是在测试套件/开发中需要立即发生的情况。
我相当确定参数queue.buffering.max.ms已被linger_ms取代:https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html#kafka.KafkaProducer 因此,在您的工作示例中,您已经使用了该参数。

哇,谢谢。在 https://kafka-python.readthedocs.io/en/master/usage.html 上有十几个不同的发送示例,但只有一个 .flush() 语句在底部,很容易被忽视,真是相当恼人。 - Jeff Ellen
1
确实,对于刚接触Kafka的人来说可能会感到困惑。它是开源的,所以我相信他们会欢迎改进文档的PR。此外,在其他语言的kafka客户端库生态系统中,异步发送+flush()是最常见的API模型,因此了解Kafka但不了解Python的人可能会更少受到惊讶。 - Jeff Widman
你说得对,它是开源的,所以我不能抱怨。我早些时候一直在阅读confluent文档,忘记了我正在参考哪一个。刷新范式在Kafka之外也很常见,但我在文档或代码中找不到它的提及(除了底部)。 - Jeff Ellen

6
producer = KafkaProducer(bootstrap_servers='kafkaIp:kafkaPort')
producer.send("topic_name", b'Your string here')
producer.flush()

使用send和flush。

0

我们希望确保消息被迅速发送,因此我们添加了一个单独的线程,运行一个 while 循环,不做任何操作,只调用 producer.flush(timeout = 0.1) 并休眠 100 毫秒。

我们不想消除批处理的吞吐量优势,但我们也想确保在流量较低时,消息能够以最小的延迟(毫秒,而非分钟)进行处理。

* 我们正在使用 gevent。如果您使用普通的 threading,可能不需要休眠。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接