我正在使用 Kafka 2.12 和 kafka-python 模块作为 Kafka 客户端。我尝试测试一个简单的生产者:
class Producer(Process):
daemon = True
def run(self):
producer = KafkaProducer(bootstrap_servers='kafka:9092')
print("Sending messages...")
producer.send('topic', json.dumps(message).encode('utf-8'))
当此进程被实例化时,消息永远不会被消费者接收到。
如果我刷新生产者并更改linger_ms参数(使其同步),则消息将被发送并被消费者读取:
class Producer(Process):
daemon = True
def run(self):
producer = KafkaProducer(bootstrap_servers='kafka:9092', linger_ms=10)
print("Sending messages...")
producer.send('topic', json.dumps(message).encode('utf-8'))
producer.flush()
在之前的Kafka版本中,有一个参数queue.buffering.max.ms用于指定生产者等待将消息发送到队列的时间,但在最新版本(kafka-python 1.3.3)中已经不存在了。我该如何在更新的Kafka版本中指定此参数以保持我的通信异步呢?
谢谢!