我正在 Python 3.7 中运行下面的代码,所用的 kafka 库是通过以下命令安装的:
pip3 install kafka
代码如下:
import random
import time, calendar
from random import randint
from kafka import KafkaProducer
from kafka import errors
from json import dumps
from time import sleep
def write_data(producer, data_cnt=20000, interval=0.5):
    """Publish randomly generated payment records to the ``payment_msg`` topic.

    Each record is a dict with the keys ``createTime`` (local time formatted
    as ``%Y-%m-%d %H:%M:%S``), ``orderId``, ``payAmount``, ``payPlatform``
    and ``provinceId``. Order IDs start one above the current UTC epoch
    second and increase by one per record.

    Args:
        producer: Object exposing ``send(topic, value=...)`` and ``flush()``,
            e.g. the producer returned by ``create_producer``.
        data_cnt: Number of records to publish.
        interval: Seconds to pause after each record; values <= 0 disable
            the pause.
    """
    order_id = calendar.timegm(time.gmtime())
    max_price = 100000
    topic = "payment_msg"

    for _ in range(data_cnt):
        ts = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
        order_id += 1
        pay_amount = max_price * random.random()
        # Roughly 90% of the records are tagged with platform 0, the rest 1.
        pay_platform = 0 if random.random() < 0.9 else 1
        province_id = randint(0, 6)
        cur_data = {
            "createTime": ts,
            "orderId": order_id,
            "payAmount": pay_amount,
            "payPlatform": pay_platform,
            "provinceId": province_id,
        }
        producer.send(topic, value=cur_data)
        if interval > 0:
            sleep(interval)

    # send() only queues the record; flush so nothing still buffered is lost
    # if the process exits right after this function returns.
    producer.flush()
def create_producer():
    """Connect to the Kafka broker at ``kafka:9092`` and return the producer.

    Up to six connection attempts are made; after each attempt that fails
    with ``NoBrokersAvailable`` the function waits ten seconds. Values sent
    through the returned producer are serialized as UTF-8 encoded JSON.

    Returns:
        A connected ``KafkaProducer``.

    Raises:
        RuntimeError: If every attempt fails with ``NoBrokersAvailable``.
    """
    print("Connecting to Kafka brokers")
    attempts_left = 6
    while attempts_left > 0:
        attempts_left -= 1
        try:
            connected = KafkaProducer(
                bootstrap_servers=['kafka:9092'],
                value_serializer=lambda payload: dumps(payload).encode('utf-8'),
            )
        except errors.NoBrokersAvailable:
            print("Waiting for brokers to become available")
            sleep(10)
            continue
        print("Connected to Kafka")
        return connected
    raise RuntimeError("Failed to connect to brokers within 60 seconds")
if __name__ == '__main__':
    # Script entry point: connect (with retries), then stream the records.
    producer = create_producer()
    try:
        write_data(producer)
    finally:
        # Always release the connection and background sender, including on
        # Ctrl-C or an unexpected error in the middle of the long-running
        # stream.
        producer.close()
然而,运行时出现了如下错误:
Traceback (most recent call last):
File "generate_source_data.py", line 22, in <module>
from kafka import KafkaProducer
File "/usr/local/lib/python3.7/site-packages/kafka/__init__.py", line 23, in <module>
from kafka.producer import KafkaProducer
File "/usr/local/lib/python3.7/site-packages/kafka/producer/__init__.py", line 4, in <module>
from .simple import SimpleProducer
File "/usr/local/lib/python3.7/site-packages/kafka/producer/simple.py", line 54
return '<SimpleProducer batch=%s>' % self.async
问题是什么?