有没有一种使用numpy来去除循环的方法?

3

我有一个三维numpy数组input_data(q x m x n),我正在使用它来构建直方图数据,最终会存储在plot_data(m x n x 2)中以便绘制。这一步是我的流程中不错的瓶颈,我想知道是否有更快、更“numpy”的方法来完成。

num_bins = 3
for i in range(m):

    for j in range(n):

        data = input_data[:, i, j]

        hist, bins = np.histogram(data, bins=num_bins)

        # Create the (x, y) pairs to plot
        plot_data[i][j] = np.stack((bins[:-1], hist), axis=1)

你的“更快”是指更简洁吗? - Luke Kot-Zaniewski
1
更快意味着运行时间更短,因此理想情况下应该利用numpy的向量化能力,类似于使用np.sum()来计算总和,而不是通过循环手动计算。 - holtc
所以,我查找了一些信息。也许,scipy网站可能是你在寻找的?我找到了下面的链接:https://docs.scipy.org/doc/numpy-1.13.0/reference/arrays.nditer.html - Maderas
q,m,n可以是任何值,但为了近似我的数据,可能会使用100000 x 10 x 360。@Maderas,很遗憾对于多维数组来说它并不那么有效。 - holtc
@Maderas; nditer 在这里没有帮助。它只是另一个迭代工具,仍然需要一个 Python 级别的循环。 - hpaulj
显示剩余3条评论
2个回答

1
这里提供了一种适用于任意数量箱子的向量化方法 -
def vectorized_app(input_data, num_bins):
    s0 = input_data.min(0)
    s1 = input_data.max(0)

    m,n,r = input_data.shape
    ids = (num_bins*((input_data - s0)/(s1-s0))).astype(int).clip(max=num_bins-1)
    offset = num_bins*(r*np.arange(n)[:,None] + np.arange(r))
    ids3D = ids + offset
    count3D = np.bincount(ids3D.ravel(), minlength=n*r*num_bins).reshape(n,r,-1)
    bins3D = create_ranges_nd(s0, s1, num_bins+1)[...,:-1]

    out = np.empty((n,r,num_bins,2))
    out[...,0] = bins3D
    out[...,1] = count3D
    return out

辅助函数 -

# https://dev59.com/t6Xja4cB1Zd3GeqPX-AT#46694364/ @Divakar
def create_ranges_nd(start, stop, N, endpoint=True):
    if endpoint==1:
        divisor = N-1
    else:
        divisor = N
    steps = (1.0/divisor) * (stop - start)
    return start[...,None] + steps[...,None]*np.arange(N)

运行时测试

原始方法 -

def org_app(input_data, num_bins):
    q,m,n = input_data.shape
    plot_data = np.zeros((m,n,num_bins,2))
    for i in range(m):
        for j in range(n):
            data = input_data[:, i, j]
            hist, bins = np.histogram(data, bins=num_bins)
            plot_data[i][j] = np.stack((bins[:-1], hist), axis=1)
    return plot_data

时间和验证 -

让我们在一个形状为(100, 100, 100)的大数据数组上进行测试,并将bin的数量设置为10

In [967]: # Setup input
     ...: num_bins = 10
     ...: m = 100
     ...: n = 100
     ...: q = 100
     ...: input_data = np.random.rand(q,m,n)
     ...: 
     ...: out1 = org_app(input_data, num_bins)
     ...: out2 = vectorized_app(input_data, num_bins)
     ...: print np.allclose(out1, out2)
     ...: 
True

In [968]: %timeit org_app(input_data, num_bins)
1 loop, best of 3: 748 ms per loop

In [969]: %timeit vectorized_app(input_data, num_bins)
100 loops, best of 3: 12.7 ms per loop

In [970]: 748/12.7 # speedup with vectorized one over original
Out[970]: 58.89763779527559

这种方法与使用for循环遍历大型3D数组相比,性能如何? - holtc
@holtc 添加,假设随机数的形状为(100,100,100),箱子数量为10。 - Divakar
所以当我在真实数据上运行时,np.allclose()为false,并且由于我正在jit所有方法,所以它实际上更慢,所以不幸的是我将无法使用它。在我的情况下,input_data的数量级将达到(100000, 8, 360),最多有100个bins。 - holtc
@holtc 是的,你的循环计数只有8 * 360。这不算多。保持你的循环/抖动-抖动。 - Divakar
@holtc 如果您能在未来的问题中事先提供这些大小/形状细节,那将会很有帮助。 - Divakar

0

我认为你的样本相似,因此直方图也相似。在这种情况下,您可以简化比较并以更向量化的方式进行:

a=np.random.rand(100000,10,10)


def f():  # roughly your approach.
    plotdata=np.zeros((10,10,3),np.int32)
    for i in range(10):
        for j in range(10):
            bins,hist=np.histogram(a[:,i,j],3)
            plotdata[i,j]=bins 
    return plotdata

def g(): #vectored comparisons 
    u=(a < 1/3).sum(axis=0)
    w=(a > 2/3).sum(axis=0)
    v=len(a)-u-w
    return np.dstack((u,v,w))

如果要实现8倍的改进:

In [213]: %timeit f()
548 ms ± 15.1 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

In [214]: %timeit g()
77.7 ms ± 5.46 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)

很遗憾,我们不能这样简化,我们需要真实的数据。 - holtc

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接