如何将DataFrame保存为压缩的(gzipped)CSV文件?

31

我使用Spark 1.6.0和Scala。

我想把一个DataFrame保存为压缩的CSV格式。

这是我目前的代码(假设我已经有了dfsc作为SparkContext):

//set the conf to the codec I want
sc.getConf.set("spark.hadoop.mapred.output.compress", "true")
sc.getConf.set("spark.hadoop.mapred.output.compression.codec", "true")
sc.getConf.set("spark.hadoop.mapred.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec")
sc.getConf.set("spark.hadoop.mapred.output.compression.type", "BLOCK")

df.write
  .format("com.databricks.spark.csv")
  .save(my_directory)

输出结果不是 gz 格式。


RDD相关问题:https://dev59.com/Uo7ea4cB1Zd3GeqPB31w - Nick Chammas
4个回答

37

2
虽然这段代码可能回答了问题,但是提供关于为什么和/或如何回答问题的额外上下文可以提高其长期价值。 - manniL
如果使用“JSON”格式,则不会选择压缩。 - Duckling
3
关键字参数似乎已被改为 compression。https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=codec#pyspark.sql.DataFrameWriter.csv - volker238

34

使用 Spark 2.0+,这变得更加简单了:

df.write.csv("path", compression="gzip")  # Python-only
df.write.option("compression", "gzip").csv("path") // Scala or Python

您不再需要使用外部的Databricks CSV包。

csv() writer支持许多方便的选项。例如:

  • sep:设置分隔符字符。
  • quote:是否以及如何引用值。
  • header:是否包含头行。

gzip之外,还可以使用其他许多压缩编解码器:

  • bzip2
  • lz4
  • snappy
  • deflate

csv() writer的完整Spark文档在此处:Python / Scala


3
感谢您提供 CSV Writer 文档的链接,并没有给出仅适用于 Databricks 的答案! - Laurens Koppenol
@LaurensKoppenol - 嗯,公平地说,添加到Spark的CSV支持最初是作为外部Databricks CSV包链接在被接受的答案中开始的。 :) 该软件包可供任何Spark用户使用,但从Spark 2.0开始,您不再需要它。 - Nick Chammas
10
我必须使用df.write.option("compression","gzip").csv("path")来在Spark 2.2中进行操作。 - Mark Rajcok

24

Spark 2.2+

df.write.option("compression","gzip").csv("path")

Spark 2.0

df.write.csv("path", compression="gzip")

Spark 1.6

在spark-csv的github上: https://github.com/databricks/spark-csv

可以看到:

codec:保存到文件时使用的压缩编解码器。应该是实现org.apache.hadoop.io.compress.CompressionCodec接口的类的完全限定名称,或者不区分大小写的简称(bzip2、gzip、lz4和snappy)。当未指定编解码器时,默认为不压缩。

在这种情况下,可以这样使用:

df.write.format("com.databricks.spark.csv").codec("gzip")\ .save('my_directory/my_file.gzip')


1

以带有标题的格式将CSV文件写入,并将part-000文件重命名为.csv.gzip

DF.coalesce(1).write.format("com.databricks.spark.csv").mode("overwrite")
.option("header","true")
.option("codec","org.apache.hadoop.io.compress.GzipCodec").save(tempLocationFileName)

copyRename(tempLocationFileName, finalLocationFileName)

def copyRename(srcPath: String, dstPath: String): Unit =  {
  val hadoopConfig = new Configuration()
  val hdfs = FileSystem.get(hadoopConfig)
  FileUtil.copyMerge(hdfs, new Path(srcPath), hdfs, new Path(dstPath), true, hadoopConfig, null)
  // the "true" setting deletes the source files once they are merged into the new output
}

如果您不需要标题,那么将其设置为false,您也不需要执行合并操作。这样写起来也更快。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接