如何在Pyspark中将具有重复列名的数据框写入CSV文件

6

如何将联接操作后具有相同列名的dataframe写入CSV文件。目前我正在使用以下代码:dfFinal.coalesce(1).write.format('com.databricks.spark.csv').save('/home/user/output/',header = 'true')它会将"dataframe" dfFinal 写入 "/home/user/output"。但在dataframe中包含重复列的情况下,该方法不能正常工作。以下是 dfFinal dataframe。

+----------+---+-----------------+---+-----------------+
|  NUMBER  | ID|AMOUNT           | ID|           AMOUNT|
+----------+---+-----------------+---+-----------------+
|9090909092|  1|               30|  1|               40|
|9090909093|  2|               30|  2|               50|
|9090909090|  3|               30|  3|               60|
|9090909094|  4|               30|  4|               70|
+----------+---+-----------------+---+-----------------+

上述数据框是在连接操作之后形成的。当尝试将其写入CSV文件时,出现以下错误。
pyspark.sql.utils.AnalysisException: u'Found duplicate column(s) when inserting into file:/home/user/output: `amount`, `id`;'

我认为最好的情况是在写入之前重命名列。 - Sailesh Kotha
1个回答

4
当您将联接列指定为字符串或数组类型时,它将导致仅有一个列[1]。Pyspark示例:
l = [('9090909092',1,30),('9090909093',2,30),('9090909090',3,30),('9090909094',4,30)] 
r = [(1,40),(2,50),(3,60),(4,70)]

left = spark.createDataFrame(l, ['NUMBER','ID','AMOUNT'])
right = spark.createDataFrame(r,['ID','AMOUNT'])

df = left.join(right, "ID")
df.show()

+---+----------+------+------+
| ID| NUMBER   |AMOUNT|AMOUNT|
+---+----------+------+------+ 
| 1 |9090909092| 30   | 40   |
| 3 |9090909090| 30   | 60   |
| 2 |9090909093| 30   | 50   |
| 4 |9090909094| 30   | 70   |
+---+----------+------+------+

但是这仍然会在数据帧中产生重复的列名,对于所有不是联接列(例如此示例中的AMOUNT列)的列。对于这些类型的列,在使用toDF数据帧函数进行连接之前或之后应该分配一个新名称[2]:
newNames = ['ID','NUMBER', 'LAMOUNT', 'RAMOUNT']
df= df.toDF(*newNames)
df.show()

+---+----------+-------+-------+ 
| ID| NUMBER   |LAMOUNT|RAMOUNT|
+---+----------+-------+-------+ 
| 1 |9090909092| 30    | 40    | 
| 3 |9090909090| 30    | 60    | 
| 2 |9090909093| 30    | 50    | 
| 4 |9090909094| 30    | 70    | 
+---+----------+-------+-------+

[1] https://docs.databricks.com/spark/latest/faq/join-two-dataframes-duplicated-column.html如何在Spark中使用join函数连接两个拥有重复列名的dataframe

[2] http://spark.apache.org/docs/2.2.1/api/python/pyspark.sql.html#pyspark.sql.DataFrame.toDF将RDD转换为DataFrame的方法。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接