使用Spark将CSV转换为parquet,保留分区。

4
我正在尝试使用Spark将一堆csv文件转换为parquet格式,有趣的是输入的csv文件已经通过目录进行了“分区”。 所有输入文件都具有相同的列集。 输入文件的结构如下:
/path/dir1/file1.csv
/path/dir1/file2.csv
/path/dir2/file3.csv
/path/dir3/file4.csv
/path/dir3/file5.csv
/path/dir3/file6.csv

我想用Spark读取这些文件,并将它们的数据写入到HDFS中的一个parquet表中,保留分区(按输入目录分区),每个分区只有一个输出文件。输出文件的结构应如下所示:
hdfs://path/dir=dir1/part-r-xxx.gz.parquet
hdfs://path/dir=dir2/part-r-yyy.gz.parquet
hdfs://path/dir=dir3/part-r-zzz.gz.parquet

目前我找到的最佳解决方案是循环遍历输入目录,加载csv文件到一个dataframe中,然后将dataframe写入到parquet表的目标分区中。 但这并不是高效的,因为每个分区只有一个输出文件,而将数据写入hdfs是一个单个任务,会阻塞循环。 我想知道如何在集群中实现最大的并行化(并且没有对数据进行分片)。谢谢!
2个回答

1
将输入目录重命名,将dirX更改为dir=dirX。然后执行:
spark.read.csv('/path/').coalesce(1).write.partitionBy('dir').parquet('output')

如果您无法重命名目录,则可以使用Hive Metastore。创建外部表并为每个目录创建一个分区。然后加载此表并使用上述模式进行重写。

这是Spark 2.0,对吧?我还在用1.6版本,并且正在使用Databricks CSV读取器。你确定这种方法可以让来自同一输入目录的所有CSV文件由同一个节点加载吗?否则,coalesce将会创建混洗。 - benoitdr
是的,使用1.X版本,您需要加载spark-csv包,这也应该可以工作。 - Mariusz
它可以与spark-csv包一起使用,但会创建混洗,因为它不强制执行跨输入目录的数据本地性。 - benoitdr
很奇怪,洗牌可能出现的唯一地方是合并(coalesce),而默认情况下合并不会洗牌:http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.coalesce。但我只在本地模式下测试过...你认为什么会导致洗牌出现?数据本地性没有保留吗? - Mariusz
我的错,coalesce(1)不会洗牌,但它会使整个过程变成单线程。所以这需要很长时间...与其使用coalesce,repartition('dir')是更好的选择。它会创建洗牌,但仍然比coalesce(1)更快。 - benoitdr

0

目前我找到的最佳解决方案(无需洗牌,输入目录数量与线程数相同):

  • 创建一个输入目录的RDD,分区数与输入目录相同

  • 将其转换为输入文件的RDD(按目录保留分区)

  • 使用自定义CSV解析器进行Flat-map操作

  • 将RDD转换为DataFrame

  • 将DataFrame写入以目录为分区的Parquet表中

需要编写自己的解析器。我无法找到使用sc.textfile或databricks csv解析器来保留分区的解决方案。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接