如何将 pyspark 数据框保存为单个 CSV 文件

4
这是关于如何将DataFrame保存为CSV文件的延续。
我正在尝试在pyspark 3.0.1中保存我的pyspark数据框df。所以我写了:
df.coalesce(1).write.csv('mypath/df.csv)

执行完毕后,我看到了一个名为df.csv的文件夹,其中包含以下4个文件。

1._committed_..
2._started_...
3._Success  
4. part-00000-.. .csv

你能建议我如何将所有数据保存在 df.csv 文件中吗?

3个回答

4
你可以使用.coalesce(1)将文件保存在只有1个csv分区中,然后将此csv重命名并移动到所需的文件夹中。
以下是执行此操作的函数: df:你的df
fileName:你想要为csv文件命名的名称
filePath:你想要保存到的文件夹
def export_csv(df, fileName, filePath):
  
  filePathDestTemp = filePath + ".dir/" 

  df\
    .coalesce(1)\
    .write\
    .csv(filePathDestTemp) # use .csv to save as csv

  listFiles = dbutils.fs.ls(filePathDestTemp)
  for subFiles in listFiles:
    if subFiles.name[-4:] == ".csv":
      
      dbutils.fs.cp (filePathDestTemp + subFiles.name,  filePath + fileName+ '.csv')

  dbutils.fs.rm(filePathDestTemp, recurse=True)

嗨,我正在尝试这个方法,因为它看起来是其中一个更简单的解决方案,但是我无法成功导入库:我运行了 !pip3 install dbutils 但是当我运行你的第一行代码时:listFiles = dbutils.fs.ls(path_nm+'.dir/') 我收到了错误信息 AttributeError: module 'dbutils' has no attribute 'fs'。非常感谢您的帮助。 - GenDemo
dbutils是Databricks的一个功能。如果您没有在Databricks上运行它,您需要重写它以适应您的堆栈。 - Luiz Viola
谢谢 - 我找了个同事帮我解决这个问题,因为我们正在使用Cloudera。 - GenDemo

1
如果您想要得到一个名为df.csv的文件作为输出,您可以先将其写入一个临时文件夹,然后移动由Spark生成的部分文件并重命名它。
这些步骤可以使用Hadoop FileSystem API通过JVM网关完成。
temp_path = "mypath/__temp"
target_path = "mypath/df.csv"

df.coalesce(1).write.mode("overwrite").csv(temp_path)

Path = sc._gateway.jvm.org.apache.hadoop.fs.Path

# get the part file generated by spark write
fs = Path(temp_path).getFileSystem(sc._jsc.hadoopConfiguration())
csv_part_file = fs.globStatus(Path(temp_path + "/part*"))[0].getPath()

# move and rename the file
fs.rename(csv_part_file, Path(target_path))
fs.delete(Path(temp_path), True)

它就像魔法一样有效。谢谢分享。我有一个评论 - 你知道如何保留标题吗?coalesce没有'option',所以寻求你的帮助? - Kalenji

0
在`pandas.DataFrame.to_csv`中,当路径为`None`时,该函数将csv作为字符串返回。您可以直接将此字符串写入dbfs上的任何文件中,使用`dbutils.fs.put`方法。
csv_buffer = df.toPandas().to_csv(sep=';', header=True, index=False)
dbutils.fs.put('mypath/df.csv', csv_buffer, overwrite=True)

我猜这只有在所有数据都能适应驱动节点的内存时才有效。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接