如何在不使用repartition和copyMerge的情况下合并Spark结果文件?

7
我使用下面的代码:

我使用下面的代码:

csv.saveAsTextFile(pathToResults, classOf[GzipCodec])

pathToResults目录中有许多文件,如part-0000、part-0001等。 我可以使用FileUtil.copyMerge(),但它非常慢,它会下载驱动程序上的所有文件,然后将它们上传到Hadoop。但是,FileUtil.copyMerge()比以下方法更快:

csv.repartition(1).saveAsTextFile(pathToResults, classOf[GzipCodec])

如何在不重新分区和使用FileUtil.copyMerge()的情况下合并Spark结果文件?

3个回答

8
很遗憾,在Spark中没有其他选项可以获得单个输出文件。您可以使用coalesce(1)而不是repartition(1),但是如果数据太大,Spark会将您的数据收集到单个内存分区中,这可能会导致OOM错误。另一个合并HDFS文件的选项可能是编写一个简单的MapReduce作业(或Pig作业或Hadoop Streaming作业),该作业将整个目录作为输入,并使用单个reducer生成单个输出文件。但请注意,使用MapReduce方法,所有数据首先都会被复制到reducer本地文件系统中,这可能会导致“空间不足”错误。
以下是关于相同主题的一些有用链接:

0

coalesce(1) 运行得非常好。如果您想运行此脚本,我还看到了 hadoop-streaming 选项,可以在运行时合并 HDFS 文件。

$ hadoop jar /usr/hdp/2.3.2.0-2950/hadoop-mapreduce/hadoop-streaming-2.7.1.2.3.2.0-2950.jar \
                   -Dmapred.reduce.tasks=1 \
                   -input "/hdfs/input/dir" \
                   -output "/hdfs/output/dir" \
                   -mapper cat \
                   -reducer cat

0

我曾经有过完全相同的问题,不得不编写 pySpark 代码(调用 Hadoop API)来实现 copyMerge:

https://github.com/Tagar/stuff/blob/master/copyMerge.py

不幸的是,作为独立的 Hadoop API 调用的 copyMerge 将在 Hadoop 3.0 中弃用并删除。因此,此实现不依赖于 Hadoop 的 copyMerge(它重新实现了它)。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接