PySpark SQL中的用户定义聚合函数

5
如何在PySpark SQL中实现用户定义的聚合函数(UDAF)?
pyspark version = 3.0.2
python version = 3.7.10

作为一个最简单的例子,我想用UDAF替换AVG聚合函数:
sc = SparkContext()
sql = SQLContext(sc)
df = sql.createDataFrame(
    pd.DataFrame({'id': [1, 1, 2, 2], 'value': [1, 2, 3, 4]}))
df.createTempView('df')
rv = sql.sql('SELECT id, AVG(value) FROM df GROUP BY id').toPandas()

其中rv将是:

In [2]: rv
Out[2]:
   id  avg(value)
0   1         1.5
1   2         3.5

如何使用UDAF替换查询中的AVG

例如,以下语句无法正常工作:

import numpy as np
def udf_avg(x):
    return np.mean(x)
sql.udf.register('udf_avg', udf_avg)
rv = sql.sql('SELECT id, udf_avg(value) FROM df GROUP BY id').toPandas()

这个想法是在纯Python中实现一个UDAF,以处理SQL聚合函数不支持的内容(例如低通滤波器)。


这个回答解决了你的问题吗?在PySpark中对GroupedData应用UDFs(附带Python示例) - blackbishop
不行,因为“pandas_udf”自Spark 3.0以来定义已经发生了变化。 - Russell Burdt
2个回答

4

可以使用Pandas UDF,定义兼容于Spark 3.0Python 3.6+。详细信息请参见问题文档

在Spark SQL中的完整实现:

import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import DoubleType

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame(
    pd.DataFrame({'id': [1, 1, 2, 2], 'value': [1, 2, 3, 4]}))
df.createTempView('df')

@pandas_udf(DoubleType())
def avg_udf(s: pd.Series) -> float:
    return s.mean()
spark.udf.register('avg_udf', avg_udf)

rv = spark.sql('SELECT id, avg_udf(value) FROM df GROUP BY id').toPandas()

带返回值

In [2]: rv
Out[2]:
   id  avg_udf(value)
0   1             1.5
1   2             3.5

1
我认为你的意思是 FloatType,因为签名使用了 float,但除此之外,这是对我的答案的不错改进 :) - mck
1
如果你想避免使用过时的功能,我建议你使用SparkSession而不是已经过时很久的SQLContext - mck
SparkSession 是更好的选择,谢谢你指出来 :) 关于 FloatType vs DoubleType,两者都可以使用,但我认为后者是正确的实现,因为它是双精度,就像 float 一样。似乎不符合 Python 的风格,我们必须两次以不同的格式指定返回值类型。有人知道这背后的原因吗? - Russell Burdt
1
我不知道,但你可以使用字符串'double',这样可以避免导入和一些打字... - mck

2
你可以使用 Pandas UDF 并选择 GROUPED_AGG 类型。它会将 Spark 中的列作为 Pandas Series 接收,这样你就可以在该列上调用 Series.mean
import pyspark.sql.functions as F

@F.pandas_udf('float', F.PandasUDFType.GROUPED_AGG)  
def avg_udf(s):
    return s.mean()

df2 = df.groupBy('id').agg(avg_udf('value'))

df2.show()
+---+--------------+
| id|avg_udf(value)|
+---+--------------+
|  1|           1.5|
|  2|           3.5|
+---+--------------+

将其注册以在 SQL 中使用也是可能的:

df.createTempView('df')
spark.udf.register('avg_udf', avg_udf)

df2 = spark.sql("select id, avg_udf(value) from df group by id")
df2.show()
+---+--------------+
| id|avg_udf(value)|
+---+--------------+
|  1|           1.5|
|  2|           3.5|
+---+--------------+

您提供的解决方案适用于Spark 3.0之前的版本,请参见此链接。从Spark 3.0和Python 3.6+开始,Pandas UDF定义已更改。这是触发的特定UserWarning:在Python 3.6+和Spark 3.0+中,最好为pandas UDF指定类型提示,而不是指定将在未来版本中弃用的pandas UDF类型。有关更多详细信息,请参见SPARK-28264 - Russell Burdt

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接