Numpy:向量化np.argwhere

6

我在numpy中有以下数据结构:

import numpy as np

a = np.random.rand(267, 173) # dense img matrix
b = np.random.rand(199) # array of probability samples

我的目标是对于数组 b 中的每个元素 i,找到数组 a 中所有小于等于 i 的值所对应的x,y坐标/索引位置,并从该子集中随机选择一个值:

from random import randint

for i in b:
  l = np.argwhere(a <= i) # list of img coordinates where pixel <= i
  sample = l[randint(0, len(l)-1)] # random selection from `l`

这个“可行”,但我想向量化采样操作(即用apply_along_axis或类似方法替换for循环)。有没有人知道如何做到这一点?任何建议将不胜感激!


apply_along_axis 不是真正的向量化 - 它没有在编译后代码中实现循环。 - hpaulj
3个回答

6

由于每次都有随机子集大小,因此您无法完全矢量化np.argmax。不过,您可以通过排序大幅加快计算速度。对图像进行一次排序将创建一个单一分配,而在每个步骤中屏蔽图像将为掩码提取的元素创建一个临时数组。有了排序后的图像,您只需应用np.searchsorted即可获得大小:

a_sorted = np.sort(a.ravel())
indices = np.searchsorted(a_sorted, b, side='right')

您仍需要使用循环进行采样,但可以这样做:
samples = np.array([a_sorted[np.random.randint(i)] for i in indices])

使用此系统获取x-y坐标而非样本值有些复杂。您可以使用 np.unravel_index 来获取索引,但首先必须将其从 a_sorted 的参考框架转换为 a.ravel()。如果使用 np.argsort 而不是 np.sort 进行排序,则可以在原始数组中获取索引。幸运的是,np.searchsorted 支持带有 sorter 参数的这种情况:
a_ind = np.argsort(a, axis=None)
indices = np.searchsorted(a.ravel(), b, side='right', sorter=a_ind)
r, c = np.unravel_index(a_ind[[np.random.randint(i) for i in indices]], a.shape)

rcb大小相同,基于b它们对应于a中每个选择的行和列索引。索引转换取决于数组中的步长,因此我们假设您使用C顺序,因为默认情况下90%的数组都会这样做。

复杂度

假设b大小为Ma大小为N

您当前的算法对a的每个元素进行线性搜索以查找b的每个元素。在每次迭代中,它分配用于匹配元素的掩码(平均为N/2),然后分配相同大小的缓冲区来保存掩码选择。这意味着时间复杂度为O(M * N),空间复杂度也是如此。

我的算法首先对a进行排序,时间复杂度为O(N log N)。然后搜索M个插入点,时间复杂度为O(M log N)。最后选择M个样本。它分配的空间包括一份已排序的图像副本和两个大小为M的数组。因此,时间复杂度为O((M + N) log N),空间复杂度为O(M + N)

啊,感谢@MadPhysicist!事实证明,我需要的是x,y坐标,而不是这些坐标上的值。我将图像视为概率分布,其中暗像素具有高概率被采样,而亮像素具有低概率被采样。我想从该分布中绘制n个样本,方法是创建一个具有n个条目的b,并对于b中的每个值i,选择一个油墨概率<= i的像素。因此,我想要在a中获取值的索引位置,而不是值本身。是否有一种方法可以改进上面的内容,以获取这些索引? - duhaime
@duhaime。当然。稍等一下。 - Mad Physicist
@duhaime。愉快编程! - Mad Physicist
1
太棒了!我注意到将 0 传递给 np.random.randint() 会导致一个错误,但是可以使用 np.random.randint(i) if i else 0。非常感谢您的帮助! - duhaime
1
@duhaime。更好的方法是DanielF的建议。根本不需要Python循环。 - Mad Physicist
显示剩余5条评论

4

以下是另一种方法,它将b进行argsorting,然后使用np.digitize相应地分割a,请参考此帖子

import numpy as np
from scipy import sparse
from timeit import timeit
import math

def h_digitize(a,bs,right=False):
    mx,mn = a.max(),a.min()
    asz = mx-mn
    bsz = bs[-1]-bs[0]
    nbins=int(bs.size*math.sqrt(bs.size)*asz/bsz)
    bbs = np.concatenate([[0],((nbins-1)*(bs-mn)/asz).astype(int).clip(0,nbins),[nbins]])
    bins = np.repeat(np.arange(bs.size+1), np.diff(bbs))
    bbs = bbs[:bbs.searchsorted(nbins)]
    bins[bbs] = -1
    aidx = bins[((nbins-1)*(a-mn)/asz).astype(int)]
    ambig = aidx == -1
    aa = a[ambig]
    if aa.size:
        aidx[ambig] = np.digitize(aa,bs,right)
    return aidx

def f_pp():
    bo = b.argsort()
    bs = b[bo]
    aidx = h_digitize(a,bs,right=True).ravel()
    aux = sparse.csr_matrix((aidx,aidx,np.arange(aidx.size+1)),
                            (aidx.size,b.size+1)).tocsc()
    ridx = np.empty(b.size,int)
    ridx[bo] = aux.indices[np.fromiter(map(np.random.randint,aux.indptr[1:-1].tolist()),int,b.size)]
    return np.unravel_index(ridx,a.shape)

def f_mp():
    a_ind = np.argsort(a, axis=None)
    indices = np.searchsorted(a.ravel(), b, sorter=a_ind, side='right')
    return np.unravel_index(a_ind[[np.random.randint(i) for i in indices]], a.shape)


a = np.random.rand(267, 173) # dense img matrix
b = np.random.rand(199) # array of probability samples

# round to test wether equality is handled correctly
a = np.round(a,3)
b = np.round(b,3)

print('pp',timeit(f_pp, number=1000),'ms')
print('mp',timeit(f_mp, number=1000),'ms')

# sanity checks

S = np.max([a[f_pp()] for _ in range(1000)],axis=0)
T = np.max([a[f_mp()] for _ in range(1000)],axis=0)
print(f"inequality satisfied: pp {(S<=b).all()} mp {(T<=b).all()}")
print(f"largest smalles distance to boundary: pp {(b-S).max()} mp {(b-T).max()}")
print(f"equality done right: pp {not (b-S).all()} mp {not (b-T).all()}")

使用修改过的 digitize 函数,我速度快了一些,但这可能会因问题规模而异。此外,@MadPhysicist 的解决方案要简单得多。使用标准的 digitize 函数我们大致相当。

pp 2.620121960993856 ms                                                                                                                                                                                                                                                        
mp 3.301037881989032 ms                                                                                                                                                                                                                                                        
inequality satisfied: pp True mp True
largest smalles distance to boundary: pp 0.0040000000000000036 mp 0.006000000000000005
equality done right: pp True mp True

哇,这是一次深入的探究!即使它会让我慢下来一点,我也要使用更易读的解决方案,只为确保我能在未来几年理解代码,但对于真正的狂热者来说,这太棒了! - duhaime

2

对@MadPhysicist算法的微小改进,使其更加向量化:

稍作修改,使之更具向量化特点:

%%timeit
a_ind = np.argsort(a, axis=None)
indices = np.searchsorted(a.ravel(), b, sorter=a_ind)
r, c = np.unravel_index(a_ind[[np.random.randint(i) for i in indices]], a.shape)
100 loops, best of 3: 6.32 ms per loop

%%timeit
a_ind = np.argsort(a, axis=None)
indices = np.searchsorted(a.ravel(), b, sorter=a_ind)
r, c = np.unravel_index(a_ind[(np.random.rand(indices.size) * indices).astype(int)], a.shape)
100 loops, best of 3: 4.16 ms per loop

@PaulPanzer的解决方案仍然是最佳选择,尽管我不确定它正在缓存什么:

最初的回答

%timeit f_pp()
The slowest run took 14.79 times longer than the fastest. This could mean that an intermediate result is being cached.
100 loops, best of 3: 1.88 ms per loop

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接