PySpark中如何在UDF中使用DataFrame?

5
我有两个数据框df1
+---+---+----------+
|  n|val| distances|
+---+---+----------+
|  1|  1|0.27308652|
|  2|  1|0.24969208|
|  3|  1|0.21314497|
+---+---+----------+

and df2

+---+---+----------+
| x1| x2|         w|
+---+---+----------+
|  1|  2|0.03103427|
|  1|  4|0.19012526|
|  1| 10|0.26805446|
|  1|  8|0.26825935|
+---+---+----------+

我希望给 df1 添加一个名为 gamma 的新列,该列将包含在 df2 中当 df1.n == df2.x1 OR df1.n == df2.x2 时,w 值的总和。
我尝试使用udf,但显然从不同的数据框中选择将无法工作,因为值应在计算之前确定。
gamma_udf = udf(lambda n: float(df2.filter("x1 = %d OR x2 = %d"%(n,n)).groupBy().sum('w').rdd.map(lambda x: x).collect()[0]), FloatType())
df1.withColumn('gamma1', gamma_udf('n'))

有没有办法使用joingroupby而不使用循环来完成它?

2
df1.join(df2, (df1.n == df2.x1) | (df1.n == df2.x2)).groupBy(df1.n).sum("w")? 请问需要将此段代码翻译成中文吗? - Alper t. Turker
1个回答

4

您不能在udf内部引用DataFrame。正如您所提到的,最好使用join来解决此问题。

如果我理解正确,您正在寻找类似以下内容的东西:

from pyspark.sql import Window
import pyspark.sql.functions as F

df1.alias("L").join(df2.alias("R"), (df1.n == df2.x1) | (df1.n == df2.x2), how="left")\
    .select("L.*", F.sum("w").over(Window.partitionBy("n")).alias("gamma"))\
    .distinct()\
    .show()
#+---+---+----------+----------+
#|  n|val| distances|     gamma|
#+---+---+----------+----------+
#|  1|  1|0.27308652|0.75747334|
#|  3|  1|0.21314497|      null|
#|  2|  1|0.24969208|0.03103427|
#+---+---+----------+----------+

如果您更熟悉 pyspark-sql 语法,您可以注册临时表并执行以下操作:

df1.registerTempTable("df1")
df2.registerTempTable("df2")

sqlCtx.sql(
    "SELECT DISTINCT L.*, SUM(R.w) OVER (PARTITION BY L.n) AS gamma "
    "FROM df1 L LEFT JOIN df2 R ON L.n = R.x1 OR L.n = R.x2"
).show()
#+---+---+----------+----------+
#|  n|val| distances|     gamma|
#+---+---+----------+----------+
#|  1|  1|0.27308652|0.75747334|
#|  3|  1|0.21314497|      null|
#|  2|  1|0.24969208|0.03103427|
#+---+---+----------+----------+

解释

在这两种情况下,我们都是对df1df2进行了left join。这将保留df1中的所有行,无论是否匹配。

连接子句是您在问题中指定的条件。因此,x1x2等于ndf2中的所有行都将被连接。

接下来,选择左表中的所有行,再按(分区)n进行分组,并求出w的值的总和。这将获得每个n值匹配连接条件的所有行的总和。

最后,我们只返回不同的行以消除重复。


如果 df2 的维度比 df1 大得多,那么总和会在 df2 的所有值上进行计算吗? - Maria
总和将覆盖df2中与分区条件匹配的所有值。这对您不起作用吗?如果是这样,您能提供一个例子吗? - pault
我不太确定,对于pyspark还有点陌生。我只是在尝试弄清楚你的答案是如何工作的。 - Maria

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接