NumPy数组的子维度上的Python操作

4
许多numpy函数提供了在特定轴上操作的选项,使用axis=参数。我的问题是:
  1. 这些'along axis'操作是如何实现的?或者更直接的问题
  2. 我如何高效地编写自己的函数,提供类似的选项?
我注意到numpy提供了一个函数numpy.apply_along_axis,如果基础函数的输入是1-D数组,则可以作为答案。
但是,如果我的基础函数需要多维输入呢?例如,要在np矩阵A(形状为(5,6,2,3,4))的前两个维度(5,6)上找到2-D移动平均值B。像一个通用函数B = f_moving_mean(A,axis=(0,1))。
我目前的解决方案是使用numpy.swapaxes和numpy.reshape来实现。一个1-D移动平均函数的示例代码如下:
import pandas as pd
import numpy as np
def nanmoving_mean(data,window,axis=0):
    kw = {'center':True,'window':window,'min_periods':1}
    if len(data.shape)==1:
        return pd.Series(data).rolling(**kw).mean().as_matrix()
    elif len(data.shape)>=2:
        tmp = np.swapaxes(data,0,axis)
        tmpshp = tmp.shape
        tmp = np.reshape( tmp, (tmpshp[0],-1), order='C' )
        tmp = pd.DataFrame(tmp).rolling(**kw).mean().as_matrix()
        tmp = np.reshape( tmp, tmpshp, order='C' )
        return np.swapaxes(tmp,0,axis)
    else:
        print('Invalid dimension!')
        return None

data = np.random.randint(10,size=(2,3,6))
print(data)
nanmoving_mean(data,window=3,axis=2)

这是实现问题2的常见/高效方法吗?欢迎任何改进/建议/新方法。 PS. 我之所以涉及pandas,是因为它的rolling(...).mean()方法能够正确处理nan数据。 编辑:我想另一种问问题的方式可能是:循环遍历“动态”数量的维度的语法是什么?
2个回答

1

不深入探讨您的问题,这里是 apply_along_axis 函数的关键部分(通过 Ipython 查看)

        res = func1d(arr[tuple(i.tolist())], *args, **kwargs)
        outarr[tuple(ind)] = res

他们构建了两个索引对象,iind,它们是可变的。假设我们指定 axis=2,那么这段代码会执行以下操作:
outarr[i,j,l] = func1d( arr[i,j,:,l], ...)

对于所有可能的ijl的值。因此,为了进行一个相当基本的迭代计算,需要大量的代码。

ind = [0]*(nd-1)   # ind is just a nd-1 list

i = zeros(nd, 'O')        # i is a 1d array with a `slice` object
i[axis] = slice(None, None)

我不熟悉pandas的rolling。但是有许多关于numpy滚动平均值的问题。scipy.signal.convolve2d可能会有用。np.lib.stride_tricks.as_strided也被使用。

您使用reshapeswapaxis(或transpose)来减少维度空间的复杂性的想法也很好。

(这不是一个解决方案;而是一些浮现在脑海中的想法,记得其他“移动平均”问题。现在开发已经太晚了。)


我不知道在哪里查看 apply_along_axis 的代码,但它是如何构建 i 和 ind 的? - Shichu Zhu
@ShichuZhu 如果你在寻求性能方面的提升,apply_along_axis 并不能帮助你。 - Divakar
@Divakar 如果基本函数只处理1D,则循环遍历所有其他维度是唯一的方法。除非修改基本函数以包括矢量化操作,我猜? - Shichu Zhu
@ShichuZhu 不需要循环。有许多矢量化选项可用于在窗口中求和元素。 - Divakar

1

我们可以使用2D卷积的方法。

基本步骤如下:

  • 作为预处理步骤,将NaNs替换为0s,因为我们需要对输入数据进行窗口求和。
  • 使用Scipy's convolve2d获取数据值和NaNs掩码的窗口求和。我们将使用边界元素作为零。
  • 从窗口大小中减去NaNs的窗口计数,以获取负责求和的有效元素计数。
  • 对于边界元素,我们将逐渐减少计入求和的元素。
现在,这些“间隔求和”也可以通过Scipy的1Duniform-filter获得,这比较高效。另一个好处是我们可以指定沿着哪个轴执行这些求和/平均操作。
通过混合使用Scipy的2D卷积1D均匀滤波器,我们可以采用以下几种方法。
导入相关的Scipy函数 -
from scipy.signal import convolve2d as conv2
from scipy.ndimage.filters import uniform_filter1d as uniff

方法1:
def nanmoving_mean_numpy(data, W): # data: input array, W: Window size
    N = data.shape[-1]
    hW = (W-1)//2

    nan_mask = np.isnan(data)
    data1 = np.where(nan_mask,0,data)

    value_sums = conv2(data1.reshape(-1,N),np.ones((1,W)),'same', boundary='fill')
    nan_sums = conv2(nan_mask.reshape(-1,N),np.ones((1,W)),'same', boundary='fill')

    value_sums.shape = data.shape
    nan_sums.shape = data.shape

    b_sizes = hW+1+np.arange(hW) # Boundary sizes
    count = np.hstack(( b_sizes , W*np.ones(N-2*hW), b_sizes[::-1] ))
    return value_sums/(count - nan_sums)

方法二:

def nanmoving_mean_numpy_v2(data, W): # data: input array, W: Window size    
    N = data.shape[-1]
    hW = (W-1)//2

    nan_mask = np.isnan(data)
    data1 = np.where(nan_mask,0,data)

    value_sums = uniff(data1,size=W, axis=-1, mode='constant')*W
    nan_sums = conv2(nan_mask.reshape(-1,N),np.ones((1,W)),'same', boundary='fill')
    nan_sums.shape = data.shape

    b_sizes = hW+1+np.arange(hW) # Boundary sizes
    count = np.hstack(( b_sizes , W*np.ones(N-2*hW,dtype=int), b_sizes[::-1] ))
    out =  value_sums/(count - nan_sums)
    out = np.where(np.isclose( count, nan_sums), np.nan, out)
    return out

方法三:
def nanmoving_mean_numpy_v3(data, W): # data: input array, W: Window size
    N = data.shape[-1]
    hW = (W-1)//2

    nan_mask = np.isnan(data)
    data1 = np.where(nan_mask,0,data)    
    nan_avgs = uniff(nan_mask.astype(float),size=W, axis=-1, mode='constant')

    b_sizes = hW+1+np.arange(hW) # Boundary sizes
    count = np.hstack(( b_sizes , W*np.ones(N-2*hW), b_sizes[::-1] ))
    scale = ((count/float(W)) - nan_avgs)
    out = uniff(data1,size=W, axis=-1, mode='constant')/scale
    out = np.where(np.isclose( scale, 0), np.nan, out)
    return out

运行时测试

数据集 #1 :

In [807]: # Create random input array and insert NaNs
     ...: data = np.random.randint(10,size=(20,30,60)).astype(float)
     ...: 
     ...: # Add 10% NaNs across the data randomly
     ...: idx = np.random.choice(data.size,size=int(data.size*0.1),replace=0)
     ...: data.ravel()[idx] = np.nan
     ...: 
     ...: W = 5 # Window size
     ...: 

In [808]: %timeit nanmoving_mean(data,window=W,axis=2)
     ...: %timeit nanmoving_mean_numpy(data, W)
     ...: %timeit nanmoving_mean_numpy_v2(data, W)
     ...: %timeit nanmoving_mean_numpy_v3(data, W)
     ...: 
10 loops, best of 3: 22.3 ms per loop
100 loops, best of 3: 3.31 ms per loop
100 loops, best of 3: 2.99 ms per loop
1000 loops, best of 3: 1.76 ms per loop

数据集#2 [更大的数据集]:
In [811]: # Create random input array and insert NaNs
 ...: data = np.random.randint(10,size=(120,130,160)).astype(float)
 ...: 
 ...: # Add 10% NaNs across the data randomly
 ...: idx = np.random.choice(data.size,size=int(data.size*0.1),replace=0)
 ...: data.ravel()[idx] = np.nan
 ...: 

In [812]: %timeit nanmoving_mean(data,window=W,axis=2)
     ...: %timeit nanmoving_mean_numpy(data, W)
     ...: %timeit nanmoving_mean_numpy_v2(data, W)
     ...: %timeit nanmoving_mean_numpy_v3(data, W)
     ...: 
1 loops, best of 3: 796 ms per loop
1 loops, best of 3: 486 ms per loop
1 loops, best of 3: 275 ms per loop
10 loops, best of 3: 161 ms per loop

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接