如何将 PySpark 中的表数据框导出为 CSV 文件?

118
我正在使用 Spark 1.3.1 (PySpark),并使用 SQL 查询生成了一张表。现在我有一个对象是DataFrame,我想将这个DataFrame 对象(我称之为“table”)导出到 csv 文件,以便我可以操作它并绘制列。如何将DataFrame “table”导出到 csv 文件呢?谢谢!
9个回答

261
如果数据帧适合于驱动程序内存并且您想要保存到本地文件系统,则可以使用 toPandas 方法将 Spark DataFrame 转换为本地的 Pandas DataFrame,然后简单地使用 to_csv
df.toPandas().to_csv('mycsv.csv')
否则,您可以使用spark-csv

  • Spark 1.3

df.save('mycsv.csv', 'com.databricks.spark.csv')
  • Spark 1.4及以上版本

    df.write.format('com.databricks.spark.csv').save('mycsv.csv')
    
  • 在 Spark 2.0+ 版本中,您可以直接使用 csv 数据源:

    df.write.csv('mycsv.csv')
    

    10
    如果你有 Spark 数据框,你可以使用 df.write.csv('/tmp/lookatme/') 将一组 CSV 文件存储在 /tmp/lookatme 目录中。与在 Pandas 中序列化相比,使用 Spark 处理数据速度显著更快。唯一的缺点是最终会得到一组 CSV 文件而不是单个文件,如果目标工具不知道如何将它们连接起来,则需要自己操作。 - Txangel
    1
    从Spark中获取CSV文件真是一件大事。有趣的是,第一个解决方案中的to_csv函数可以直接使用,而不需要导入Pandas库。.toPandas函数似乎是Spark的一部分,可能会隐式地导入它。 - cardamom
    45
    如果您坚持只输出一个文件,可以使用df.coalesce(1).write.csv('mycsv.csv') - MichaelChirico
    4
    使用 df.write.csv('mycsv.csv') 命令可以将 CSV 文件导出到 HDFS 环境中。如何将其获取到本地环境? - Tracy
    1
    @etjk,你可以使用以下命令将HDFS目录复制到本地目录: hdfs dfs -copyToLocal <input> <output> - Chingiz K.
    显示剩余6条评论

    47

    对于 Apache Spark 2+,要将 dataframe 保存为单个 csv 文件,请使用以下命令

    query.repartition(1).write.csv("cc_out.csv", sep='|')
    

    这里的1表示我只需要一个CSV分区,您可以根据您的要求进行更改。


    8
    根据这里所述:https://spark.apache.org/docs/2.2.0/api/python/pyspark.html#pyspark.RDD.repartition 建议使用coalesce()而不是repartition()来提高性能("如果您要减少此RDD中的分区数,请考虑使用coalesce,它可以避免执行shuffle操作。") - Seastar
    3
    @Seastar:虽然在几种情况下合并可能有优势,但您的评论在这种特殊情况下不适用。如果您想在HDFS(或其他任何地方)中拥有一个.csv文件,则通常希望有一个文件而不是分散在整个集群中的数十个文件(进行“repartition(1)”的整个意义)。无论如何,您都需要对数据进行洗牌,因此在更大的图景中,合并将毫无帮助。 - Markus

    21

    如果您无法使用spark-csv,则可以执行以下操作:

    df.rdd.map(lambda x: ",".join(map(str, x))).coalesce(1).saveAsTextFile("file.csv")
    

    如果您需要处理带有换行符或逗号的字符串,那么这种方法将无法正常工作。请使用以下方法:

    import csv
    import cStringIO
    
    def row2csv(row):
        buffer = cStringIO.StringIO()
        writer = csv.writer(buffer)
        writer.writerow([str(s).encode("utf-8") for s in row])
        buffer.seek(0)
        return buffer.read().strip()
    
    df.rdd.map(row2csv).coalesce(1).saveAsTextFile("file.csv")
    

    12
    您需要将数据框重新分区为单个分区,然后定义Unix文件系统格式下的文件格式、路径和其他参数,就可以开始了。
    df.repartition(1).write.format('com.databricks.spark.csv').save("/path/to/file/myfile.csv",header = 'true')
    

    阅读有关重新分区函数的更多信息。 阅读有关保存函数的更多信息。

    但是,repartition是一个昂贵的函数,而toPandas()则更糟糕。尝试在之前的语法中使用.coalesce(1)代替.repartition(1),以获得更好的性能。

    了解更多关于repartition vs coalesce函数的信息。


    9

    使用PySpark

    在Spark 3.0+中编写CSV文件的最简单方法

    sdf.write.csv("/path/to/csv/data.csv")
    

    这可以根据您使用的Spark节点数生成多个文件。如果您想将其放在单个文件中,请使用repartition。

    sdf.repartition(1).write.csv("/path/to/csv/data.csv")
    

    使用Pandas

    如果您的数据不太多并且可以保存在本地Python中,则可以利用Pandas处理。

    sdf.toPandas().to_csv("/path/to/csv/data.csv", index=False)
    

    使用Koalas

    sdf.to_koalas().to_csv("/path/to/csv/data.csv", index=False)
    

    哇塞,我已经寻找类似.repartition(1)这样的东西好几个小时了,只为了将其写入一个CSV文件中。非常感谢你,太太太感谢了!!!!! - sweetmusicality
    如果有帮助,请点赞) - s510
    已经完成了 :-) - sweetmusicality
    这是正确答案。其他的至少五年以上了。 - 123

    4
    这个怎么样(如果你不想使用一行代码)?这与IT技术有关。
    for row in df.collect():
        d = row.asDict()
        s = "%d\t%s\t%s\n" % (d["int_column"], d["string_column"], d["string_column"])
        f.write(s)
    

    f是已经打开的文件描述符。另外分隔符是制表符,但很容易更改为任何您想要的字符。


    2
    '''
    I am late to the pary but: this will let me rename the file, move it to a desired directory and delete the unwanted additional directory spark made
    '''
    
    import shutil
    import os
    import glob
    
    path = 'test_write'
    #write single csv
    students.repartition(1).write.csv(path)
    
    #rename and relocate the csv
    shutil.move(glob.glob(os.getcwd() + '\\' + path + '\\' + r'*.csv')[0], os.getcwd()+ '\\' + path+ '.csv')
    
    #remove additional directory
    shutil.rmtree(os.getcwd()+'\\'+path)
    

    1

    我曾尝试使用pandas方法,但性能非常差。最终花费的时间太长了,以至于我不得不寻找另一种方法。

    如果你正在寻找一种将数据写入一个CSV文件而不是多个CSV文件的方法,那么这就是你要找的方法:

    df.coalesce(1).write.csv("train_dataset_processed", header=True)
    

    它将处理数据集的时间从2个多小时缩短到了2分钟


    0

    尝试使用display(df)并在结果中使用下载选项。请注意:此选项仅可下载100万行,但速度非常快。


    网页内容由stack overflow 提供, 点击上面的
    可以查看英文原文,
    原文链接