使用Python Spark将大型CSV文件发送到Kafka

7

我正在尝试将一个大型CSV发送到kafka。基本结构是读取CSV的一行并将其与标题压缩。

a = dict(zip(header, line.split(",")

这将被转换成一个json格式,使用以下代码:
message = json.dumps(a)

我使用kafka-python库发送消息。
from kafka import SimpleProducer, KafkaClient
kafka = KafkaClient("localhost:9092")
producer = SimpleProducer(kafka)
producer.send_messages("topic", message)

使用PYSPARK,我轻松地从CSV文件中创建了一组消息的RDD。
sc = SparkContext()
text = sc.textFile("file.csv")
header = text.first().split(',')
def remove_header(itr_index, itr):
    return iter(list(itr)[1:]) if itr_index == 0 else itr
noHeader = text.mapPartitionsWithIndex(remove_header)

messageRDD = noHeader.map(lambda x: json.dumps(dict(zip(header, x.split(","))

现在我想发送这些消息:我定义了一个函数。
def sendkafka(message):
  kafka = KafkaClient("localhost:9092")
  producer = SimpleProducer(kafka)
  return producer.send_messages('topic',message)

然后我创建了一个新的RDD来发送消息。
sentRDD = messageRDD.map(lambda x: kafkasend(x))

然后我调用sentRDD.count()

这会开始处理并发送消息

不幸的是,这很慢。它每秒只能发送1000条消息。这是在一个由4个cpu和8gb内存组成的10节点群集上运行的。

相比之下,创建这些消息需要大约7秒钟,在一个包含1000万行csv文件的数据集上。文件大小约为2gb。

我认为问题在于在函数内部实例化了一个kafka生产者。但是,如果我不这样做,Spark会抱怨生产者不存在,即使我试图在全局定义它。

也许有人可以提供一些方法来解决这个问题。

谢谢。

1个回答

9

您可以为每个分区创建一个单独的生产者,并使用 mapPartitionsforeachPartition

def sendkafka(messages):
    kafka = KafkaClient("localhost:9092")
    producer = SimpleProducer(kafka)
    for message in messages:
        yield producer.send_messages('topic', message)

sentRDD = messageRDD.mapPartitions(sendkafka)

如果上述方法无法帮助您,您可以尝试使用异步生产者扩展它。
在Spark 2.x中,也可以使用Kafka数据源。您需要包括spark-sql-kafka jar文件,匹配Spark和Scala版本(这里分别是2.2.0和2.11):
spark.jars.packages  org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0

将数据转换成 DataFrame (如果它不是DataFrame

messageDF = spark.createDataFrame(messageRDD, "string")

并使用 DataFrameWriter 进行编写:

(messageDF.write
    .format("kafka")
    .option("topic", topic_name)
    .option("kafka.bootstrap.servers", bootstrap_servers)
    .save())

感谢zero323。使用异步I,我可以在Spark之外的单个生产者中获得每秒8000个。所以我进行了一些调整。我发现这个CSV有15个分区,所以我给了任务15个核心。然后我尝试了异步选项,直到批处理大小为20000。这使我最大吞吐量达到了225,000个/秒。因此,通过一些调整,我实际上获得了一个合理的速率。也就是说,流式传输1000万行CSV只需要45秒。 - Phineas Dashevsky
1
@PhineasDashevsky,如果您能分享最终解决方案的代码,那将非常有帮助。 - Picarus
在这篇文章中,我提供了代码和更详细的说明。 - Phineas Dashevsky
我尝试做这个,但是出现了NameError: global name 'KafkaClient' is not defined。我认为它无法在foreachpartition中识别kafkaclient。有人遇到过同样的问题吗? - Gayatri

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接