Spark Dataframe 区分具有重复名称的列

149

据我所知,在Spark Dataframe中,多个列可以具有相同的名称,如下面的数据框快照所示:

[
Row(a=107831, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0}), a=107831, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0})),
Row(a=107831, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0}), a=125231, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0047, 3: 0.0, 4: 0.0043})),
Row(a=107831, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0}), a=145831, f=SparseVector(5, {0: 0.0, 1: 0.2356, 2: 0.0036, 3: 0.0, 4: 0.4132})),
Row(a=107831, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0}), a=147031, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0})),
Row(a=107831, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0}), a=149231, f=SparseVector(5, {0: 0.0, 1: 0.0032, 2: 0.2451, 3: 0.0, 4: 0.0042}))
]

上述结果是通过将一个数据框与其自身连接创建的,您可以看到其中有两个 a 和两个 f 的 4 列。

问题在于当我尝试使用 a 列进行更多计算时,我无法找到选择 a 的方法,我尝试了 df[0]df.select('a'),但都返回了以下错误信息:

AnalysisException: Reference 'a' is ambiguous, could be: a#1333L, a#1335L.

在Spark API中,是否有办法重新区分重复命名的列?或者有没有一些方法可以让我更改列名?

12个回答

152

让我们从一些数据开始:

from pyspark.mllib.linalg import SparseVector
from pyspark.sql import Row

df1 = sqlContext.createDataFrame([
    Row(a=107831, f=SparseVector(
        5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0})),
    Row(a=125231, f=SparseVector(
        5, {0: 0.0, 1: 0.0, 2: 0.0047, 3: 0.0, 4: 0.0043})),
])

df2 = sqlContext.createDataFrame([
    Row(a=107831, f=SparseVector(
        5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0})),
    Row(a=107831, f=SparseVector(
        5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0})),
])

解决这个问题的方法有几种。首先,您可以使用父列来明确引用子表列:

df1.join(df2, df1['a'] == df2['a']).select(df1['f']).show(2)

##  +--------------------+
##  |                   f|
##  +--------------------+
##  |(5,[0,1,2,3,4],[0...|
##  |(5,[0,1,2,3,4],[0...|
##  +--------------------+

您也可以使用表别名:

from pyspark.sql.functions import col

df1_a = df1.alias("df1_a")
df2_a = df2.alias("df2_a")

df1_a.join(df2_a, col('df1_a.a') == col('df2_a.a')).select('df1_a.f').show(2)

##  +--------------------+
##  |                   f|
##  +--------------------+
##  |(5,[0,1,2,3,4],[0...|
##  |(5,[0,1,2,3,4],[0...|
##  +--------------------+

最后,您可以通过编程方式重命名列:

df1_r = df1.select(*(col(x).alias(x + '_df1') for x in df1.columns))
df2_r = df2.select(*(col(x).alias(x + '_df2') for x in df2.columns))

df1_r.join(df2_r, col('a_df1') == col('a_df2')).select(col('f_df1')).show(2)

## +--------------------+
## |               f_df1|
## +--------------------+
## |(5,[0,1,2,3,4],[0...|
## |(5,[0,1,2,3,4],[0...|
## +--------------------+

9
感谢您进行编辑并展示了在那些模棱两可的情况下获取正确列的许多方法,我认为您的示例应该被纳入Spark编程指南中。我学到了很多! - resec
小修正:df2_r = **df2**.select(*(col(x).alias(x + '_df2') for x in df2.columns)),而不是 df2_r = df1.select(*(col(x).alias(x + '_df2') for x in df2.columns))。其他都很好。 - Vzzarr
我同意这应该成为Spark编程指南的一部分。纯金。我终于能够解开在连接之前按旧名称选择列的歧义源头了。在连接之前以编程方式添加后缀到列名的解决方案消除了所有的歧义。 - Pablo Adames
@resec:你明白为什么需要重命名 df1_a = df1.alias("df1_a"),而不能直接使用 df1df2 吗?这个回答没有解释为什么需要重命名以使得 select('df1_a.f') 正常工作。 - Sheldore
@Sheldore 这是针对原始问题的应用,其中一个表 df 与自身连接。也许如果写成 df.alias("df1_a")df.alias("df2_a"),解决方案会更有意义。 - timctran

81

8
这是截至 Spark 2+ 的实际答案。 - Matt
5
针对 Scala 代码:df1.join(df2, Seq("a")),我的翻译如下:使用 "a" 列连接 df1 和 df2 数据框。 - mauriciojost
2
页面已移动到:https://kb.databricks.com/data/join-two-dataframes-duplicated-columns.html - bogdan.rusu
5
很高兴我继续滑动屏幕,这是更好的答案。如果列名不同,那就没有歧义问题。如果列名相同,则执行此操作。使用此方法很少需要处理有歧义的列名。 - Paul Fornia
我正在做同样的事情,但我是基于两列进行连接,这是否适用于多个列?如果是的话,那么我不知道为什么它对我不起作用。df1.join(df2,['a','b']) - Pawan Prasad

79

我建议你更改你的join的列名。

df1.select(col("a") as "df1_a", col("f") as "df1_f")
   .join(df2.select(col("a") as "df2_a", col("f") as "df2_f"), col("df1_a" === col("df2_a"))
生成的 DataFrame 将具有 schema
(df1_a, df1_f, df2_a, df2_f)

6
由于引号在列名之间未正确调整,您可能需要修正您的答案。 - Sameh Sharaf
3
我假设你是给我的答案点了踩的人?但实际上,我的答案是100%正确的 - 我只是使用Scala中'符号来选择列,因此引号没有任何问题。 - Glennie Helles Sindholt
39
@GlennieHellesSindholt, 说得好。这让人感到困惑,因为答案标记为“python”和“pyspark”。 - Jorge Leitao
2
如果每个数据框包含100多列,我们只需要重命名一个相同的列名怎么办?肯定不能在选择子句中手动键入所有这些列名。 - Bikash Gyawali
13
在这种情况下,您可以使用df1.withColumnRenamed("a", "df1_a")进行更名。 - Glennie Helles Sindholt
显示剩余3条评论

16

这是我们如何在PySpark中使用相同列名连接两个数据框架。

df = df1.join(df2, ['col1','col2','col3'])

如果您在此之后执行printSchema(),则可以看到重复的列已被删除。


12

您可以使用def drop(col: Column)方法来删除重复的列,例如:

DataFrame:df1

+-------+-----+
| a     | f   |
+-------+-----+
|107831 | ... |
|107831 | ... |
+-------+-----+

DataFrame:df2

+-------+-----+
| a     | f   |
+-------+-----+
|107831 | ... |
|107831 | ... |
+-------+-----+

当我将df1与df2连接时,DataFrame将如下所示:

val newDf = df1.join(df2,df1("a")===df2("a"))

DataFrame:newDf

+-------+-----+-------+-----+
| a     | f   | a     | f   |
+-------+-----+-------+-----+
|107831 | ... |107831 | ... |
|107831 | ... |107831 | ... |
+-------+-----+-------+-----+

现在,我们可以使用def drop(col: Column)方法来删除重复的列'a'或'f',就像下面这样:

```scala df.drop("a", "f") ```
val newDfWithoutDuplicate = df1.join(df2,df1("a")===df2("a")).drop(df2("a")).drop(df2("f"))

如果您正在执行外连接并且两列具有一些不同的值,那么这种方法是否有效? - prafi
您可能不想删除具有相同模式的不同关系。 - thebluephantom

9
假设您要连接的DataFrames是df1和df2,您要在列'a'上进行连接,那么有两种方法:
方法1
df1.join(df2, 'a', 'left_outer')
这是一种非常棒的方法,强烈推荐使用。
方法2
df1.join(df2, df1.a == df2.a, 'left_outer').drop(df2.a)

5
在深入研究Spark API后,我发现可以先使用alias为原始数据框创建一个别名,然后对别名上的每一列使用withColumnRenamed手动重命名,这样做会在不造成列名重复的情况下进行join
更多详细信息可参考以下Spark Dataframe APIpyspark.sql.DataFrame.alias pyspark.sql.DataFrame.withColumnRenamed 然而,我认为这只是一个麻烦的解决方法,想知道是否有更好的解决方案。

5

如果只有两个表中的键列相同,请尝试使用以下方法(方法1):

left. join(right , 'key', 'inner')

比以下方式更好(方法2):
left. join(right , left.key == right.key, 'inner')

使用方法1的优点:

  • 在最终数据框中,“关键字”只会显示一次
  • 语法易于使用

使用方法1的缺点:

  • 仅适用于关键列
  • 在左连接情况下,如果计划使用右侧关键字空值计数,则此方法将无法正常工作。在这种情况下,必须按照上述方法之一重新命名一个关键字。

3
这可能不是最佳方式,但如果想要在连接之后重命名重复的列,可以使用这个简单的函数。
def rename_duplicate_columns(dataframe):
    columns = dataframe.columns
    duplicate_column_indices = list(set([columns.index(col) for col in columns if columns.count(col) == 2]))
    for index in duplicate_column_indices:
        columns[index] = columns[index]+'2'
    dataframe = dataframe.toDF(*columns)
    return dataframe

3

如果您的用例比 Glennie Helles Sindholt 的答案描述更为复杂,例如您有其他/少数非连接列名称也相同,并且希望在选择时加以区分,最好使用别名,例如:

df3 = df1.select("a", "b").alias("left")\
   .join(df2.select("a", "b").alias("right"), ["a"])\
   .select("left.a", "left.b", "right.b")

df3.columns
['a', 'b', 'b']

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接