在Haskell中实现高效的滑动窗口算法

16

我需要一个高效的 Haskell 滑动窗口函数,所以我写了下面的代码:

windows n xz@(x:xs)
  | length v < n = []
  | otherwise = v : windows n xs
  where
    v = take n xz

我的问题在于,我认为这个算法的复杂度是O(n*m),其中m是列表的长度,n是窗口大小。你先遍历一次列表用于take,再遍历一次列表用于length,最后还要遍历m-n次列表。看起来应该有更加高效的方法,但我不知道如何使它变得更线性。有人愿意尝试吗?

5个回答

10

由于输出数据结构的大小是O(m*n),因此你无法得到更好的结果。

但如果您反转操作顺序,则可以避免检查窗口的长度:首先创建n个移位列表,然后将它们一起压缩。压缩将自动消除那些没有足够元素的列表。

import Control.Applicative
import Data.Traversable (sequenceA)
import Data.List (tails)

transpose' :: [[a]] -> [[a]]
transpose' = getZipList . sequenceA . map ZipList

将列表的列表压缩只是转置,但与Data.List中的transpose不同,它会丢弃少于n个元素的输出。
现在很容易制作窗口函数:取m个列表,每个列表向右移动1个位置,然后将它们压缩即可:
windows :: Int -> [a] -> [[a]]
windows m = transpose' . take m . tails

也适用于无限列表。


11
foldr (zipWith (:)) (repeat []) . take m . tails。 意思是将一个列表分成长度为m的子列表,并将每个子列表转置成一个新的列表,其中使用了函数式编程中的高阶函数 foldrzipWithtails - Will Ness
@Will Ness - 哦,那很好。 - user1441998
@user1441998,这就是ZipList上的sequenceA所做的事情。 :) (我所说的“或”是指“或者可以明确地写成...”)。sequenceA == foldr ((<*>).((:)<$>)) (pure []) - Will Ness
@Will Ness -- 感谢您的澄清,甚至更好! - user1441998

7
您可以使用位于 Data.Sequence 模块中的 Seq,其在两端都具有 O(1) 的入队和出队操作能力:
import Data.Foldable (toList)
import qualified Data.Sequence as Seq
import Data.Sequence ((|>))

windows :: Int -> [a] -> [[a]]
windows n0 = go 0 Seq.empty
  where
    go n s (a:as) | n' <  n0   =              go n' s'  as
                  | n' == n0   = toList s'  : go n' s'  as
                  | otherwise =  toList s'' : go n  s'' as
      where
        n'  = n + 1         -- O(1)
        s'  = s |> a        -- O(1)
        s'' = Seq.drop 1 s' -- O(1)
    go _ _ [] = []

请注意,如果您将整个结果实现,那么您的算法必然是O(N*M),因为这是您的结果的大小。使用Seq只是通过一个常量因子提高了性能。
示例用法:
>>> windows [1..5]
[[1,2,3],[2,3,4],[3,4,5]]

1
在我看来,Data.Sequence 太过复杂了。你只需要一个队列!像银行家队列这样的轻量级数据结构应该能很好地解决问题。 - dfeuer

5

首先让我们不用担心结尾处的短窗口,先看看其他的窗口:

import Data.List (tails)

windows' :: Int -> [a] -> [[a]]
windows' n = map (take n) . tails

> windows' 3 [1..5]
[[1,2,3],[2,3,4],[3,4,5],[4,5],[5],[]]

现在我们想要摆脱长度较短的元素,而不用检查每一个元素的长度。

因为我们知道它们在末尾,所以可以这样去除:

windows n xs = take (length xs - n + 1) (windows' n xs)

但这并不好,因为我们仍然需要额外遍历一次xs来获取其长度。它也无法处理无限列表,而你原来的解决方案可以。

相反,我们可以编写一个函数,使用一个列表作为标尺来测量从另一个列表中取出的数量:

takeLengthOf :: [a] -> [b] -> [b]
takeLengthOf = zipWith (flip const)

> takeLengthOf ["elements", "get", "ignored"] [1..10]
[1,2,3]

现在我们可以写成这样:
windows :: Int -> [a] -> [[a]]
windows n xs = takeLengthOf (drop (n-1) xs) (windows' n xs)

> windows 3 [1..5]
[[1,2,3],[2,3,4],[3,4,5]]

同样适用于无限列表:

> take 5 (windows 3 [1..])
[[1,2,3],[2,3,4],[3,4,5],[4,5,6],[5,6,7]]

正如Gabriella Gonzalez所说,如果你想使用整个结果,时间复杂度并不会更好。但是如果你只使用其中的一些窗口,我们现在成功避免了对未使用的窗口执行takelength操作。


2
如果你想要O(1)长度,为什么不使用提供O(1)长度的结构?假设你不是在无限列表中查找窗口,考虑使用:
import qualified Data.Vector as V
import Data.Vector (Vector)
import Data.List(unfoldr) 

windows :: Int -> [a] -> [[a]]
windows n = map V.toList . unfoldr go . V.fromList
 where                    
  go xs | V.length xs < n = Nothing
        | otherwise =
            let (a,b) = V.splitAt n xs
            in Just (a,b)

将每个窗口的对话从向量转换为列表可能会让你感到困惑,我不敢乐观地猜测,但我敢打赌性能比仅使用列表的版本更好。


我愿意打赌这是低效的。流融合并不能让你避免实现向量(通过数组加倍)。像David Fletcher或(我认为)Petr Pudlák的解决方案看起来更有前途。Gabriel Gonzalez的写法可能比那些慢,但使用更便宜的队列可能会稍微快一点(基于重新实现“inits”的经验而猜测)。 - dfeuer
最后一行应该是“只有(a,V.tail xs)”,或者直接使用“slice”而不是“splitAt”。然后建议将切片保留为它们本身,而不将其转换为列表,以节省线性空间。 - Will Ness

0

对于滑动窗口,我也使用了未封装的向量作为长度、take、drop和splitAt都是O(1)操作。

Thomas M. DuBuisson的代码是一个按n位移的窗口,不是滑动窗口,除非n=1。因此缺少了(++),但这需要O(n+m)的代价。因此请注意放置的位置。

 import qualified Data.Vector.Unboxed as V
 import Data.Vector.Unboxed (Vector)
 import Data.List

 windows :: Int -> Vector Double -> [[Int]]
 windows n = (unfoldr go) 
  where                    
   go !xs | V.length xs < n = Nothing
          | otherwise =
             let (a,b) = V.splitAt 1 xs
                  c= (V.toList a ++V.toList (V.take (n-1) b))
             in (c,b)

我使用 +RTS -sstderr 进行了尝试并得到了如下结果:

 putStrLn $ show (L.sum $ L.concat $  windows 10 (U.fromList $ [1..1000000]))

考虑到在滑动窗口之后执行了两个O(m)操作,我们得到了实时1.051秒和96.9%的使用率。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接