在numpy中进行二维跨步卷积

16

我尝试使用for循环实现2D数组的步幅卷积,即

arr = np.array([[2,3,7,4,6,2,9],
                [6,6,9,8,7,4,3],
                [3,4,8,3,8,9,7],
                [7,8,3,6,6,3,4],
                [4,2,1,8,3,4,6],
                [3,2,4,1,9,8,3],
                [0,1,3,9,2,1,4]])

arr2 = np.array([[3,4,4],
                 [1,0,2],
                 [-1,0,3]])

def stride_conv(arr1,arr2,s,p):
    beg = 0
    end = arr2.shape[0]
    final = []
    for i in range(0,arr1.shape[0]-1,s):
        k = []
        for j in range(0,arr1.shape[0]-1,s):
            k.append(np.sum(arr1[beg+i : end+i, beg+j:end+j] * (arr2)))
        final.append(k)

    return np.array(final)

stride_conv(arr,arr2,2,0)

这将导致一个3*3的数组:

array([[ 91, 100,  88],
       [ 69,  91, 117],
       [ 44,  72,  74]])
有没有numpy函数或scipy函数可以做相同的事情?我的方法不太好。我该如何向量化这个过程?

是否有numpy函数或scipy函数能够实现相同的功能?我的方法并不是很好,怎样进行向量化处理呢?


你不能使用scipy 2D卷积吗? - Divakar
2
@Divakar,我昨天刚学习了卷积,scipy 2D conv没有步长数量的参数,如果有的话,恐怕我需要更多的研究。 - Bharath M Shetty
1
参数 p 没有被使用? - Divakar
1
@Divakar,这还不完整,我想使用填充,但我只学会了不使用填充的方法。我只是保留了该参数以备将来更新。 - Bharath M Shetty
1
可能这篇博客文章可以帮助你。引用:“假设我们有一个大小为1x1x10x10的单个图像和一个大小为1x1x3x3的单个过滤器。...然后,天真地说,如果我们要对图像进行卷积操作,则会在图像上循环,并在每个位置上进行点积...”和“但是,如果我们不想执行循环怎么办?...我们需要收集可以应用过滤器的所有可能位置,然后进行单个矩阵乘法以获得每个可能位置的点积。” - dfrib
6个回答

12

忽略填充参数和尾部窗口不足以与第二个数组进行卷积的情况,下面是一种使用 np.lib.stride_tricks.as_strided 的方法 -

def strided4D(arr,arr2,s):
    strided = np.lib.stride_tricks.as_strided
    s0,s1 = arr.strides
    m1,n1 = arr.shape
    m2,n2 = arr2.shape    
    out_shp = (1+(m1-m2)//s, m2, 1+(n1-n2)//s, n2)
    return strided(arr, shape=out_shp, strides=(s*s0,s*s1,s0,s1))

def stride_conv_strided(arr,arr2,s):
    arr4D = strided4D(arr,arr2,s=s)
    return np.tensordot(arr4D, arr2, axes=((2,3),(0,1)))

或者,我们可以使用scikit-image内置的view_as_windows来优雅地获取这些窗口,如下所示-

from skimage.util.shape import view_as_windows

def strided4D_v2(arr,arr2,s):
    return view_as_windows(arr, arr2.shape, step=s)

1
Strided4D非常美丽,先生,您能简单解释一下您的方法吗? - Bharath M Shetty
先生,strided4D的out_shp是某种公式吗? - Bharath M Shetty
1
@Dark 是的,通常是 1+(m1-m2)。考虑到步长,我们需要进行除法运算。可以这样想,我们沿着 m1 的长度向后移动 m2,然后根据步长从起点开始计算有多少个窗口可以适配。 - Divakar
先生,我选择了NumPy,这样我就可以理解核心概念。一旦我完全理解了这个主题,我会使用scikit。 - Bharath M Shetty
4
但是你的代码只适用于形状为 arr=7x7(这将导致 1+(m1-m2)//s = 3 = kernel size),有时候 arr4D 中甚至会有 nan 值。更改为 (1+(m1-m2)//s, 1+(n1-n2)//s, m2, n2) 可以得到正确的 tensordot 轴匹配 ((2,3),(0,1))。我认为 @NaN 也是想问这个问题,因为他的 np.sum() 方法现在也可以工作了。 - Jason
显示剩余9条评论

8
我认为我们可以进行“有效”的FFT卷积,并仅选择跨步位置上的结果,如下所示:
def strideConv(arr,arr2,s):
    cc=scipy.signal.fftconvolve(arr,arr2[::-1,::-1],mode='valid')
    idx=(np.arange(0,cc.shape[1],s), np.arange(0,cc.shape[0],s))
    xidx,yidx=np.meshgrid(*idx)
    return cc[yidx,xidx]

这与其他人的答案给出了相同的结果。但我猜只有当内核大小为奇数时才有效。
另外,我在arr2[::-1, ::-1]中翻转了内核,只是为了与其他人保持一致,根据上下文,您可能想省略它。
更新:
目前,我们有几种使用numpy和scipy进行2D或3D卷积的不同方法,我考虑进行一些比较,以便了解哪种方法在不同大小的数据上更快。希望这不会被视为离题。
方法1:FFT卷积(使用scipy.signal.fftconvolve):
def padArray(var,pad,method=1):
    if method==1:
        var_pad=numpy.zeros(tuple(2*pad+numpy.array(var.shape[:2]))+var.shape[2:])
        var_pad[pad:-pad,pad:-pad]=var
    else:
        var_pad=numpy.pad(var,([pad,pad],[pad,pad])+([0,0],)*(numpy.ndim(var)-2),
                mode='constant',constant_values=0)
    return var_pad

def conv3D(var,kernel,stride=1,pad=0,pad_method=1):
    '''3D convolution using scipy.signal.convolve.
    '''
    var_ndim=numpy.ndim(var)
    kernel_ndim=numpy.ndim(kernel)
    stride=int(stride)

    if var_ndim<2 or var_ndim>3 or kernel_ndim<2 or kernel_ndim>3:
        raise Exception("<var> and <kernel> dimension should be in 2 or 3.")

    if var_ndim==2 and kernel_ndim==3:
        raise Exception("<kernel> dimension > <var>.")

    if var_ndim==3 and kernel_ndim==2:
        kernel=numpy.repeat(kernel[:,:,None],var.shape[2],axis=2)

    if pad>0:
        var_pad=padArray(var,pad,pad_method)
    else:
        var_pad=var

    conv=fftconvolve(var_pad,kernel,mode='valid')

    if stride>1:
        conv=conv[::stride,::stride,...]

    return conv

方法2:特殊转换(参见此答案):
def conv3D2(var,kernel,stride=1,pad=0):
    '''3D convolution by sub-matrix summing.
    '''
    var_ndim=numpy.ndim(var)
    ny,nx=var.shape[:2]
    ky,kx=kernel.shape[:2]

    result=0

    if pad>0:
        var_pad=padArray(var,pad,1)
    else:
        var_pad=var

    for ii in range(ky*kx):
        yi,xi=divmod(ii,kx)
        slabii=var_pad[yi:2*pad+ny-ky+yi+1:1, xi:2*pad+nx-kx+xi+1:1,...]*kernel[yi,xi]
        if var_ndim==3:
            slabii=slabii.sum(axis=-1)
        result+=slabii

    if stride>1:
        result=result[::stride,::stride,...]

    return result

第三种方法:Divakar建议的步幅视图卷积。
def asStride(arr,sub_shape,stride):
    '''Get a strided sub-matrices view of an ndarray.

    <arr>: ndarray of rank 2.
    <sub_shape>: tuple of length 2, window size: (ny, nx).
    <stride>: int, stride of windows.

    Return <subs>: strided window view.

    See also skimage.util.shape.view_as_windows()
    '''
    s0,s1=arr.strides[:2]
    m1,n1=arr.shape[:2]
    m2,n2=sub_shape[:2]

    view_shape=(1+(m1-m2)//stride,1+(n1-n2)//stride,m2,n2)+arr.shape[2:]
    strides=(stride*s0,stride*s1,s0,s1)+arr.strides[2:]
    subs=numpy.lib.stride_tricks.as_strided(arr,view_shape,strides=strides)

    return subs

def conv3D3(var,kernel,stride=1,pad=0):
    '''3D convolution by strided view.
    '''
    var_ndim=numpy.ndim(var)
    kernel_ndim=numpy.ndim(kernel)

    if var_ndim<2 or var_ndim>3 or kernel_ndim<2 or kernel_ndim>3:
        raise Exception("<var> and <kernel> dimension should be in 2 or 3.")

    if var_ndim==2 and kernel_ndim==3:
        raise Exception("<kernel> dimension > <var>.")

    if var_ndim==3 and kernel_ndim==2:
        kernel=numpy.repeat(kernel[:,:,None],var.shape[2],axis=2)

    if pad>0:
        var_pad=padArray(var,pad,1)
    else:
        var_pad=var

    view=asStride(var_pad,kernel.shape,stride)
    #return numpy.tensordot(aa,kernel,axes=((2,3),(0,1)))
    if numpy.ndim(kernel)==2:
        conv=numpy.sum(view*kernel,axis=(2,3))
    else:
        conv=numpy.sum(view*kernel,axis=(2,3,4))

    return conv

我进行了3组比较:

  1. 对2D数据进行卷积,使用不同的输入大小和卷积核大小,步长为1,填充为0。结果如下(颜色表示卷积重复10次所用的时间):

enter image description here

"FFT conv"通常是最快的。当卷积核大小增加时,“Special conv”和“Stride-view conv”变慢,但随着接近输入数据的大小,又会减少。最后一个子图显示了最快的方法,因此紫色的大三角形表示FFT是赢家,但请注意左侧有一条细绿色柱(可能太小看不清楚,但确实存在),这表明“Special conv”对于非常小的内核(小于约5x5)具有优势。而当内核大小接近输入时,“stride-view conv”是最快的(见对角线)。
比较2:对3D数据进行卷积。
设置:pad=0,stride=2,输入维度=nxnx5,内核形状=fxfx5
当内核大小位于输入的中间时,我跳过了“Special Conv”和“Stride-view conv”的计算。基本上,“Special Conv”现在没有优势,“Stride-view”比FFT对于小和大内核都更快。

enter image description here

补充说明:当尺寸超过350时,我注意到“Stride-view conv”会出现相当大的内存使用峰值。

比较3:使用更大步长对3D数据进行卷积。

设置:填充=0,步长=5,输入维度=nxnx10,内核形状=fxfx10

这次我省略了“特殊卷积”。对于更大的区域,“Stride-view conv”超越了FFT,最后一个子图显示差异接近100%。 可能是因为随着步幅增加,FFT方法将有更多的浪费数字,因此“stride-view”在小型和大型内核方面获得更多优势。

enter image description here


8

使用scipy中的signal.convolve2d怎么样?

我的方法类似于Jason的方法,但是使用索引。

def strideConv(arr, arr2, s):
    return signal.convolve2d(arr, arr2[::-1, ::-1], mode='valid')[::s, ::s]

请注意,内核必须被反转。有关详细信息,请参见此处此处的讨论。否则使用signal.correlate2d
示例:
 >>> strideConv(arr, arr2, 1)
 array([[ 91,  80, 100,  84,  88],
        [ 99, 106, 126,  92,  77],
        [ 69,  98,  91,  93, 117],
        [ 80,  79,  87,  93,  61],
        [ 44,  72,  72,  63,  74]])
 >>> strideConv(arr, arr2, 2)
 array([[ 91, 100,  88],
        [ 69,  91, 117],
        [ 44,  72,  74]])

6
有没有不浪费计算的方法?就像这个例子一样,在所需步幅之间进行了许多无用的计算。 - Recessive

4

这里是一种基于O(N^d (log N)^d) fft的方法。其思路是将两个操作数都分解成间隔为strides的网格,并在所有偏移量模除strides的位置上进行传统fft卷积,然后逐点求和结果。虽然需要用到一些索引,但我恐怕无法避免:

import numpy as np
from numpy.fft import fftn, ifftn

def strided_conv_2d(x, y, strides):
    s, t = strides
    # consensus dtype
    cdt = (x[0, 0, ...] + y[0, 0, ...]).dtype
    xi, xj = x.shape
    yi, yj = y.shape
    # round up modulo strides
    xk, xl, yk, yl = map(lambda a, b: -a//b * -b, (xi,xj,yi,yj), (s,t,s,t))
    # zero pad to avoid circular convolution
    xp, yp = (np.zeros((xk+yk, xl+yl), dtype=cdt) for i in range(2))
    xp[:xi, :xj] = x
    yp[:yi, :yj] = y
    # fold out strides
    xp = xp.reshape((xk+yk)//s, s, (xl+yl)//t, t)
    yp = yp.reshape((xk+yk)//s, s, (xl+yl)//t, t)
    # do conventional fft convolution
    xf = fftn(xp, axes=(0, 2))
    yf = fftn(yp, axes=(0, 2))
    result = ifftn(xf * yf.conj(), axes=(0, 2)).sum(axis=(1, 3))
    # restore dtype
    if cdt in (int, np.int_, np.int64, np.int32):
        result = result.real.round()
    return result.astype(cdt)

arr = np.array([[2,3,7,4,6,2,9],
                [6,6,9,8,7,4,3],
                [3,4,8,3,8,9,7],
                [7,8,3,6,6,3,4],
                [4,2,1,8,3,4,6],
                [3,2,4,1,9,8,3],
                [0,1,3,9,2,1,4]])

arr2 = np.array([[3,4,4],
                 [1,0,2],
                 [-1,0,3]])

print(strided_conv_2d(arr, arr2, (2, 2)))

结果:

[[ 91 100  88  23   0  29]
 [ 69  91 117  19   0  38]
 [ 44  72  74  17   0  22]
 [ 16  53  26  12   0   0]
 [  0   0   0   0   0   0]
 [ 19  11  21  -9   0   6]]

我不是一个很擅长阅读代码的人。最好有一个简短的解释。 - Bharath M Shetty
1
输出结果不同,为什么会这样? - Bharath M Shetty
@Dark这只是零填充输入。你可以在偏移量1,1处找到输出的子数组。我已经添加了一些通用说明。如果您需要更多细节,请告诉我。 - Paul Panzer
1
@Dark,我已经清理了代码并添加了一些注释。 - Paul Panzer

1
据我所知,numpy或scipy没有直接实现支持步长和填充的卷积滤波器,因此我认为最好使用诸如torch或tensorflow之类的DL包,然后将最终结果转换为numpy。一个torch的实现可能是:
import torch
import torch.nn.functional as F

arr = torch.tensor(np.expand_dims(arr, axis=(0,1))
arr2 = torch.tensor(np.expand_dims(arr2, axis=(0,1))
output = F.conv2d(arr, arr2, stride=2, padding=0)
output = output.numpy().squeeze()

output>
array([[ 91, 100,  88],
       [ 69,  91, 117],
       [ 44,  72,  74]])

0

卷积支持步幅和膨胀。使用numpy.lib.stride_tricks.as_strided

import numpy as np
from numpy.lib.stride_tricks import as_strided

def conv_view(X, F_s, dr, std):
    X_s = np.array(X.shape)
    F_s = np.array(F_s)
    dr = np.array(dr)
    Fd_s = (F_s - 1) * dr + 1
    if np.any(Fd_s > X_s):
        raise ValueError('(Dilated) filter size must be smaller than X')
    std = np.array(std)
    X_ss = np.array(X.strides)
    Xn_s = (X_s - Fd_s) // std + 1
    Xv_s = np.append(Xn_s, F_s)
    Xv_ss = np.tile(X_ss, 2) * np.append(std, dr)
    return as_strided(X, Xv_s, Xv_ss, writeable=False)

def convolve_stride(X, F, dr=None, std=None):
    if dr is None:
        dr = np.ones(X.ndim, dtype=int)
    if std is None:
        std = np.ones(X.ndim, dtype=int)
    if not (X.ndim == F.ndim == len(dr) == len(std)):
        raise ValueError('X.ndim, F.ndim, len(dr), len(std) must be the same')
    Xv = conv_view(X, F.shape, dr, std)
    return np.tensordot(Xv, F, axes=X.ndim)

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接