Spark指数移动平均

4

我有一个时间序列价格数据的数据框,包含ID、日期和价格。

我需要计算价格列的指数移动平均值,并将其作为新列添加到数据框中。

我之前使用过Spark的窗口函数,看起来很适合这种情况,但考虑到EMA的公式:

EMA: {Price - EMA(previous day)} x multiplier + EMA(previous day)

where

multiplier = (2 / (Time periods + 1)) //let's assume Time period is 10 days for now

我有点困惑,如何在对列进行窗口处理时访问先前计算的值。 对于简单的移动平均值,这很简单,只需要在窗口内平均元素即可计算出新的列:

var window = Window.partitionBy("ID").orderBy("Date").rowsBetween(-windowSize, Window.currentRow)
dataFrame.withColumn(avg(col("Price")).over(window).alias("SMA"))

但是似乎使用EMA有点复杂,因为每一步都需要前一个计算出的值。

我也看过Pyspark中加权移动平均,但我需要适用于Spark/Scala,并且适用于10或30天的EMA的方法。

有什么想法吗?

1个回答

7
最后,我分析了指数移动平均在pandas数据帧中的实现方式。除了我上面描述的递归公式外,在任何SQL或窗口函数中实现它都很困难(因为它是递归的),还有另一种方法,在他们的问题跟踪器上有详细说明:
y[t] = (x[t] + (1-a)*x[t-1] + (1-a)^2*x[t-2] + ... + (1-a)^n*x[t-n]) /
       ((1-a)^0 + (1-a)^1 + (1-a)^2 + ... + (1-a)^n).

鉴于此,再加上这里提供的Spark实现帮助,我最终得到了以下实现方式,它与执行pandas_dataframe.ewm(span=window_size).mean()基本相当。

def exponentialMovingAverage(partitionColumn: String, orderColumn: String, column: String, windowSize: Int): DataFrame = {
  val window = Window.partitionBy(partitionColumn)
  val exponentialMovingAveragePrefix = "_EMA_"

  val emaUDF = udf((rowNumber: Int, columnPartitionValues: Seq[Double]) => {
    val alpha = 2.0 / (windowSize + 1)
    val adjustedWeights = (0 until rowNumber + 1).foldLeft(new Array[Double](rowNumber + 1)) { (accumulator, index) =>
      accumulator(index) = pow(1 - alpha, rowNumber - index); accumulator
    }
    (adjustedWeights, columnPartitionValues.slice(0, rowNumber + 1)).zipped.map(_ * _).sum / adjustedWeights.sum
  })
  dataFrame.withColumn("row_nr", row_number().over(window.orderBy(orderColumn)) - lit(1))
    .withColumn(s"$column$exponentialMovingAveragePrefix$windowSize", emaUDF(col("row_nr"), collect_list(column).over(window)))
    .drop("row_nr")
}

(我假设需要计算指数移动平均的列的类型为Double。)
希望这可以帮助其他人。

1
你能帮我使用Scala调用这个函数吗?因为我遇到了一些与数据不匹配有关的错误。我正在发送列并期望得到字符串? - Mahesh Gupta

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接