Spark为DataFrame连接指定多个列条件

52

如何在连接两个数据框时给出更多的列条件。例如,我想执行以下操作:

val Lead_all = Leads.join(Utm_Master,  
    Leaddetails.columns("LeadSource","Utm_Source","Utm_Medium","Utm_Campaign") ==
    Utm_Master.columns("LeadSource","Utm_Source","Utm_Medium","Utm_Campaign"),
"left")

只有这些列匹配时,我才想加入。但是上面的语法无效,因为cols只使用一个字符串。那么,我该如何得到我想要的结果。

9个回答

102

针对这种情况,可以使用Spark 列/表达式API连接

Leaddetails.join(
    Utm_Master, 
    Leaddetails("LeadSource") <=> Utm_Master("LeadSource")
        && Leaddetails("Utm_Source") <=> Utm_Master("Utm_Source")
        && Leaddetails("Utm_Medium") <=> Utm_Master("Utm_Medium")
        && Leaddetails("Utm_Campaign") <=> Utm_Master("Utm_Campaign"),
    "left"
)

在这个例子中,<=> 运算符的意思是 "安全处理空值的等式测试。"与简单的 等式测试===)相比,第一个运算符可以安全地用于某一列可能具有空值的情况。

5
你能解释一下 ===<=> 之间的区别吗? Answer: === 是严格相等运算符,用于比较两个值是否完全相同,包括数据类型和值。而 <=> 是太空船操作符,用于比较两个值的大小关系,并返回 -1、0 或 1,代表第一个值小于、等于或大于第二个值。 - zero323
5
关于这些等式测试的区别更新了更多信息。 - rchukh
1
啊哈,文档里找不到这个。你是怎么知道的? - user568109
@user568109 我正在使用Java API,有些情况下Column/Expression API是唯一的选择。此外,Column/Expression API大多实现为Builder,因此更容易在每个Spark版本中发现新方法。 - rchukh
1
这给了我重复的列,所以我使用了我在另一个答案中添加的Seq方法。 - Climbs_lika_Spyder
显示剩余2条评论

22

截至Spark版本1.5.0(目前未发布),您可以在多个DataFrame列上进行连接。请参阅SPARK-7990:添加方法以便于在多个连接键上进行等值连接

Python

Leads.join(
    Utm_Master, 
    ["LeadSource","Utm_Source","Utm_Medium","Utm_Campaign"],
    "left_outer"
)

Scala

这个问题要求一个 Scala 的答案,但我不使用 Scala。这是我的最佳猜测...

Leads.join(
    Utm_Master,
    Seq("LeadSource","Utm_Source","Utm_Medium","Utm_Campaign"),
    "left_outer"
)

我们如何使连接忽略大小写(即使它不区分大小写)?我尝试了下面的方法,但没有起作用。sqlContext.sql("set spark.sql.caseSensitive=false") - soMuchToLearnAndShare

8
< p > === 选项会导致我有重复的列。因此,我使用Seq代替。

val Lead_all = Leads.join(Utm_Master,
    Seq("Utm_Source","Utm_Medium","Utm_Campaign"),"left")

当然,这仅适用于加入列的名称相同的情况。

8

您可以采用原始SQL的方法:

case class Bar(x1: Int, y1: Int, z1: Int, v1: String)
case class Foo(x2: Int, y2: Int, z2: Int, v2: String)

val bar = sqlContext.createDataFrame(sc.parallelize(
    Bar(1, 1, 2, "bar") :: Bar(2, 3, 2, "bar") ::
    Bar(3, 1, 2, "bar") :: Nil))

val foo = sqlContext.createDataFrame(sc.parallelize(
    Foo(1, 1, 2, "foo") :: Foo(2, 1, 2, "foo") ::
    Foo(3, 1, 2, "foo") :: Foo(4, 4, 4, "foo") :: Nil))

foo.registerTempTable("foo")
bar.registerTempTable("bar")

sqlContext.sql(
    "SELECT * FROM foo LEFT JOIN bar ON x1 = x2 AND y1 = y2 AND z1 = z2")

这是我目前使用的方法。我希望能够在不注册临时表的情况下完成它。如果使用数据框架API无法实现,我将接受答案。 - user568109
如果是这样,@rchukh的答案要好得多。 - zero323

7

Scala:

Leaddetails.join(
    Utm_Master, 
    Leaddetails("LeadSource") <=> Utm_Master("LeadSource")
        && Leaddetails("Utm_Source") <=> Utm_Master("Utm_Source")
        && Leaddetails("Utm_Medium") <=> Utm_Master("Utm_Medium")
        && Leaddetails("Utm_Campaign") <=> Utm_Master("Utm_Campaign"),
    "left"
)

为了使其不区分大小写:
import org.apache.spark.sql.functions.{lower, upper}

接下来只需在join方法的条件中使用lower(value)即可。

例如:dataFrame.filter(lower(dataFrame.col("vendor")).equalTo("fortinet"))


6
Pyspark中,您可以简单地分别指定每个条件:
val Lead_all = Leads.join(Utm_Master,  
    (Leaddetails.LeadSource == Utm_Master.LeadSource) &
    (Leaddetails.Utm_Source == Utm_Master.Utm_Source) &
    (Leaddetails.Utm_Medium == Utm_Master.Utm_Medium) &
    (Leaddetails.Utm_Campaign == Utm_Master.Utm_Campaign))

一定要正确使用运算符和括号。


2
Pyspark中,使用括号将每个条件括起来是在连接条件中使用多个列名的关键。
joined_df = df1.join(df2, 
    (df1['name'] == df2['name']) &
    (df1['phone'] == df2['phone'])
)

0

Spark SQL支持在括号中以元组形式进行列的连接,例如

... WHERE (list_of_columns1) = (list_of_columns2)

这种方法比为每个由一组“AND”组合的列对指定相等表达式(=)要短得多。

例如:

SELECT a,b,c
FROM    tab1 t1
WHERE 
   NOT EXISTS
   (    SELECT 1
        FROM    t1_except_t2_df e
        WHERE (t1.a, t1.b, t1.c) = (e.a, e.b, e.c)
   )

而不是

SELECT a,b,c
FROM    tab1 t1
WHERE 
   NOT EXISTS
   (    SELECT 1
        FROM    t1_except_t2_df e
        WHERE t1.a=e.a AND t1.b=e.b AND t1.c=e.c
   )

特别是当列的列表很长且您想轻松处理NULL时,这种方法可读性较差。


这真的有效吗?这在1.6版本中支持吗? - Shankar

0

试试这个:

val rccJoin=dfRccDeuda.as("dfdeuda")
.join(dfRccCliente.as("dfcliente")
,col("dfdeuda.etarcid")===col("dfcliente.etarcid") 
&& col("dfdeuda.etarcid")===col("dfcliente.etarcid"),"inner")

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接