在Haskell中计算移动平均值

4
我正在学习Haskell,所以尝试实现一个移动平均函数。以下是我的代码:

mAverage :: Int-> [Int] -> [Float]
mAverage x a = [fromIntegral k / fromIntegral x | k <- rawAverage]
    where
    rawAverage = mAverage' x a a

-- First list contains original values; second list contains moving average computations
mAverage' :: Int -> [Int] -> [Int] -> [Int]
mAverage' 1 a b = b
mAverage' x a b = mAverage' (x - 1) a' b'
    where
    a' = init a
    b' = zipWith (+) a' (tail b)

用户使用mAverage函数时,需要传入每个平均数的长度和值列表(例如mAverage 4 [1,2..100])。

但是,当我在输入mAverage 4 [1,2..100000]时,在ghci中运行代码需要3.6秒,并且使用1GB内存。这对我来说效率非常低下,因为在Python中等效的函数只需要几分之一秒。有没有办法让我的代码更加高效?


3
注意:init a 的时间复杂度为 _O(length a)_,有点昂贵。最好实现滑动窗口,这样将其向前移动一个项的时间是常数时间。 - 9000
1
GHCi不应用于任何与性能相关的事情。我建议使用ghc -O2或者jhc - Thomas M. DuBuisson
2
一种实现滑动窗口的方法是将第一个总和作为“Float”传入,同时传入原始列表(用于从当前总和中减去)以及删除了k个条目的原始列表(用于添加到当前总和)。然后下一个总和是传入的总和减去减法列表的第一个元素再加上加法列表的第一个元素。 - Chai T. Rex
不要编辑问题以使其发生如此根本的变化。 - dfeuer
5个回答

9
如果您想学习一些新知识,可以看看这个优秀的解决方案,它与“移动平均”问题有关。它是由我的一位学生编写的,所以我不会声称自己是作者。我非常喜欢它,因为它非常简短。唯一的问题在于average函数。此类函数已知不好。相反,您可以使用Gabriel Gonzalez的美丽折叠。是的,该函数需要O(k)时间(其中k是窗口大小),用于计算窗口的平均值(我认为这更好,因为如果您尝试仅添加新元素并减去最后一个,则可能会遇到浮点错误)。哦,它还使用了State monad :)
{-# LANGUAGE UnicodeSyntax #-}

module MovingAverage where

import           Control.Monad       (forM)
import           Control.Monad.State (evalState, gets, modify)

moving :: Fractional a ⇒ Int → [a] → [a]
moving n _  | n <= 0 = error "non-positive argument"
moving n xs = evalState (forM xs $ \x → modify ((x:) . take (n-1)) >> gets average) []
  where
    average xs = sum xs / fromIntegral n

为什么这里认为平均函数是“糟糕”的呢?除以0的边界情况吗? - Chris Stryczynski
@ChrisStryczynski 这是一个问题。另一个问题是该函数遍历列表两次:首先找到总和,然后找到“长度”(在Haskell中查找列表的“长度”需要线性时间)。但是,可以在单个列表遍历中计算出总和和长度。 - Shersh

5
这是一个直观的基于列表的解决方案,很符合习惯并且足够快,尽管需要更多的内存。
import Data.List (tails)

mavg :: Fractional b => Int -> [b] -> [b]
mavg k lst = take (length lst-k) $ map average $ tails lst
   where average = (/ fromIntegral k) . sum . take k

这个解决方案允许在移动窗口中使用任何函数来代替average

以下解决方案不那么通用,但空间复杂度恒定且似乎速度最快。

import Data.List (scanl')

mavg :: Fractional b => Int -> [b] -> [b]
mavg k lst = map (/ fromIntegral k) $ scanl' (+) (sum h) $ zipWith (-) t lst
  where (h, t) = splitAt k lst 

最后,这个解决方案使用了一种Okasaki的持久化函数队列来保持移动窗口。当处理流数据时,比如conduits或者pipes,这确实是有意义的。

mavg k lst = map average $ scanl' enq ([], take k lst) $ drop k lst
  where 
    average (l,r) = (sum l + sum r) / fromIntegral k

    enq (l, []) x = enq ([], reverse l) x
    enq (l, (_:r)) x = (x:l, r)

正如在原帖的评论中提到的那样,不要使用ghci进行性能分析。例如,在ghci中无法看到scanl'的任何好处。


1
这里有一个解决方案。
思路是扫描两个列表,一个是平均窗口的起始位置,另一个是结束位置。获取列表的尾部与跳过的部分扫描成本相同,我们不会复制任何东西。(如果窗口大小通常很大,我们可以一次性计算出剩余数据和初始数据的总和。)
我们按照我的评论所述生成部分总和列表,然后将它们除以窗口宽度以获得平均值。
而slidingAverage为偏向位置(向右窗口宽度)计算平均值,centeredSlidingAverage则使用左右各半窗口宽度计算中心平均值。
import Data.List (splitAt, replicate)

slidingAverage :: Int -> [Int] -> [Double] -- window size, source list -> list of averages
slidingAverage w xs = map divide $ initial_sum : slidingSum initial_sum xs remaining_data
  where
    divide = (\n -> (fromIntegral n) / (fromIntegral w))  -- divides the sums by window size
    initial_sum = sum initial_data
    (initial_data, remaining_data) = splitAt w xs

centeredSlidingAverage :: Int -> [Int] -> [Double] -- window size, source list -> list of averages
centeredSlidingAverage w xs = slidingAverage w $ left_padding ++ xs ++ right_padding
  where
    left_padding = replicate half_width 0
    right_padding = replicate (w - half_width) 0
    half_width = (w `quot` 2)   -- quot is integer division

slidingSum :: Int -> [Int] -> [Int] -> [Int] -- window_sum before_window after_window -> list of sums
slidingSum _ _ [] = []
slidingSum window_sum before_window after_window = new_sum : slidingSum new_sum new_before new_after
  where
    value_to_go = head before_window
    new_before = tail before_window
    value_to_come = head after_window
    new_after = tail after_window
    new_sum = window_sum - value_to_go + value_to_come

当我尝试执行length $ slidingAverage 10 [1..1000000]时,我的MBP只需要不到一秒的时间。由于延迟计算,centeredSlidingAverage需要大约相同的时间。

0

一种简单的方法,也使用O(n)复杂度来实现它

movingAverage :: (Fractional a) => Int -> [a] -> [a]
movingAverage n _ | n <= 0 = error "non-positive argument"
movingAverage n xs = fmap average $ groupBy n xs
  where average xs' = sum xs' / fromIntegral (length xs')

groupBy :: Int -> [a] -> [[a]]
groupBy _ [] = []
groupBy n xs = go [] xs
  where
    go _ []      = []
    go l (x:xs') = (x:t) : go (x:l) xs'
      where t = take (n-1) l

0

另一种方法是使用STUArray。

import           Data.Array.Unboxed
import           Data.Array.ST
import           Data.STRef
import           Control.Monad
import           Control.Monad.ST

movingAverage  :: [Double] -> IO [Double]
movingAverage vals = stToIO $ do
  let end = length vals - 1
  myArray <- newArray (1, end) 0 :: ST s (STArray s Int Double)
  forM_ [1 .. end] $ \i -> do
    let cval = vals !! i
    let lval = vals !! (i-1)
    writeArray myArray i ((cval + lval)/2)
  getElems myArray

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接