如何在Pyspark中使用多列进行连接?

95
我正在使用Spark 1.3,并希望使用Python接口(SparkSQL)在多个列上进行连接。
以下代码是有效的:
我首先将它们注册为临时表。
numeric.registerTempTable("numeric")
Ref.registerTempTable("Ref")

test  = numeric.join(Ref, numeric.ID == Ref.ID, joinType='inner')

我现在想根据多列进行连接。

使用以下代码会出现SyntaxError: invalid syntax错误:

test  = numeric.join(Ref,
   numeric.ID == Ref.ID AND numeric.TYPE == Ref.TYPE AND
   numeric.STATUS == Ref.STATUS ,  joinType='inner')
4个回答

150
你应该使用 & / | 运算符,并注意 运算符优先级== 优先级低于位运算的 ANDOR):
df1 = sqlContext.createDataFrame(
    [(1, "a", 2.0), (2, "b", 3.0), (3, "c", 3.0)],
    ("x1", "x2", "x3"))

df2 = sqlContext.createDataFrame(
    [(1, "f", -1.0), (2, "b", 0.0)], ("x1", "x2", "x3"))

df = df1.join(df2, (df1.x1 == df2.x1) & (df1.x2 == df2.x2))
df.show()

## +---+---+---+---+---+---+
## | x1| x2| x3| x1| x2| x3|
## +---+---+---+---+---+---+
## |  2|  b|3.0|  2|  b|0.0|
## +---+---+---+---+---+---+

2
当你说“注意运算符优先级”时,你是指什么?你是指我应该在正确的位置放置括号来AND正确的表格吗? - Chogg
5
@Chogg,他的意思是如果你在括号方面不小心,短语df1.x1 == df2.x1 & df1.x2 == df2.x2(括号已删除)将被Python解释器评估为df1.x1 == (df2.x1 & df1.x2) == df2.x2,这可能会引发混乱和不具描述性的错误。 - Brendan
2
为什么它会生成两次列x1x2 - Bikash Gyawali

95

另一种方法是:

df1 = sqlContext.createDataFrame(
    [(1, "a", 2.0), (2, "b", 3.0), (3, "c", 3.0)],
    ("x1", "x2", "x3"))

df2 = sqlContext.createDataFrame(
    [(1, "f", -1.0), (2, "b", 0.0)], ("x1", "x2", "x4"))

df = df1.join(df2, ['x1','x2'])
df.show()

输出结果为:

+---+---+---+---+
| x1| x2| x3| x4|
+---+---+---+---+
|  2|  b|3.0|0.0|
+---+---+---+---+

主要优点在于表格连接所使用的列不会在输出中重复,从而降低了出现错误的风险,例如org.apache.spark.sql.AnalysisException: Reference 'x1' is ambiguous, could be: x1#50L, x1#57L.


当两个表格中的列具有不同的名称时(比如上面的例子中df2y1y2y4这些列),可采用以下语法:

df = df1.join(df2.withColumnRenamed('y1','x1').withColumnRenamed('y2','x2'), ['x1','x2'])

如果我执行外连接并且只想保留键的单个出现怎么办? - Devarshi Mandal
5
这可能是我最不喜欢的 PySpark 错误:Reference 'x1' is ambiguous, could be: x1#50L, x1#57L. 我不明白为什么它允许你像 df = df1.join(df2, df1.x1 == df2.x1) 这样做,但只要你尝试对生成的 df 做几乎任何事情,就会出错。这只是一个小小的牢骚,但你有没有任何理由想要带有重复名称的结果 df - seth127

13
test = numeric.join(Ref, 
   on=[
     numeric.ID == Ref.ID, 
     numeric.TYPE == Ref.TYPE,
     numeric.STATUS == Ref.STATUS 
   ], how='inner')

1
欢迎来到StackOverflow。你能否详细解释一下你的代码?为什么它被构建成这样的结构?它是如何工作的?等等。 - Dominik
2
答案很棒。但是为了最佳实践,请提供一些解释。仅仅发布代码让原帖作者和未来的读者复制粘贴你的答案而不理解答案背后的逻辑。请提供一些解释性的答案。谢谢! - Thavas Antonio

1

如果列名相同,您还可以提供字符串列表。

df1 = sqlContext.createDataFrame(
    [(1, "a", 2.0), (2, "b", 3.0), (3, "c", 3.0)],
    ("x1", "x2", "x3"))

df2 = sqlContext.createDataFrame(
    [(1, "f", -1.0), (2, "b", 0.0)], ("x1", "x2", "x3"))

df = df1.join(df2, ["x1","x2"])

df.show()
+---+---+---+---+
| x1| x2| x3| x3|
+---+---+---+---+
|  2|  b|3.0|0.0|
+---+---+---+---+

如果列名不同,而且你想依赖于列名字符串,另一种处理方法如下:

df1 = sqlContext.createDataFrame(
    [(1, "a", 2.0), (2, "b", 3.0), (3, "c", 3.0)],
    ("x1", "x2", "x3"))

df2 = sqlContext.createDataFrame(
    [(1, "f", -1.0), (2, "b", 0.0)], ("y1", "y2", "y3"))

df = df1.join(df2, (col("x1")==col("y1")) & (col("x2")==col("y2")))

df.show()
+---+---+---+---+---+---+
| x1| x2| x3| y1| y2| y3|
+---+---+---+---+---+---+
|  2|  b|3.0|  2|  b|0.0|
+---+---+---+---+---+---+

如果您想动态引用列名,或者在列名中有空格而无法使用df.col_name语法的情况下,这将非常有用。不过,在这种情况下,建议您考虑更改列名。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接