如何将Spark SQL中的数据导出为CSV格式

52

这个命令适用于HiveQL:

insert overwrite directory '/data/home.csv' select * from testtable;

但是使用 Spark SQL 时,我遇到了一个错误,其中包含org.apache.spark.sql.hive.HiveQl的堆栈跟踪:

java.lang.RuntimeException: Unsupported language features in query:
    insert overwrite directory '/data/home.csv' select * from testtable

请指导我如何在Spark SQL中编写导出CSV的功能。

这个问题/答案并不能解决Spark 2.x的问题...真正的问题是导出到标准CSV格式。请在这里回答 - Peter Krauss
7个回答

88

以下语句可用于将数据帧的内容以CSV格式写入:

df.write.csv("/data/home/csv")

如果您需要将整个数据帧写入单个CSV文件,请使用:

df.coalesce(1).write.csv("/data/home/sample.csv")

对于 Spark 1.x,您可以使用spark-csv将结果写入CSV文件。

以下 Scala 代码片段将有所帮助:

import org.apache.spark.sql.hive.HiveContext
// sc - existing spark context
val sqlContext = new HiveContext(sc)
val df = sqlContext.sql("SELECT * FROM testtable")
df.write.format("com.databricks.spark.csv").save("/data/home/csv")

将内容写入单个文件中

import org.apache.spark.sql.hive.HiveContext
// sc - existing spark context
val sqlContext = new HiveContext(sc)
val df = sqlContext.sql("SELECT * FROM testtable")
df.coalesce(1).write.format("com.databricks.spark.csv").save("/data/home/sample.csv")

2
我尝试了你提到的coalesce方法。它会在指定路径创建一个带有“part”文件和名为“_SUCCESS”的文件的目录。你知道有什么方法可以只获取一个文件吗? - Robert Hickman
1
返回翻译文本:它不会是本地文件,而是HDFS文件。 - Alex B
我在这段代码中发现了一个错误,与此代码生成的单个csv相比,我的原始分区csv目录多了1列。我知道该代码适用于简单情况,但是我的最后两列是以 concat('"', concat_ws(",", collect_list(some_column)), '"') 格式的,虽然在插入覆盖时运行良好,但当我选择所有列并将其转换为此格式时,它就无法正常工作了,即使标题也正确,但它会错误地将倒数第二列的值填充到两个位置上并忽略其余部分。 - devssh
这是我的CSV分区在添加前的样子:"USR",0,0,""css","shell","html","python","javascript"","381534,3960,1683,229869,1569090",而现在它们看起来像这样:"\"USR\"",0,0,"\"\"css\"","\"shell\"". - devssh
我按照以下链接修复了这个问题:https://stackoverflow.com/questions/44395363/how-to-include-double-quotes-in-spark-sql-concat - devssh
显示剩余2条评论

50
自 Spark 2.X 版本以来,spark-csv 已作为本地数据源集成。因此,必要的语句简化为(Windows):
df.write
  .option("header", "true")
  .csv("file:///C:/out.csv")

或UNIX

df.write
  .option("header", "true")
  .csv("/var/out.csv")

注意:正如评论所说,它是按照该名称创建具有分区的目录,而不是一个标准CSV文件。然而,这很可能是您想要的,否则您要么会崩溃驱动程序(内存不足),要么您可能正在使用非分布式环境。


1
大家好,有没有一种方法可以在尝试重写文件时替换文件,以避免出现失败的情况。 - user3341078
5
当然!.mode("overwrite").csv("/var/out.csv")的意思是覆盖模式下将数据存储为CSV格式文件到"/var/out.csv"路径中。 - Boern
2
在 Spark 2.x 中它会创建相应名称的目录。需要帮忙吗? - GadaaDhaariGeek
1
我的猜测是你的分区在那个目录里面。 - Boern
但是它不是标准的CSV文件,它会生成一个带有奇怪文件的文件夹(!)。请参见https://stackoverflow.com/q/58142220/287948 - Peter Krauss
如果您正在使用Spark,因为您正在处理“大”数据集,那么您可能不想执行任何类似于coalesce(1)toPandas()的操作,因为这很可能会导致驱动程序崩溃(因为整个数据集必须适合驱动程序的RAM)。另一方面:如果您的数据适合单台计算机的RAM,那么为什么要折磨自己进行分布式计算呢? - Boern

32

使用spark-csv的上面答案是正确的,但存在一个问题 - 该库会根据数据帧分区创建多个文件。而这不是我们通常需要的。因此,您可以将所有分区合并为一个:

df.coalesce(1).
    write.
    format("com.databricks.spark.csv").
    option("header", "true").
    save("myfile.csv")

并将该库的输出(名称为“part-00000”)重命名为所需的文件名。

此博客文章提供了更多细节:https://fullstackml.com/2015/12/21/how-to-export-data-frame-from-apache-spark/


2
如果希望继续向现有文件写入数据,可以添加模型。resultDF.repartition(1).write.mode("append").format("com.databricks.spark.csv").option("header", "true").save("s3://...") - Pramit
5
使用coalesce(1)需要将数据集装入单台计算机的堆中,当处理大型数据集时很可能会出现问题。 - Boern
@DmitryPetrov,当包含coalesce选项时,我们需要提到write.format(“com ...”)选项吗? - JKC
@JKC 是的,coalesce(1) 只会将数据重分区到单个分区(文件)。 - Dmitry Petrov
@DmitryPetrov 我知道 coalesce(1) 只是将其重新分区为单个分区文件,但在使用 coalesce 选项时,我们是否需要在 Spark 2.x 中明确指定 write.format 选项? - JKC

11

最简单的方法是通过对DataFrame的RDD进行映射,然后使用mkString函数:

  df.rdd.map(x=>x.mkString(","))

从Spark 1.5版本开始(甚至更早) df.map(r=>r.mkString(","))将会执行相同的操作。 如果您需要CSV转义,可以使用Apache Commons Lang。例如,这是我们正在使用的代码。

 def DfToTextFile(path: String,
                   df: DataFrame,
                   delimiter: String = ",",
                   csvEscape: Boolean = true,
                   partitions: Int = 1,
                   compress: Boolean = true,
                   header: Option[String] = None,
                   maxColumnLength: Option[Int] = None) = {

    def trimColumnLength(c: String) = {
      val col = maxColumnLength match {
        case None => c
        case Some(len: Int) => c.take(len)
      }
      if (csvEscape) StringEscapeUtils.escapeCsv(col) else col
    }
    def rowToString(r: Row) = {
      val st = r.mkString("~-~").replaceAll("[\\p{C}|\\uFFFD]", "") //remove control characters
      st.split("~-~").map(trimColumnLength).mkString(delimiter)
    }

    def addHeader(r: RDD[String]) = {
      val rdd = for (h <- header;
                     if partitions == 1; //headers only supported for single partitions
                     tmpRdd = sc.parallelize(Array(h))) yield tmpRdd.union(r).coalesce(1)
      rdd.getOrElse(r)
    }

    val rdd = df.map(rowToString).repartition(partitions)
    val headerRdd = addHeader(rdd)

    if (compress)
      headerRdd.saveAsTextFile(path, classOf[GzipCodec])
    else
      headerRdd.saveAsTextFile(path)
  }

2
虽然这是最简单的答案(也是一个好答案),但如果你的文本中有双引号,你就必须考虑它们。 - devonlazarus
创建表的RDD后仅出现错误。scala> df.rdd.map(x=>x.mkString(",")); <console>:18: 错误: 值rdd不是org.apache.spark.sql.SchemaRDD的成员 df.rdd.map(x=>x.mkString(",")); - shashankS

2
借助spark-csv,我们可以将数据写入CSV文件。
val dfsql = sqlContext.sql("select * from tablename")
dfsql.write.format("com.databricks.spark.csv").option("header","true").save("output.csv")`

不,这不是一个真正的CSV文件,结果output.csv是一个文件夹。 - Peter Krauss

1
错误消息表明查询语言不支持此功能。但是,您可以通过RDD接口像往常一样以任何格式保存DataFrame(df.rdd.saveAsTextFile)。或者您可以查看https://github.com/databricks/spark-csv

scala> df.write.format("com.databricks.spark.csv").save("/data/home.csv") <console>:18: 错误: 值 write 不是 org.apache.spark.sql.SchemaRDD 的成员 我需要再次使用databricks包构建当前的jar吗? - shashankS
DataFrame.write 在 Apache Spark 1.4.0 中被添加。 - Daniel Darabos

-3

在数据框中:

val p=spark.read.format("csv").options(Map("header"->"true","delimiter"->"^")).load("filename.csv")

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接