计算列表的移动平均值

59

这个周末,我决定尝试一下Scala和Clojure。我擅长面向对象编程,所以Scala很容易学习,但想要尝试一下函数式编程。这就是困难所在。

我似乎无法进入编写函数模式的思维状态。作为专业的函数式程序员,您如何解决问题呢?

给定一个值列表和一个定义好的求和期间,您将如何生成新的简单移动平均值列表?

例如:给定列表values(2.0、4.0、7.0、6.0、3.0、8.0、12.0、9.0、4.0、1.0)和period 4,该函数应返回:(0.0、0.0、0.0、4.75、5.0、6.0、7.25、8.0、8.25、6.5)

经过一天的思考,我在Scala中最好的解决方案是:

def simpleMovingAverage(values: List[Double], period: Int): List[Double] = {
  (for (i <- 1 to values.length)
    yield
    if (i < period) 0.00
    else values.slice(i - period, i).reduceLeft(_ + _) / period).toList
}

我知道这种方式极其低效,我更愿意采取以下做法:

where n < period: ma(n) = 0
where n = period: ma(n) = sum(value(1) to value(n)) / period
where n > period: man(n) = ma(n -1) - (value(n-period) / period) + (value(n) / period)

现在用命令式风格实现很容易,但我无论如何都不知道如何以函数式方式表达。

18个回答

53

有趣的问题。我可以想到许多解决方案,效率各不相同。需要重复添加内容并不是真正的性能问题,但让我们假设它是。另外,开头的零可以稍后添加,因此让我们不用担心产生它们。如果算法自然地提供它们,那就好了;如果没有,我们稍后会修正。

从Scala 2.8开始,以下代码将使用sliding获取List的滑动窗口来为n >= period提供结果:

def simpleMovingAverage(values: List[Double], period: Int): List[Double] =
  List.fill(period - 1)(0.0) ::: (values sliding period map (_.sum) map (_ / period))

然而,虽然这种方法相当优雅,但并不是最佳性能,因为它没有利用已经计算过的加法。那么,谈到这些加法,我们如何获取它们呢?

假设我们写下以下内容:

values sliding 2 map sum
我们有一组每两个元素之和的列表。让我们试着使用这个结果来计算4个元素的移动平均值。以上公式进行了如下计算:
from d1, d2, d3, d4, d5, d6, ...
to (d1+d2), (d2+d3), (d3+d4), (d4+d5), (d5+d6), ...

如果我们将每个元素与其下一个元素相加,我们就得到了4个元素的移动平均值:

(d1+d2)+(d3+d4), (d2+d3)+(d4+d5), (d3+d4)+(d5+d6), ...

我们可以这样做:

res zip (res drop 2) map Function.tupled(_+_)

我们可以计算8个元素的移动平均值,以此类推。嗯,有一个众所周知的算法可以计算遵循这种模式的事物。它最为人知的用途是计算数字的幂。算法如下:

def power(n: Int, e: Int): Int = e match {
  case 0 => 1
  case 1 => n
  case 2 => n * n
  case odd if odd % 2 == 1 => power(n, (odd - 1)) * n
  case even => power(power(n, even / 2), 2)
}

因此,让我们在这里应用它:

def movingSum(values: List[Double], period: Int): List[Double] = period match {
  case 0 => throw new IllegalArgumentException
  case 1 => values
  case 2 => values sliding 2 map (_.sum)
  case odd if odd % 2 == 1 => 
    values zip movingSum(values drop 1, (odd - 1)) map Function.tupled(_+_)
  case even =>
    val half = even / 2
    val partialResult = movingSum(values, half)
    partialResult zip (partialResult drop half) map Function.tupled(_+_)
}

这里是逻辑。第0个周期无效,第1个周期等于输入,第2个周期为大小为2的滑动窗口。如果大于此大小,则可能是偶数或奇数。

如果是奇数,我们将每个元素添加到下一个(odd - 1)个元素的movingSum中。例如,如果是3,则将每个元素添加到接下来的2个元素的movingSum中。

如果是偶数,我们计算n / 2movingSum,然后将每个元素添加到之后n / 2步骤上的元素中。

有了这个定义,我们可以回到问题并执行以下操作:

def simpleMovingAverage(values: List[Double], period: Int): List[Double] =
  List.fill(period - 1)(0.0) ::: (movingSum(values, period) map (_ / period))

对于使用:::的效率存在一些小的不足,但它的时间复杂度是O(period),而不是O(values.size)。可以通过尾递归函数使其更加高效。当然,我提供的“滑动”定义在性能方面非常糟糕,但在Scala 2.8中会有一个更好的定义。需要注意的是,我们不能在List上实现高效的sliding方法,但可以在Iterable上实现。

说了这么多,我会选择最初的定义,并且只有在关键路径分析指出这很重要时才进行优化。

总之,让我们来考虑一下我如何解决这个问题。我们有一个移动平均问题。移动平均是列表上移动“窗口”的总和,除以该窗口的大小。因此,首先我尝试获得一个滑动窗口,在其上求和,然后再除以大小。

下一个问题是避免重复计算已经计算过的加法。在这种情况下,我尽可能地去找到最小的加法,并试图弄清如何重用这些结果来计算更大的总和。

最后,让我们按照你想出的方法解决问题,通过从先前的结果中添加和减去值来得到答案。首先的平均值很容易得出:

 def movingAverage(values: List[Double], period: Int): List[Double] = {
   val first = (values take period).sum / period

现在我们创建两个列表。首先是要减去的元素列表,其次是要添加的元素列表:

   val subtract = values map (_ / period)
   val add = subtract drop period

我们可以使用 zip 来将这两个列表相加。这种方法只会生成与较小的列表长度相同的元素,避免了 subtract 过大的问题:

   val addAndSubtract = add zip subtract map Function.tupled(_ - _)

最后,我们通过折叠(fold)将结果组合起来:

   val res = (addAndSubtract.foldLeft(first :: List.fill(period - 1)(0.0)) { 
     (acc, add) => (add + acc.head) :: acc 
   }).reverse

需要返回的答案是什么。整个函数看起来像这样:

 def movingAverage(values: List[Double], period: Int): List[Double] = {
   val first = (values take period).sum / period
   val subtract = values map (_ / period)
   val add = subtract drop period
   val addAndSubtract = add zip subtract map Function.tupled(_ - _)
   val res = (addAndSubtract.foldLeft(first :: List.fill(period - 1)(0.0)) { 
     (acc, add) => (add + acc.head) :: acc 
   }).reverse
   res
 }

3
Daniel,太棒了。我也感谢你解释了思考过程。对我而言,这更多是一次关于优美的函数式编程的练习,而不是找到绝对最高效的方法。你的例子给了我灵感,让我知道这是可行的!非常感谢。 - James P
1
从现在开始,我将称呼您为Sobral教授。这将是一个非常好的讲座主题,特别是您展示了漂亮的逐步转换。非常出色! - user73774
对于Scala 2.9.1,simpleMovingAverage变为:def simpleMovingAverage(values:List [Double],period:Int):List [Double] = List.make(period-1,0.0)++(值滑动期间map(_ sum)map(_ / period))。 List.make(period-1,0.0)返回一个List [Double],而(values sliding period map(_ sum)map(_ / period)返回一个Iterator [Double]。 ++用于连接List和Iterator。 - Brian
@Brian,你会失去之前计算的所有值,并且一直计算“period”元素的总和。我认为你错过了重点。 - Daniel C. Sobral
在Scala 2.9中,“make”已被弃用,而“:::”也无法找到。 - Ivan
1
@Ivan 是的,这个答案是为 Scala 2.7 版本编写的。我现在已经重写了它,以适应现代 Scala 版本。可能仍有一些问题,但最终代码至少应该能正常工作。 - Daniel C. Sobral

29
我比较擅长Clojure,所以我来回答。就我写这篇文章的时候而言,其他Clojure的条目都是命令式的;这不是你想要的(也不符合Clojure的习惯用法)。我首先想到的算法是重复从序列中取出所需数量的元素,删除第一个元素,然后递归执行。
以下方法适用于任何类型的序列(向量或列表,惰性或非惰性),并生成一个惰性序列的平均值——如果你正在处理一个大小不确定的列表,则可能会有帮助。请注意,如果列表中没有足够的元素进行消耗,它通过隐式返回nil来处理基本情况。
(defn moving-average [values period]
  (let [first (take period values)]
    (if (= (count first) period)
      (lazy-seq 
        (cons (/ (reduce + first) period)
              (moving-average (rest values) period))))))

在您的测试数据上运行此代码将产生以下结果:

user> (moving-average '(2.0, 4.0, 7.0, 6.0, 3.0, 8.0, 12.0, 9.0, 4.0, 1.0) 4)
(4.75 5.0 6.0 7.25 8.0 8.25 6.5)

虽然这个序列的前几个元素没有给出"0",但这可以轻松地(有点不自然地)处理。

最简单的方法是看到模式,并能够想起符合要求的可用函数。 partition 可以给出序列部分的延迟视图,然后我们可以对其进行映射:

(defn moving-average [values period]
  (map #(/ (reduce + %) period) (partition period 1 values))

有人要求一个尾递归版本;尾递归和惰性的区别是一种权衡。当你的任务是构建一个列表时,将函数变为尾递归通常相当简单,这个例子也不例外——只需将列表作为参数累积到子函数中。我们将累积到向量而不是列表,因为否则列表将被反向构建并需要在最后进行反转。

(defn moving-average [values period]
  (loop [values values, period period, acc []]
    (let [first (take period values)]
      (if (= (count first) period)
        (recur (rest values) period (conj acc (/ (reduce + first) period)))
        acc))))

loop 是一种创建匿名内部函数的方法(有点像Scheme语言的命名let);recur 必须在Clojure中使用以消除尾调用。 conj 是一个广义的 cons,以集合的自然方式添加元素,对于列表是在开头添加,在向量末尾添加。


1
+1 递归解决方案;现在将其变为尾递归。;-) - Daniel C. Sobral
5
懒惰的好处在于,如果你没有给懒惰序列命名,它不会占用栈空间,前面的值会自动清除。(至少我是这样理解的。) - James Cunningham
谢谢James,这正是我在寻找的东西。简单、优雅且易于阅读。 - James P
1
@James Cunningham:Clojure默认是惰性求值的吗?我不知道。知道这个很好。 - Daniel C. Sobral
@Daniel:默认情况下它不是惰性的;这就是为什么我不得不在调用cons时使用lazy-seq宏进行包装。然而,大多数操作序列的函数都是惰性的。 - James Cunningham
哦,该死,我完全忘了! - Daniel C. Sobral

15

这里是另一个(功能性的)Clojure解决方案:

(defn avarage [coll]
  (/ (reduce + coll)
     (count coll)))
(defn ma [period coll] (map avarage (partition period 1 coll)))

如果有必要,仍然需要在序列开头添加零。


2
给分区函数一个第三个参数 (repeat 0),以提供缺失的参数到结尾,如果你想要它们被包含进去。 - Arthur Ulfeldt
为了在开头得到零,您可以像这样连接它们:`(defn ma [period coll] (lazy-cat (repeat period 0) (map avarage (partition period 1 (repeat 0) coll))))` - Brad Lucas

14

以下是Clojure中的一个纯函数解决方案。相对于已提供的更为复杂,但它是延迟计算仅在每个步骤调整平均值,而不是从头开始重新计算。如果周期很短,则比在每个步骤计算新平均值的简单解决方案慢;然而,对于较长期间,它几乎没有减速,而执行(/ (take period ...) period)的某些内容会在较长期间表现得更差。

(defn moving-average
  "Calculates the moving average of values with the given period.
  Returns a lazy seq, works with infinite input sequences.
  Does not include initial zeros in the output."
  [period values]
  (let [gen (fn gen [last-sum values-old values-new]
              (if (empty? values-new)
                nil
                (let [num-out (first values-old)
                      num-in  (first values-new)
                      new-sum (+ last-sum (- num-out) num-in)]
                  (lazy-seq
                    (cons new-sum
                          (gen new-sum
                               (next values-old)
                               (next values-new)))))))]
    (if (< (count (take period values)) period)
      nil
      (map #(/ % period)
           (gen (apply + (take (dec period) values))
                (cons 0 values)
                (drop (dec period) values))))))

我决定添加到这个旧问题中,因为主题再次出现了(https://dev59.com/zEzSa4cB1Zd3GeqPlUpg),而且我发现指向这个不错的可能解决方案集合更可取,同时加入我的见解(与Clojure中以前的版本不同,如A中所述)。也许我们可以建立Web上最完整的函数移动平均实现库!;-) - Michał Marczyk

9

这里是一个部分使用 point-free 的一行 Haskell 解决方案:

ma p = reverse . map ((/ (fromIntegral p)) . sum . take p) . (drop p) . reverse . tails

首先,它将 tails 应用于列表以获取“tails”列表,因此:

Prelude List> tails [2.0, 4.0, 7.0, 6.0, 3.0]
[[2.0,4.0,7.0,6.0,3.0],[4.0,7.0,6.0,3.0],[7.0,6.0,3.0],[6.0,3.0],[3.0],[]]

将其反转并删除前面的 'p' 个条目(这里以2为p):
Prelude List> (drop 2 . reverse . tails) [2.0, 4.0, 7.0, 6.0, 3.0]
[[6.0,3.0],[7.0,6.0,3.0],[4.0,7.0,6.0,3.0],[2.0,4.0,7.0,6.0,3.0]]

如果您不熟悉(.)点/乳头符号,它是“函数合成”的运算符,意味着它将一个函数的输出作为另一个函数的输入进行传递,将它们“组合”成一个单一的函数。 (g . f) 的意思是“对一个值运行f,然后将输出传递给g”,因此 ((f . g) x) 与 (g(f x)) 相同。通常使用这个符号可以带来更清晰的编程风格。

然后,它将函数 ((/ (fromIntegral p)) . sum . take p) 映射到列表上。因此,对于列表中的每个列表,它取前 'p' 个元素,求和,然后除以 'p'。然后我们用 "reverse" 翻转列表即可。

Prelude List> map ((/ (fromIntegral 2)) . sum . take 2) [[6.0,3.0],[7.0,6.0,3.0]
,[4.0,7.0,6.0,3.0],[2.0,4.0,7.0,6.0,3.0]]
[4.5,6.5,5.5,3.0]

这看起来比实际情况要低效得多;“reverse”在列表被评估之前并不会物理上反转列表的顺序,它只是将其放置到堆栈上(好老的惰性Haskell)。 “tails”也不会创建所有这些单独的列表,它只是引用原始列表的不同部分。 它仍然不是一个很好的解决方案,但只有一行 :)

这里有一个稍微好一点但更长的解决方案,它使用mapAccum进行滑动减法和加法:

ma p l = snd $ mapAccumL ma' a l'
    where
        (h, t) = splitAt p l
        a = sum h
        l' = (0, 0) : (zip l t)
        ma' s (x, y) = let s' = (s - x) + y in (s', s' / (fromIntegral p))

首先,我们将列表在“p”处分成两部分:

Prelude List> splitAt 2 [2.0, 4.0, 7.0, 6.0, 3.0]
([2.0,4.0],[7.0,6.0,3.0])

求第一位的和:

Prelude List> sum [2.0, 4.0]
6.0

将第二个位与原始列表压缩在一起(这只是按顺序配对两个列表中的项目)。原始列表显然更长,但我们失去了这个额外的位:

Prelude List> zip [2.0, 4.0, 7.0, 6.0, 3.0] [7.0,6.0,3.0]
[(2.0,7.0),(4.0,6.0),(7.0,3.0)]

现在我们为mapAccum(ulator)定义一个函数。mapAccumL与“map”相同,但具有额外的运行状态/累加器参数,该参数从前一个“映射”传递到下一个映射,因为map通过列表。我们使用累加器作为我们的移动平均值,并且由刚刚离开滑动窗口的元素和刚刚进入它的元素(我们刚刚压缩的列表)组成的列表中,我们的滑动函数将第一个数字'x'从平均值中减去并添加第二个数字'y'。然后,我们将新的's'传递并返回's'除以'p'。"snd"(第二个)只取一对(元组)的第二个成员,用于获取mapAccumL的第二个返回值,因为mapAccumL将返回累加器以及映射的列表。
对于那些不熟悉$符号的人,它是“应用程序运算符”。 它实际上并没有做什么,但它具有“低,右结合绑定优先级”,这意味着您可以省略括号(请注意LISPers),即(f x)与f $ x相同。
运行(ma 4 [2.0, 4.0, 7.0, 6.0, 3.0, 8.0, 12.0, 9.0, 4.0, 1.0])会产生任一解决方案的[4.75, 5.0, 6.0, 7.25, 8.0, 8.25, 6.5]。
哦,你需要导入模块“List”才能编译任一解决方案。

7
这里有两种在Scala 2.8.0中进行移动平均的方法(一种是严格的,另一种是惰性的)。两种方法都假设vs中至少有p个双精度数。
// strict moving average
def sma(vs: List[Double], p: Int): List[Double] =
  ((vs.take(p).sum / p :: List.fill(p - 1)(0.0), vs) /: vs.drop(p)) {(a, v) =>
    ((a._1.head - a._2.head / p + v / p) :: a._1, a._2.tail)
  }._1.reverse

// lazy moving average
def lma(vs: Stream[Double], p: Int): Stream[Double] = {
  def _lma(a: => Double, vs1: Stream[Double], vs2: Stream[Double]): Stream[Double] = {
    val _a = a // caches value of a
    _a #:: _lma(_a - vs2.head / p + vs1.head / p, vs1.tail, vs2.tail)
  }
  Stream.fill(p - 1)(0.0) #::: _lma(vs.take(p).sum / p, vs.drop(p), vs)
}

scala> sma(List(2.0, 4.0, 7.0, 6.0, 3.0, 8.0, 12.0, 9.0, 4.0, 1.0), 4)
res29: List[Double] = List(0.0, 0.0, 0.0, 4.75, 5.0, 6.0, 7.25, 8.0, 8.25, 6.5)

scala> lma(Stream(2.0, 4.0, 7.0, 6.0, 3.0, 8.0, 12.0, 9.0, 4.0, 1.0), 4).take(10).force
res30: scala.collection.immutable.Stream[Double] = Stream(0.0, 0.0, 0.0, 4.75, 5.0, 6.0, 7.25, 8.0, 8.25, 6.5)

这次自动格式化真的把代码搞砸了。对于任何阅读它的人来说,“#”不是注释标记,而是操作符“#::”和“#:::”的一部分,它们是流等效的“::”和“:::”。 - Daniel C. Sobral
2
小伙子,这段代码写得不错!使用元组同时减少一个列表并增加另一个列表的方法非常聪明。但是请解释一下你在做什么,以使答案更有用。 - Daniel C. Sobral
1
@Daniel 谢谢!编写代码比解释代码要容易得多;-) 你已经描述了它的要点。两个列表/流在两个函数中都被维护,并且在每次迭代中都会取出它们的“头部”。一个列表/流用作主要集合进行迭代,而另一个列表/流(与之相同的集合,只是减去了“周期”较少的双数)用于计算新的移动平均值。 - Walter Chang

6

J编程语言可以实现移动平均等程序。事实上,(+/ % #)\这个表达式的字符数比它的标签'moving average.'还要少。

对于此问题中指定的值(包括名称“values”),下面是一种简单的编码方式:

   values=: 2 4 7 6 3 8 12 9 4 1
   4 (+/ % #)\ values
4.75 5 6 7.25 8 8.25 6.5

我们可以通过为组件使用标签来描述这个问题。
   periods=: 4
   average=: +/ % #
   moving=: \

   periods average moving values
4.75 5 6 7.25 8 8.25 6.5

这两个例子使用的是完全相同的程序,唯一的区别是第二个表格中使用了更多的名称。这些名称可以帮助不熟悉J语言基础知识的读者。

让我们稍微深入一下子程序average+/表示求和(Σ),%表示除法(就像经典符号÷)。通过#计算项目的计数。因此,整个程序是值的总和除以值的计数:+/ % #

这里编写的移动平均值计算结果不包括原问题中预期的前导零。这些零被认为不是预期计算的一部分。

这里使用的技术称为暗示编程。它与函数式编程的无点风格非常相似。


5

这里是 Clojure 假装成更加函数式的语言。顺便提一下,这段代码完全支持尾递归,并包含前导零。

(defn moving-average [period values]
  (loop [[x & xs]  values
         window    []
         ys        []]

    (if (and (nil? x) (nil? xs))
      ;; base case
      ys

      ;; inductive case
      (if (< (count window) (dec period))
        (recur xs (conj window x) (conj ys 0.0))
        (recur xs
               (conj (vec (rest window)) x)
               (conj ys (/ (reduce + x window) period)))))))

(deftest test-moving-average
  (is (= [0.0 0.0 0.0 4.75 5.0 6.0 7.25 8.0 8.25 6.5]
         (moving-average 4 [2.0 4.0 7.0 6.0 3.0 8.0 12.0 9.0 4.0 1.0]))))

通常我会把集合或列表参数放在最后,以便更容易地对函数进行柯里化。但是在Clojure中...
(partial moving-average 4)

...非常繁琐,我通常最终会这样做...


#(moving-average 4 %)

在这种情况下,参数的顺序并不重要。


嗨,乔纳森,我对这种函数式编程还不太熟悉,你能否解释一下这是如何进行尾递归的?谢谢。 - James P
2
递归发生在if语句中,其中每个选项都基于recur。这将首先计算每个参数,然后才进行递归。答案将是recur的结果。由于结果与递归返回的相同,没有其他计算,因此这是尾递归。 - Daniel C. Sobral
正如丹尼尔所说,在每次“recur”调用返回后,没有任何任务需要执行。这个“堆栈帧”不再需要,而“循环”变量可以重新绑定。“recur”是Clojure中的一个特殊构造;编译器实际上会检查它是否处于尾部位置。 - Jonathan Tran

3

以下是Clojure版本:

由于使用了lazy-seq,它非常通用且不会导致堆栈溢出

(defn partialsums [start lst]
  (lazy-seq
    (if-let [lst (seq lst)] 
          (cons start (partialsums (+ start (first lst)) (rest lst)))
          (list start))))

(defn sliding-window-moving-average [window lst]
  (map #(/ % window)
       (let [start   (apply + (take window lst))
             diffseq (map   - (drop window lst) lst)]
         (partialsums start diffseq))))

为了帮助您理解它正在做什么:

(sliding-window-moving-average 5 '(1 2 3 4 5 6 7 8 9 10 11))

start = (+ 1 2 3 4 5) = 15

diffseq = - (6 7 8 9 10 11)
            (1 2 3 4  5  6 7 8 9 10 11)

        =   (5 5 5 5  5  5)

(partialsums 15 '(5 5 5 5 5 5) ) = (15 20 25 30 35 40 45)

(map #(/ % 5) (20 25 30 35 40 45)) = (3 4 5 6 7 8 9)

;; 示例

(take 20 (sliding-window-moving-average 5 (iterate inc 0)))

2
一段简短的Clojure代码,具有O(列表长度)的优点,无论您的周期如何:
(defn moving-average [list period]
  (let [accums (let [acc (atom 0)] (map #(do (reset! acc (+ @acc %1 ))) (cons 0 list)))
        zeros (repeat (dec period) 0)]
     (concat zeros (map #(/ (- %1 %2) period) (drop period accums) accums))))

这利用了一个事实,即通过创建序列的累积和(例如 [1 2 3 4 5] -> [0 1 3 6 10 15]),您可以计算一系列数字的总和,然后减去两个具有等于您周期的偏移量的数字。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接