如何在Pyspark中运行指数加权移动平均?

3

我试图在PySpark中使用Grouped Map Pandas UDF运行指数加权移动平均。但是它似乎不起作用:

def ExpMA(myData):

    from pyspark.sql.functions import pandas_udf
    from pyspark.sql.functions import PandasUDFType
    from pyspark.sql import SQLContext 

    df = myData
    group_col = 'Name'
    sort_col = 'Date'

    schema = df.select(group_col, sort_col,'count').schema
    print(schema)

    @pandas_udf(schema, PandasUDFType.GROUPED_MAP)
    def ema(pdf):
        Model = pd.DataFrame(pdf.apply(lambda x: x['count'].ewm(span=5, min_periods=1).mean()))
        return Model

    data = df.groupby('Name').apply(ema)

    return data

我也尝试在PySpark中直接编写ewma方程,而不是使用Pandas udf,但问题在于ewma方程包含当前ewma的滞后。

1个回答

3

首先,您的Pandas代码是不正确的。无论使用Spark还是不使用,这都行不通。

pdf.apply(lambda x: x['count'].ewm(span=5, min_periods=1).mean())

另一个问题是输出架构,根据您的数据,可能无法真正适应结果:
  • 如果想要添加 ewm 架构应该进行扩展。
  • 如果只想返回 ewm ,那么架构会太大。
  • 如果只想替换,可能不符合类型。
假设这是第一种情况(我允许自己稍微重写了您的代码):
from pyspark.sql.functions import pandas_udf
from pyspark.sql.functions import PandasUDFType
from pyspark.sql.types import DoubleType, StructField

def exp_ma(df, group_col='Name', sort_col='Date'):
    schema = (df.select(group_col, sort_col, 'count')
        .schema.add(StructField('ewma', DoubleType())))

    @pandas_udf(schema, PandasUDFType.GROUPED_MAP)
    def ema(pdf):
        pdf['ewm'] = pdf['count'].ewm(span=5, min_periods=1).mean()
        return pdf

    return df.groupby('Name').apply(ema)

df = spark.createDataFrame(
    [("a", 1, 1), ("a", 2, 3), ("a", 3, 3), ("b", 1, 10), ("b", 8, 3), ("b", 9, 0)], 
    ("name", "date", "count")
)

exp_ma(df).show()
# +----+----+-----+------------------+                                            
# |Name|Date|count|              ewma|
# +----+----+-----+------------------+
# |   b|   1|   10|              10.0|
# |   b|   8|    3| 5.800000000000001|
# |   b|   9|    0|3.0526315789473686|
# |   a|   1|    1|               1.0|
# |   a|   2|    3|               2.2|
# |   a|   3|    3| 2.578947368421052|
# +----+----+-----+------------------+

我不经常使用Pandas,因此可能有更加优雅的方法来解决这个问题。


代码显示Spark DataFrame没有apply函数。 - Landmaster

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接