我正在尝试将一个大型CSV发送到kafka。基本结构是读取CSV的一行并将其与标题压缩。
a = dict(zip(header, line.split(",")
这将被转换成一个json格式,使用以下代码:
message = json.dumps(a)
我使用kafka-python库发送消息。
from kafka import SimpleProducer, KafkaClient
kafka = KafkaClient("localhost:9092")
producer = SimpleProducer(kafka)
producer.send_messages("topic", message)
使用PYSPARK,我轻松地从CSV文件中创建了一组消息的RDD。
sc = SparkContext()
text = sc.textFile("file.csv")
header = text.first().split(',')
def remove_header(itr_index, itr):
return iter(list(itr)[1:]) if itr_index == 0 else itr
noHeader = text.mapPartitionsWithIndex(remove_header)
messageRDD = noHeader.map(lambda x: json.dumps(dict(zip(header, x.split(","))
现在我想发送这些消息:我定义了一个函数。
def sendkafka(message):
kafka = KafkaClient("localhost:9092")
producer = SimpleProducer(kafka)
return producer.send_messages('topic',message)
然后我创建了一个新的RDD来发送消息。
sentRDD = messageRDD.map(lambda x: kafkasend(x))
然后我调用sentRDD.count()
这会开始处理并发送消息
不幸的是,这很慢。它每秒只能发送1000条消息。这是在一个由4个cpu和8gb内存组成的10节点群集上运行的。
相比之下,创建这些消息需要大约7秒钟,在一个包含1000万行csv文件的数据集上。文件大小约为2gb。
我认为问题在于在函数内部实例化了一个kafka生产者。但是,如果我不这样做,Spark会抱怨生产者不存在,即使我试图在全局定义它。
也许有人可以提供一些方法来解决这个问题。
谢谢。