在NumPy数组中的滑动窗口中找到最大值

18

我想创建一个数组,它包含给定 numpy 数组的移动窗口中的所有 max() 值。如果这听起来令人困惑,请看以下例子。输入:

[ 6,4,8,7,1,4,3,5,7,2,4,6,2,1,3,5,6,3,4,7,1,9,4,3,2 ]

我的输出窗口宽度为5时应该是这样的:

[     8,8,8,7,7,7,7,7,7,6,6,6,6,6,6,7,7,9,9,9,9     ]

每个数字都应该是输入数组的宽度为5的子数组的最大值:

[ 6,4,8,7,1,4,3,5,7,2,4,6,2,1,3,5,6,3,4,7,1,9,4,3,2 ]
  \       /                 \       /
   \     /                   \     /
    \   /                     \   /
     \ /                       \ /
[     8,8,8,7,7,7,7,7,7,6,6,6,6,6,6,7,7,9,9,9,9     ]

在NumPy中,我没有找到一个现成的函数可以做到这一点(但如果有的话,我也不会感到惊讶;毕竟我并不总是按照NumPy开发人员的思路来思考)。我考虑创建一个输入数据的2D移位版本:

[ [ 6,4,8,7,1,4,3,5,7,8,4,6,2,1,3,5,6,3,4,7,1 ]
  [ 4,8,7,1,4,3,5,7,8,4,6,2,1,3,5,6,3,4,7,1,9 ]
  [ 8,7,1,4,3,5,7,8,4,6,2,1,3,5,6,3,4,7,1,9,4 ]
  [ 7,1,4,3,5,7,8,4,6,2,1,3,5,6,3,4,7,1,9,4,3 ]
  [ 1,4,3,5,7,8,4,6,2,1,3,5,6,3,4,7,1,9,4,3,2 ] ]

我可以对此应用np.max(input, 0),然后得到我的结果。但在我的情况下,这似乎并不高效,因为我的数组和窗口宽度都可能很大(>1000000个条目和>100000个窗口宽度)。数据将会按窗口宽度多多少少地被放大。

我也考虑过以某种方式使用np.convolve(),但无法找到实现目标的方法。

有什么好的方法可以高效地完成这个任务吗?

6个回答

15

Pandas提供了适用于Series和DataFrames的rolling方法,这可能在这里非常有用:

import pandas as pd

lst = [6,4,8,7,1,4,3,5,7,8,4,6,2,1,3,5,6,3,4,7,1,9,4,3,2]
lst1 = pd.Series(lst).rolling(5).max().dropna().tolist()

# [8.0, 8.0, 8.0, 7.0, 7.0, 8.0, 8.0, 8.0, 8.0, 8.0, 6.0, 6.0, 6.0, 6.0, 6.0, 7.0, 7.0, 9.0, 9.0, 9.0, 9.0]

为了保持一致性,您可以将lst1的每个元素强制转换为int

[int(x) for x in lst1]

# [8, 8, 8, 7, 7, 8, 8, 8, 8, 8, 6, 6, 6, 6, 6, 7, 7, 9, 9, 9, 9]

我发现你可以用更简单的方式重构你的解决方案:a = np.array(…), pd.rolling_max(a, window=5)。到目前为止,这似乎是我处理的最佳选项。@Divakar的strides解决方案如果适用于我的尺寸,将会更快,因此在接受这个答案之前我还在等待。 - Alfe
2
Pandas的新版本告诉我,我的缩写将来不再受支持,所以你的解决方案是最好的。 - Alfe

11

方法一:您可以使用Scipy中的1D max filter

from scipy.ndimage.filters import maximum_filter1d

def max_filter1d_valid(a, W):
    hW = (W-1)//2 # Half window size
    return maximum_filter1d(a,size=W)[hW:-hW]

方案 #2 : 这里有另一种方法,使用stridesstrided_app可以相当高效地创建一个视图,将数组转换为移位后的2D版本,这样我们就可以在第二个轴上使用任何自定义归约操作-

def max_filter1d_valid_strided(a, W):
    return strided_app(a, W, S=1).max(axis=1)

运行时测试 -

In [55]: a = np.random.randint(0,10,(10000))

# @Abdou's solution using pandas rolling
In [56]: %timeit pd.Series(a).rolling(5).max().dropna().tolist()
1000 loops, best of 3: 999 µs per loop

In [57]: %timeit max_filter1d_valid(a, W=5)
    ...: %timeit max_filter1d_valid_strided(a, W=5)
    ...: 
10000 loops, best of 3: 90.5 µs per loop
10000 loops, best of 3: 87.9 µs per loop

这听起来非常有前途,将性能与“pandas”解决方案进行比较。不幸的是,对于我处理的数组,这会引发“ValueError:array is too big.”。请自行尝试:“a = np.arange(1000000)”,“np.lib.stride_tricks.as_strided(a,shape =(1000,len(a)-1000 +1),strides =(a.strides [0],a.strides [0]))”。实际上,我需要在大小为10m及以上的数组中使用100k大小的窗口。你有什么解决方法吗? - Alfe
@Alfe 只需使用他提供的 scipy.ndimage.maximum_filter1d 方法。它几乎和原来的方法一样快,即使对于巨大的数组也应该非常高效。 - MSeifert
@MSeifert 很不幸,与pandas的rolling_max()相比,它速度较慢。在我的测试中,当数据大小接近实际大小的下限时,速度大约慢了两倍。 - Alfe
这很有趣,因为在我的电脑上,当窗口大小为100k,数组大小为10m时,maximum_filter1d比其他函数快3-4倍。你是否使用了两个软件包的最新版本? - MSeifert
@Divakar 我正在使用numpy 1.6.1版本。我在另一台安装了1.8.2版本的机器上尝试过,没有问题地评估了as_strides项。但是,在对其调用.max(0)时,根据我尝试较小值所估算的计算时间将会非常长(对于10Mio中的window=100000,在我的机器上大约需要>1500秒)。 - Alfe
显示剩余3条评论

5
我已经尝试了几种变体,现在宣布Pandas版本是性能比赛的胜利者。我尝试了几种变体,甚至使用二叉树(用纯Python实现)快速计算任意子范围的最大值。(可要求源代码)。我自己想出来的最好的算法是使用环形缓冲区的普通滚动窗口; 如果当前的最大值没有在此迭代中从中删除,则只需要完全重新计算那个最大值; 否则,它将保持不变或增加到下一个新值。与旧库相比,这个纯Python实现比其他库都要快。
最终我发现所涉及的库的版本非常重要。我主要使用的旧版本比现代版本慢得多。以下是使用100k大小的窗口对1M个数字进行rollingMax处理的数字:
         old (slow HW)           new (better HW)
scipy:   0.9.0:  21.2987391949   0.13.3:  11.5804400444
pandas:  0.7.0:  13.5896410942   0.18.1:   0.0551438331604
numpy:   1.6.1:   1.17417216301  1.8.2:    0.537392139435

这是使用环形缓冲区实现纯numpy版本的代码:

def rollingMax(a, window):
  def eachValue():
    w = a[:window].copy()
    m = w.max()
    yield m
    i = 0
    j = window
    while j < len(a):
      oldValue = w[i]
      newValue = w[i] = a[j]
      if newValue > m:
        m = newValue
      elif oldValue == m:
        m = w.max()
      yield m
      i = (i + 1) % window
      j += 1
  return np.array(list(eachValue()))

对于我的输入来说,这很适用,因为我处理的是各个方向上都有大量峰值的音频数据。如果你将一个不断减少的信号(例如-np.arange(10000000))输入进去,那么你会面临最坏情况(也许在这种情况下你应该反转输入和输出)。

我只是提供这个信息,以防有人想在一台带有旧库的机器上执行此任务。


4

Numpy 1.20 开始,sliding_window_view 提供了一种通过滑动/滚动窗口来查找元素最大值的方法:

from numpy.lib.stride_tricks import sliding_window_view

# values = np.array([6,4,8,7,1,4,3,5,7,2,4,6,2,1,3,5,6,3,4,7,1,9,4,3,2])
np.max(sliding_window_view(values, window_shape = 5), axis = 1)
# array([8, 8, 8, 7, 7, 7, 7, 7, 7, 6, 6, 6, 6, 6, 6, 7, 7, 9, 9, 9, 9])

其中:

  • window_shape 是滑动窗口的大小
  • np.max(array, axis = 1) 在每个子数组中寻找最大值

而滑动过程的中间结果为:

sliding_window_view(values, window_shape = 5)
# array([[6, 4, 8, 7, 1],
#        [4, 8, 7, 1, 4],
#        [8, 7, 1, 4, 3],
#        ...
#        [7, 1, 9, 4, 3],
#        [1, 9, 4, 3, 2]])

1
首先,我认为您的解释有误,因为在您的解释开始时,初始输入数组的第10个元素等于8,在下面应用窗口的地方是2。
纠正后,我认为实现您想要的代码如下:
import numpy as np
a=np.array([ 6,4,8,7,1,4,3,5,7,8,4,6,2,1,3,5,6,3,4,7,1,9,4,3,2 ])
window=5
for i in range(0,len(a)-window,1): 
    b[i] = np.amax(a[i:i+window])

我认为,这种方法比创建输入的2D偏移版本更好,因为当你创建这样的版本时,需要使用比使用原始输入数组更多的内存,所以如果输入很大,你可能会耗尽内存。

天啊,你说得对!我在写问题的过程中改变了我的输入,以展示更多的情况。我没有保持一致。我已经修复了这个问题。关于你的建议:我想避免使用Python编写循环来处理我的输入,因为那总是比使用numpyscipypandas等包的任何功能慢。如果你认为你的解决方案可以竞争,请提供时间测试。否则:当然,这是一个简单而好的解决方案。只是它不能满足我的性能期望。 - Alfe

0
如果您有二维数据,例如股票价格,并想要获取滚动的最大值或其他计算,这个方法可以帮助您。该方法不需要使用迭代来进行计算。
n = 5  # size of rolling window

data_expanded = np.expand_dims(data, 1)
data_shift = [np.roll(data_expanded, shift=-i, axis=2) for i in range(n)]
data_shift = np.concatenate(data_shift, axis=1)

data_max = np.max(data_shift, axis=1)  # max, mean, std...

对我来说,for i in range(n) 看起来非常像一个迭代。在我的情况下,n 将非常大,例如 96kHz 的两秒音频样本,因此 n > 150000。无论如何,感谢您的贡献并欢迎来到 StackOverflow :-) - Alfe

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接