Python - 将滑动窗口向量化

6

我正在尝试向量化一个滑动窗口操作。对于一维情况,一个有用的例子可能是:

x= vstack((np.array([range(10)]),np.array([range(10)])))

x[1,:]=np.where((x[0,:]<5)&(x[0,:]>0),x[1,x[0,:]+1],x[1,:])

对于每个索引小于5的当前值,计算n + 1的值。但是我收到了以下错误:
x[1,:]=np.where((x[0,:]<2)&(x[0,:]>0),x[1,x[0,:]+1],x[1,:])
IndexError: index (10) out of range (0<=index<9) in dimension 1

奇怪的是,对于小于0的索引值n-1,我不会得到这个错误。它似乎并不在意:

x[1,:]=np.where((x[0,:]<5)&(x[0,:]>0),x[1,x[0,:]-1],x[1,:])

print(x)

[[0 1 2 3 4 5 6 7 8 9]
 [0 0 1 2 3 5 6 7 8 9]]

有没有什么办法可以解决这个问题?我的方法完全错误吗?任何意见都将不胜感激。
编辑:我想要实现的是,将矩阵压平成一个numpy数组,然后计算每个单元格的6x6邻域的均值。
matriz = np.array([[1,2,3,4,5],
   [6,5,4,3,2],
   [1,1,2,2,3],
   [3,3,2,2,1],
   [3,2,1,3,2],
   [1,2,3,1,2]])

# matrix to vector
vector2 = ndarray.flatten(matriz)

ncols = int(shape(matriz)[1])
nrows = int(shape(matriz)[0])

vector = np.zeros(nrows*ncols,dtype='float64')


# Interior pixels
if ( (i % ncols) != 0 and (i+1) % ncols != 0 and i>ncols and i<ncols*(nrows-1)):

    vector[i] = np.mean(np.array([vector2[i-ncols-1],vector2[i-ncols],vector2[i-ncols+1],vector2[i-1],vector2[i+1],vector2[i+ncols-1],vector2[i+ncols],vector2[i+ncols+1]]))

请问您是不是不想在平均值中包含 vector2[i],还是这段代码中有错误? - Daniel
你的代码计算每个单元格的3x3邻域的平均值,而不是6x6邻域;这是有意为之吗? - nneonneo
4个回答

8

如果我理解问题正确,您想要计算所有数字的平均值,这些数字在索引周围 1 步,不包括该索引。

我已修复了您的函数以使其工作,我认为您想要类似于以下内容:

def original(matriz):

    vector2 = np.ndarray.flatten(matriz)

    nrows, ncols= matriz.shape
    vector = np.zeros(nrows*ncols,dtype='float64')

    # Interior pixels
    for i in range(vector.shape[0]):
        if ( (i % ncols) != 0 and (i+1) % ncols != 0 and i>ncols and i<ncols*(nrows-1)):

            vector[i] = np.mean(np.array([vector2[i-ncols-1],vector2[i-ncols],\
                        vector2[i-ncols+1],vector2[i-1],vector2[i+1],\
                        vector2[i+ncols-1],vector2[i+ncols],vector2[i+ncols+1]]))

我使用切片和视图重写了这个代码:

def mean_around(arr):
    arr=arr.astype(np.float64)

    out= np.copy(arr[:-2,:-2])  #Top left corner
    out+= arr[:-2,2:]           #Top right corner
    out+= arr[:-2,1:-1]         #Top center
    out+= arr[2:,:-2]           #etc
    out+= arr[2:,2:]
    out+= arr[2:,1:-1]
    out+= arr[1:-1,2:]
    out+= arr[1:-1,:-2]

    out/=8.0    #Divide by # of elements to obtain mean

    cout=np.empty_like(arr)  #Create output array
    cout[1:-1,1:-1]=out      #Fill with out values
    cout[0,:]=0;cout[-1,:]=0;cout[:,0]=0;cout[:,-1]=0 #Set edges equal to zero

    return  cout

使用np.empty_like然后填充边缘似乎比np.zeros_like稍微快一些。首先,让我们使用您的matriz数组再次检查它们是否给出相同的结果。

print np.allclose(mean_around(matriz),original(matriz))
True

print mean_around(matriz)
[[ 0.     0.     0.     0.     0.   ]
 [ 0.     2.5    2.75   3.125  0.   ]
 [ 0.     3.25   2.75   2.375  0.   ]
 [ 0.     1.875  2.     2.     0.   ]
 [ 0.     2.25   2.25   1.75   0.   ]
 [ 0.     0.     0.     0.     0.   ]]

一些时间记录:
a=np.random.rand(500,500)

print np.allclose(original(a),mean_around(a))
True

%timeit mean_around(a)
100 loops, best of 3: 4.4 ms per loop

%timeit original(a)
1 loops, best of 3: 6.6 s per loop

大约提速1500倍。

看起来使用numba是一个不错的选择:

def mean_numba(arr):
    out=np.zeros_like(arr)
    col,rows=arr.shape

    for x in xrange(1,col-1):
        for y in xrange(1,rows-1):
            out[x,y]=(arr[x-1,y+1]+arr[x-1,y]+arr[x-1,y-1]+arr[x,y+1]+\
                      arr[x,y-1]+arr[x+1,y+1]+arr[x+1,y]+arr[x+1,y-1])/8.
    return out

nmean= autojit(mean_numba)

现在我们来对比一下所有提出的方法。
a=np.random.rand(5000,5000)

%timeit mean_around(a)
1 loops, best of 3: 729 ms per loop

%timeit nmean(a)
10 loops, best of 3: 169 ms per loop

#CT Zhu's answer
%timeit it_mean(a)
1 loops, best of 3: 36.7 s per loop

#Ali_m's answer
%timeit fast_local_mean(a,(3,3))
1 loops, best of 3: 4.7 s per loop

#lmjohns3's answer
%timeit scipy_conv(a)
1 loops, best of 3: 3.72 s per loop

使用Numba加速4倍速度是相当正常的,这表明NumPy代码已经达到了最佳状态。我按照原样保留了其他代码,虽然我不得不修改@CTZhu的答案以包含不同的数组大小。

1
不错。对于 n=3,它比我的版本快了两倍,尽管它是为那个特定情况进行高度调整的 ;)。 - nneonneo
我非常喜欢这个。我现在正在度假,但我会尝试在我的特定问题上使用它,并回复您。我想用它来处理一个5000*5000的矩阵,看看它的表现如何。 - JEquihua
1
@nneonneo 在这篇文章的第一个迭代中,实际上我就使用了uniform_filter作为答案。我很高兴你在几个问题之前提出了它,它非常强大且速度惊人。 - Daniel
这非常酷。我有一个快速的问题。我如何更好地理解即时编译工作原理?我尝试用np.mean()替换公式sum()/n,但速度变得非常慢。为什么会这样呢?问题是我也想要计算中位数。所以我认为代码表达的方式是否有助于编译?我非常新手,所以指向一些好的入门阅读材料会很棒。我不知道任何C语言,我在R中有很多经验,刚开始学习Python,因此对编译或相关主题一无所知。 - JEquihua
@JEquihua 写好的 Numba 代码与写好的 NumPy 代码本质上是相反的。在 NumPy 中,利用内置函数通常是最好的选择;然而,在这里,将所有内容明确地写出来可以让编译器更好地优化您的代码。我认为 np.mean 的问题在于 NumPy 函数正在构建一个 NumPy 数组(将所有数据移动到连续的内存空间),然后取平均值。然而,上面的代码没有复制任何数据。我建议阅读(尽管不太详细)Numba 手册(http://numba.pydata.org)。 - Daniel
显示剩余2条评论

4

看起来你正在尝试计算2D卷积。如果你能使用scipy,我建议尝试使用scipy.signal.convolve2d

matriz = np.random.randn(10, 10)

# to average a 3x3 neighborhood
kernel = np.ones((3, 3), float)

# to compute the mean, divide by size of neighborhood
kernel /= kernel.sum()

average = scipy.signal.convolve2d(matriz, kernel)

如果您把convolve2d展开成其组成的循环,就可以看到它计算所有3x3邻域的平均值的原因。在忽略源和内核数组边缘发生的情况下,它实际上是计算:

X, Y = kernel.shape
for i in range(matriz.shape[0]):
    for j in range(matriz.shape[1]):
        for ii in range(X):
            for jj in range(Y):
                average[i, j] += kernel[ii, jj] * matriz[i+ii, j+jj]

所以如果您的内核中每个值都是1/(1+1+1+1+1+1+1+1+1)==1/9,您可以将上面的代码重写为:
for i in range(matriz.shape[0]):
    for j in range(matriz.shape[1]):
        average[i, j] = 1./9 * matriz[i:i+X, j:j+Y].sum()

这与计算在矩阵上以为起点的3x3区域内值的平均值完全相同。
这种方式的一个优点是,您可以通过适当设置内核中的值来轻松更改邻域相关的权重。因此,例如,如果您想使每个邻域中心值比其他值具有两倍的权重,则可以像这样构建内核:
kernel = np.ones((3, 3), float)
kernel[1, 1] = 2.
kernel /= kernel.sum()

卷积编码不变,但计算将产生不同类型的平均值(一种“中心加权”平均值)。这里有很多可能性;希望这为您正在进行的任务提供了一个良好的抽象。


3

Scipy标准库中刚好有一个函数可以非常快速地计算滑动窗口的均值。它被称为uniform_filter。您可以使用它来实现您的邻域均值函数,如下所示:

from scipy.ndimage.filters import uniform_filter
def neighbourhood_average(arr, win=3):
    sums = uniform_filter(arr, win, mode='constant') * (win*win)
    return ((sums - arr) / (win*win - 1))

这将返回一个数组X,其中X[i,j]arr中所有邻居的平均值,不包括i,j本身。请注意,第一列和最后一列以及第一行和最后一行受边界条件限制,因此可能对您的应用无效(如果需要,您可以使用mode=来控制边界规则)。
由于uniform_filter使用高度有效的线性时间算法,在直线C中实现(仅在arr的大小方面是线性的),因此它应该比任何其他解决方案表现更好,特别是当win很大时。

非常有趣。边界条件受什么限制?我想我需要通常的条件,但我没有在我的问题中发布。这如何排除(i,j)本身?您介意解释一下代码吗? - JEquihua
uniform_filter 默认情况下将窗口居中于每个 (i,j),因此它对例如 3x3 窗口 (i-1:i+2,j-1:j+2) 进行平均。对于位于原始数组之外的值,uniform_filter 使用由 mode 确定的填充值。如果您不关心不完整的窗口,可以删除或将第一行和最后一行以及第一列和最后一列置零。 - nneonneo
1
它排除了(i,j),因为- arr部分将原始值从窗口总和中移除。 - nneonneo

2
问题出在x [1,x [0,:] + 1]上,即第二个轴的索引:x [0,:]+1[1 2 3 4 5 6 7 8 9 10],其中索引10大于x的维度。
x [1,x [0,:]-1]的情况下,第二个轴的索引是[-1 0 1 2 3 4 5 6 7 8 9],你最终得到[9 0 1 2 3 4 5 6 7 8],因为9是最后一个元素,其索引为-1。倒数第二个元素的索引为-2,以此类推。
使用np.where((x [0,:]<5)&(x [0,:]>0),x [1,x [0,:]-1],x [1,:])x [0,:]=[0 1 2 3 4 5 6 7 8 9],实际上发生的是第一个单元格来自x [1,:],因为x [0,0]是0,x [0,:]<5)&(x [0,:]>0False。接下来的四个元素来自x [1,x [0,:]-1]。其余的来自x [1,:]。最后结果是[0 0 1 2 3 4 5 6 7 8]
这可能看起来对于仅有一个单元格的滑动窗口来说还可以,但它会给你带来惊喜。
>>> np.where((x[0,:]<5)&(x[0,:]>0),x[1,x[0,:]-2],x[1,:])
array([0, 9, 0, 1, 2, 5, 6, 7, 8, 9])

当您试图将其移动两个单元格的窗口时。

针对这个具体问题,如果我们希望将所有内容放在一行中,可以使用以下方法:

>>> for i in [1, 2, 3, 4, 5, 6]:
    print hstack((np.where(x[1,x[0,:]-i]<x[0, -i], x[1,x[0,:]-i], 0)[:5], x[0,5:]))

[0 0 1 2 3 5 6 7 8 9]
[0 0 0 1 2 5 6 7 8 9]
[0 0 0 0 1 5 6 7 8 9]
[0 0 0 0 0 5 6 7 8 9]
[0 0 0 0 0 5 6 7 8 9]
[0 0 0 0 0 5 6 7 8 9]

编辑: 现在我更好地理解了您的问题,基本上您想要获取一个二维数组,并计算每个单元格周围N*N个单元格的平均值。这是很常见的。首先,您可能希望将N限制为奇数,否则2*2的平均值就难以定义。假设我们想要3*3的平均值:

#In this example, the shape is (10,10)
>>> a1=\
array([[3, 7, 0, 9, 0, 8, 1, 4, 3, 3],
   [5, 6, 5, 2, 9, 2, 3, 5, 2, 9],
   [0, 9, 8, 5, 3, 1, 8, 1, 9, 4],
   [7, 4, 0, 0, 9, 3, 3, 3, 5, 4],
   [3, 1, 2, 4, 8, 8, 2, 1, 9, 6],
   [0, 0, 3, 9, 3, 0, 9, 1, 3, 3],
   [1, 2, 7, 4, 6, 6, 2, 6, 2, 1],
   [3, 9, 8, 5, 0, 3, 1, 4, 0, 5],
   [0, 3, 1, 4, 9, 9, 7, 5, 4, 5],
   [4, 3, 8, 7, 8, 6, 8, 1, 1, 8]])
#move your original array 'a1' around, use range(-2,2) for 5*5 average and so on
>>> movea1=[a1[np.clip(np.arange(10)+i, 0, 9)][:,np.clip(np.arange(10)+j, 0, 9)] for i, j in itertools.product(*[range(-1,2),]*2)]
#then just take the average
>>> averagea1=np.mean(np.array(movea1), axis=0)
#trim the result array, because the cells among the edges do not have 3*3 average
>>> averagea1[1:10-1, 1:10-1]
array([[ 4.77777778,  5.66666667,  4.55555556,  4.33333333,  3.88888889,
     3.66666667,  4.        ,  4.44444444],
   [ 4.88888889,  4.33333333,  4.55555556,  3.77777778,  4.55555556,
     3.22222222,  4.33333333,  4.66666667],
   [ 3.77777778,  3.66666667,  4.33333333,  4.55555556,  5.        ,
     3.33333333,  4.55555556,  4.66666667],
   [ 2.22222222,  2.55555556,  4.22222222,  4.88888889,  5.        ,
     3.33333333,  4.        ,  3.88888889],
   [ 2.11111111,  3.55555556,  5.11111111,  5.33333333,  4.88888889,
     3.88888889,  3.88888889,  3.55555556],
   [ 3.66666667,  5.22222222,  5.        ,  4.        ,  3.33333333,
     3.55555556,  3.11111111,  2.77777778],
   [ 3.77777778,  4.77777778,  4.88888889,  5.11111111,  4.77777778,
     4.77777778,  3.44444444,  3.55555556],
   [ 4.33333333,  5.33333333,  5.55555556,  5.66666667,  5.66666667,
     4.88888889,  3.44444444,  3.66666667]])

我认为您不需要将2D数组展平,这会导致混淆。此外,如果您想以不同于仅修剪它们的方式处理边缘元素,请考虑在“移动原始数组”步骤中使用np.ma创建掩码数组。


为什么不能反过来工作,10再次成为第一个元素?或者我该怎么做才能实现我的想法呢? - JEquihua
哦,与Matlab不同,Python的索引从0开始。因此,如果您使用正整数,则长度为10的向量的最大索引为9,如果尝试x [10],则会出现“indexError”。对于x = [0 1 2 3 4 5 6 7 8 9],要获取9,可以使用x [-1]x [9],但不能使用x [10] - CT Zhu
我将编辑我的问题以展示我真正想要实现的内容。我只是不想让问题变得太长,但是还是要说一下。因为我觉得你有点误解我的意思。 - JEquihua
这个会快吗?我想将代码向量化,因为我正在处理大型数据集(5000 x 5000)。 - JEquihua
@lmjohns3的回答是最好和最快的。convolve2d()由扩展程序\site-packages\scipy\signal\sigtools.pyd提供,它以c速度运行,并且不会消耗太多内存。 - CT Zhu

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接