如何在PySpark SQL中实现用户定义的聚合函数(UDAF)?
作为一个最简单的例子,我想用UDAF替换AVG聚合函数:
pyspark version = 3.0.2
python version = 3.7.10
作为一个最简单的例子,我想用UDAF替换AVG聚合函数:
sc = SparkContext()
sql = SQLContext(sc)
df = sql.createDataFrame(
pd.DataFrame({'id': [1, 1, 2, 2], 'value': [1, 2, 3, 4]}))
df.createTempView('df')
rv = sql.sql('SELECT id, AVG(value) FROM df GROUP BY id').toPandas()
其中rv将是:
In [2]: rv
Out[2]:
id avg(value)
0 1 1.5
1 2 3.5
如何使用UDAF替换查询中的AVG
?
例如,以下语句无法正常工作:
import numpy as np
def udf_avg(x):
return np.mean(x)
sql.udf.register('udf_avg', udf_avg)
rv = sql.sql('SELECT id, udf_avg(value) FROM df GROUP BY id').toPandas()
这个想法是在纯Python中实现一个UDAF,以处理SQL聚合函数不支持的内容(例如低通滤波器)。