将Spark DataFrame的内容保存为单个CSV文件

32

假设我有一个Spark DataFrame,我想将其保存为CSV文件。在Spark 2.0.0之后,DataFrameWriter类直接支持将其保存为CSV文件。

默认行为是将输出保存在提供的路径内的多个part-*.csv文件中。

如何使用以下方式保存DF:

  1. 将路径映射到确切的文件名而不是文件夹
  2. 首行包含标题
  3. 另存为单个文件而不是多个文件。

处理此问题的一种方法是合并DF,然后保存文件。

df.coalesce(1).write.option("header", "true").csv("sample_file.csv")

然而,这种方法在收集数据时存在缺点,并且需要具备足够内存的主节点。

如果不使用 coalesce ,是否可以编写一个单一的CSV文件?如果不行,是否有比以上代码更有效的方法?


你解决了这个问题吗? - pietrop
3
如果文件很大,您担心主节点的内存不足,那么使用分段文件似乎更好。特别是对于进一步的分析,只有一个文件会忽略HDFS的优点。我没有看到除了 coalesce(1)repartition(1) 之外的spark方法。如果您想让多个工作人员附加到同一个文件,则它们必须顺序执行或等待彼此完成,否则记录将无序,这将难以编排。 - Davos
1
出于某种原因,即使使用'.coalesce(1)'或'.repartition(1)',我仍然得到一个文件夹作为输出,而不是单个文本文件或CSV。 - ukbaz
@ukbaz 这是默认行为。您将拥有一个文件夹,里面会有一个名为 part-* 的单个文件。 - Spandan Brahmbhatt
3
我不理解为什么这个问题一直被问到。它完全没有理解Spark和分布式计算的关键点。 - user4601931
显示剩余7条评论
8个回答

23

我刚刚使用pyspark和dbutils自己解决了这个问题,获取了.csv文件并将其重命名为所需的文件名。

save_location= "s3a://landing-bucket-test/export/"+year
csv_location = save_location+"temp.folder"
file_location = save_location+'export.csv'

df.repartition(1).write.csv(path=csv_location, mode="append", header="true")

file = dbutils.fs.ls(csv_location)[-1].path
dbutils.fs.cp(file, file_location)
dbutils.fs.rm(csv_location, recurse=True)

如果您只处理较小的文件并可以使用repartition(1)或coalesce(1),则通过不使用[-1]来改善此答案,但是.csv似乎总是出现在文件夹中的最后一个。这是一种简单快速的解决方案。


我更喜欢这个答案,但我不认为它适用于没有Databricks的原始Spark环境。 - undefined

14

7
不总是建议使用 toPandas,因为它需要将整个数据集都放在一个节点上。 - Rakesh K
3
不错的选择,但无法处理大型数据集! - DrDEE

5
df.coalesce(1).write.option("inferSchema","true").csv("/newFolder",header = 
'true',dateFormat = "yyyy-MM-dd HH:mm:ss")

4
欢迎来到Stackoverflow,您的代码可能有助于回答问题,但如果您不解释答案代码的实质,那就像是一个低质量的答案。请查看如何回答问题的指南。 - Mohammad Kanan

3

以下的scala方法适用于本地或客户端模式,并将df写入所选择的单个csv文件。它要求df适合内存,否则collect()会崩溃。

import org.apache.hadoop.fs.{FileSystem, Path}

val SPARK_WRITE_LOCATION = some_directory
val SPARKSESSION = org.apache.spark.sql.SparkSession

def saveResults(results : DataFrame, filename: String) {
    var fs = FileSystem.get(this.SPARKSESSION.sparkContext.hadoopConfiguration)
    
    if (SPARKSESSION.conf.get("spark.master").toString.contains("local")) {
      fs = FileSystem.getLocal(new conf.Configuration())
    }
    
    val tempWritePath = new Path(SPARK_WRITE_LOCATION)
    
    if (fs.exists(tempWritePath)) {
    
      val x = fs.delete(new Path(SPARK_WRITE_LOCATION), true)
      assert(x)
    }
    
    if (results.count > 0) {
      val hadoopFilepath = new Path(SPARK_WRITE_LOCATION, filename)
      val writeStream = fs.create(hadoopFilepath, true)
      val bw = new BufferedWriter( new OutputStreamWriter( writeStream, "UTF-8" ) )
    
      val x = results.collect()
      for (row : Row <- x) {
        val rowString = row.mkString(start = "", sep = ",", end="\n")
        bw.write(rowString)
      }
    
      bw.close()
      writeStream.close()
    
      val resultsWritePath = new Path(WRITE_DIRECTORY, filename)
    
      if (fs.exists(resultsWritePath)) {
        fs.delete(resultsWritePath, true)
      }
      fs.copyToLocalFile(false, hadoopFilepath, resultsWritePath, true)
    } else {
      System.exit(-1)
    }
}


SPARKSESSION = this_spark_session 是什么意思?为什么这两个变量没有像“val SPARK_WRITE_LOCATION”那样命名? - Petr Fedosov

2

这个解决方案基于Shell脚本,没有并行化,但在SSD上仍然非常快。它使用Unix系统上的cat和输出重定向。假设包含分区的CSV目录位于/my/csv/dir,输出文件为/my/csv/output.csv

#!/bin/bash
echo "col1,col2,col3" > /my/csv/output.csv
for i in /my/csv/dir/*.csv ; do
    echo "Processing $i"
    cat $i >> /my/csv/output.csv
    rm $i
done
echo "Done"

在将每个分区附加到最终CSV后,它将删除每个分区以释放空间。

"col1,col2,col3"是CSV文件的标题(这里有三列名称为col1col2col3)。您必须告诉Spark不要在每个分区中放置标题(可以使用.option("header", "false")来实现此目的,因为Shell脚本会处理它)。


2

如果您仍然想要这样做,以下是我在使用Spark 2.1和一些java.nio.file帮助的Scala中完成此操作的方法。

基于https://fullstackml.com/how-to-export-data-frame-from-apache-spark-3215274ee9d6

val df: org.apache.spark.sql.DataFrame = ??? // data frame to write
val file: java.nio.file.Path = ??? // target output file (i.e. 'out.csv')

import scala.collection.JavaConversions._

// write csv into temp directory which contains the additional spark output files
// could use Files.createTempDirectory instead
val tempDir = file.getParent.resolve(file.getFileName + "_tmp")
df.coalesce(1)
    .write.format("com.databricks.spark.csv")
    .option("header", "true")
    .save(tempDir.toAbsolutePath.toString)

// find the actual csv file
val tmpCsvFile = Files.walk(tempDir, 1).iterator().toSeq.find { p => 
    val fname = p.getFileName.toString
    fname.startsWith("part-00000") && fname.endsWith(".csv") && Files.isRegularFile(p)
}.get

// move to desired final path
Files.move(tmpCsvFile, file)

// delete temp directory
Files.walk(tempDir)
    .sorted(java.util.Comparator.reverseOrder())
    .iterator().toSeq
    .foreach(Files.delete(_))

1
Hadoop API 中的 FileUtil.copyMerge() 应该能解决您的问题。
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._

def merge(srcPath: String, dstPath: String): Unit =  {
   val hadoopConfig = new Configuration()
   val hdfs = FileSystem.get(hadoopConfig)
   FileUtil.copyMerge(hdfs, new Path(srcPath), hdfs, new Path(dstPath), true, hadoopConfig, null) 
   // the "true" setting deletes the source files once they are merged into the new output
}

请查看使用spark-csv编写单个CSV文件的方法。


0
这就是分布式计算的工作方式!目录中的多个文件正是分布式计算的工作原理,这并不是一个问题,因为所有软件都能处理它。
你的问题应该是“如何下载由多个文件组成的CSV文件?”-> 在SO上已经有很多解决方案。
另一种方法是使用Spark作为JDBC源(使用令人惊叹的Spark Thrift服务器),编写SQL查询并将结果转换为CSV。
为了防止驱动程序出现OOM(因为驱动程序会获取所有数据),使用增量收集(`spark.sql.thriftServer.incrementalCollect=true`),更多信息请参见http://www.russellspitzer.com/2017/05/19/Spark-Sql-Thriftserver/
关于Spark“数据分区”概念的小回顾:
输入(X个分区)-> 计算(Y个分区)-> 输出(Z个分区)
在“阶段”之间,数据可以在分区之间传输,这就是“洗牌”。你想要“Z”=1,但是带有Y>1,没有洗牌是不可能的。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接