在NumPy中计算距离的更高效方法?

6
我有一个问题,如何在numpy中尽可能快地计算距离。
def getR1(VVm,VVs,HHm,HHs):
    t0=time.time()
    R=VVs.flatten()[numpy.newaxis,:]-VVm.flatten()[:,numpy.newaxis]
    R*=R
    R1=HHs.flatten()[numpy.newaxis,:]-HHm.flatten()[:,numpy.newaxis]
    R1*=R1
    R+=R1
    del R1
    print "R1\t",time.time()-t0, R.shape, #11.7576191425 (108225, 10500) 
    print numpy.max(R) #4176.26290975
    # uses 17.5Gb ram
    return R


def getR2(VVm,VVs,HHm,HHs):
    t0=time.time()
    precomputed_flat = numpy.column_stack((VVs.flatten(), HHs.flatten()))
    measured_flat = numpy.column_stack((VVm.flatten(), HHm.flatten()))
    deltas = precomputed_flat[None,:,:] - measured_flat[:, None, :]
    #print time.time()-t0, deltas.shape # 5.861109972 (108225, 10500, 2)
    R = numpy.einsum('ijk,ijk->ij', deltas, deltas)
    print "R2\t",time.time()-t0,R.shape, #14.5291359425 (108225, 10500)
    print numpy.max(R) #4176.26290975
    # uses 26Gb ram
    return R


def getR3(VVm,VVs,HHm,HHs):
    from numpy.core.umath_tests import inner1d
    t0=time.time()
    precomputed_flat = numpy.column_stack((VVs.flatten(), HHs.flatten()))
    measured_flat = numpy.column_stack((VVm.flatten(), HHm.flatten()))
    deltas = precomputed_flat[None,:,:] - measured_flat[:, None, :]
    #print time.time()-t0, deltas.shape # 5.861109972 (108225, 10500, 2)
    R = inner1d(deltas, deltas)
    print "R3\t",time.time()-t0, R.shape, #12.6972110271 (108225, 10500)
    print numpy.max(R) #4176.26290975
    #Uses 26Gb
    return R


def getR4(VVm,VVs,HHm,HHs):
    from scipy.spatial.distance import cdist
    t0=time.time()
    precomputed_flat = numpy.column_stack((VVs.flatten(), HHs.flatten()))
    measured_flat = numpy.column_stack((VVm.flatten(), HHm.flatten()))
    R=spdist.cdist(precomputed_flat,measured_flat, 'sqeuclidean') #.T
    print "R4\t",time.time()-t0, R.shape, #17.7022118568 (108225, 10500)
    print numpy.max(R) #4176.26290975
    # uses 9 Gb ram
    return R

def getR5(VVm,VVs,HHm,HHs):
    from scipy.spatial.distance import cdist
    t0=time.time()
    precomputed_flat = numpy.column_stack((VVs.flatten(), HHs.flatten()))
    measured_flat = numpy.column_stack((VVm.flatten(), HHm.flatten()))
    R=spdist.cdist(precomputed_flat,measured_flat, 'euclidean') #.T
    print "R5\t",time.time()-t0, R.shape, #15.6070930958 (108225, 10500)
    print numpy.max(R) #64.6240118667
    # uses only 9 Gb ram
    return R

def getR6(VVm,VVs,HHm,HHs):
    from scipy.weave import blitz
    t0=time.time()
    R=VVs.flatten()[numpy.newaxis,:]-VVm.flatten()[:,numpy.newaxis]
    blitz("R=R*R") # R*=R
    R1=HHs.flatten()[numpy.newaxis,:]-HHm.flatten()[:,numpy.newaxis]
    blitz("R1=R1*R1") # R1*=R1
    blitz("R=R+R1") # R+=R1
    del R1
    print "R6\t",time.time()-t0, R.shape, #11.7576191425 (108225, 10500) 
    print numpy.max(R) #4176.26290975
    return R

以下是结果的时间:
R1  11.7737319469 (108225, 10500) 4909.66881791
R2  15.1279799938 (108225, 10500) 4909.66881791
R3  12.7408981323 (108225, 10500) 4909.66881791
R4  17.3336868286 (10500, 108225) 4909.66881791
R5  15.7530870438 (10500, 108225) 70.0690289494
R6  11.670968771 (108225, 10500) 4909.66881791

虽然最后一个公式是sqrt((VVm-VVs)^2+(HHm-HHs)^2),但其他的都是(VVm-VVs)^2+(HHm-HHs)^2。这并不重要,因为在我的代码中我会对每个i的R[i,:]取最小值,而sqrt并不影响最小值(如果我想要距离,我只需要取sqrt(value),而不是对整个数组进行sqrt,所以时间上没有任何差异。

问题是:第一个解决方案为什么是最好的呢?(第二和第三个较慢的原因是因为deltas=...需要5.8秒(这也是这两种方法需要26GB的原因),而sqeuclidean比euclidean更慢?

sqeuclidean应该只需执行(VVm-VVs)^2+(HHm-HHs)^2,但我认为它执行了不同的操作。有人知道如何找到该方法的源代码(C或底层语言)吗?我认为它执行了sqrt((VVm-VVs)^2+(HHm-HHs)^2)^2(我能想到唯一的原因是它比(VVm-VVs)^2+(HHm-HHs)^2慢——我知道这是一个愚蠢的理由,有没有更合理的原因?

由于我对C一无所知,我该如何用scipy.weave内联它?那个代码是否可以像普通的Python一样编译?还是我需要安装特殊的东西?

编辑:好吧,我用scipy.weave.blitz尝试了一下(R6方法),速度稍微快了一些,但我认为懂得C语言的人仍然可以提高这个速度。我只取了形如a+=b或*=的行,并查找了它们在C中的表现形式,并将它们放入blitz语句中,但我想如果我也将带有flatten和newaxis语句的行放入C中,那么速度应该会更快,但我不知道如何做到这一点(懂C语言的人可能解释一下吗?)。目前,使用blitz和第一个方法之间的差异并不大,我猜这不是由于C与numpy之间的差异造成的?

我猜其他方法,比如deltas=...,如果我将其放入C中,速度也会更快?


2
考虑尝试一些类似于http://jakevdp.github.io/blog/2013/06/15/numba-vs-cython-take-2/的东西(特别是“使用广播的numpy”部分)。 - ev-br
你可以通过不为 R 分配内存(即只使用 R1 += R3)来节省几秒钟的时间。 - bogatron
不是降到1秒,但如果你使用32位浮点数,那么这将节省大约4GB的RAM分配,这是非常重要的。如果它让你避免使用交换空间,那么它将是一个显著的改进。考虑到它需要多少内存(除非你有很多RAM并且显着地多线程),我会惊讶于它能否在C中降至1秒(即使没有Python)。 - bogatron
幸运的是!我怀疑如果避免不必要的分配,您仍然可以进行非微不足道的改进。如果您还没有这样做,您可能需要验证您是否正在运行较低级别库(BLAS、ATLAS)的多线程版本。 - bogatron
经过一些编辑,问题仍然是相同的,但结构更清晰,并具有更多的时间安排。 - usethedeathstar
显示剩余3条评论
1个回答

7
无论何时你需要进行乘法和加法运算,请尝试使用点积函数或者np.einsum。由于你正在预分配数组,而不是为水平和垂直坐标分别创建不同的数组,请将它们堆叠在一起:
precomputed_flat = np.column_stack((svf.flatten(), shf.flatten()))
measured_flat = np.column_stack((VVmeasured.flatten(), HHmeasured.flatten()))
deltas = precomputed_flat - measured_flat[:, None, :]

从这里开始,最简单的方法是:

dist = np.einsum('ijk,ijk->ij', deltas, deltas)

您也可以尝试类似以下的方法:

...

from numpy.core.umath_tests import inner1d
dist = inner1d(deltas, deltas)

当然还有SciPy的空间模块cdist:

from scipy.spatial.distance import cdist
dist = cdist(precomputed_flat, measured_flat, 'euclidean')

编辑 我无法在如此大的数据集上运行测试,但以下时间表格非常有启发性:

len_a, len_b = 10000, 1000

a = np.random.rand(2, len_a)
b =  np.random.rand(2, len_b)
c = np.random.rand(len_a, 2)
d = np.random.rand(len_b, 2)

In [3]: %timeit a[:, None, :] - b[..., None]
10 loops, best of 3: 76.7 ms per loop

In [4]: %timeit c[:, None, :] - d
1 loops, best of 3: 221 ms per loop

对于上述较小的数据集,我可以通过使用scipy.spatial.distance.cdist并以不同的内存方式排列数据,略微提高速度,并与inner1d相匹配。

precomputed_flat = np.vstack((svf.flatten(), shf.flatten()))
measured_flat = np.vstack((VVmeasured.flatten(), HHmeasured.flatten()))
deltas = precomputed_flat[:, None, :] - measured_flat

import scipy.spatial.distance as spdist
from numpy.core.umath_tests import inner1d

In [13]: %timeit r0 = a[0, None, :] - b[0, :, None]; r1 = a[1, None, :] - b[1, :, None]; r0 *= r0; r1 *= r1; r0 += r1
10 loops, best of 3: 146 ms per loop

In [14]: %timeit deltas = (a[:, None, :] - b[..., None]).T; inner1d(deltas, deltas)
10 loops, best of 3: 145 ms per loop

In [15]: %timeit spdist.cdist(a.T, b.T)
10 loops, best of 3: 124 ms per loop

In [16]: %timeit deltas = a[:, None, :] - b[..., None]; np.einsum('ijk,ijk->jk', deltas, deltas)
10 loops, best of 3: 163 ms per loop

可以使用np.tensordot()来代替np.einsum,它也有非常灵活的符号表示法... - Saullo G. P. Castro
很遗憾,你提出的三种方法都比较慢(因为deltas=...已经需要六秒钟了),所以它们更慢。 - usethedeathstar
你最快的方式仍然比我的慢,但我认为这是因为在我的情况下,我只需要计算 (x-x')**2+(y-y')**2,然后从中得到最小值,但是取其平方根不会改变最小值所在的位置,所以在计算时我不进行平方根操作,而cdist函数则做了这个(我猜想)。([15]的代码运行时间为15.7秒,而我的代码运行时间为11.8秒,但我认为这是因为cdist函数中有求平方根的操作,如果有一个不含平方根操作的函数,它应该会更快。 - usethedeathstar
我正在努力理解numpy.tensordot的axes参数,只是想看看它是否比numpy.einsum更快,但我似乎无法弄清楚它们的工作原理。 - usethedeathstar
由于sqeucl比eucl慢,我猜想使用scipy.weave会更快?但是我对C一无所知(除了它很快),在weave中,我需要将导致问题的行的C代码作为引用或其他形式放入,但我不确定它如何工作(包括编程和技术解释)。 - usethedeathstar
显示剩余8条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接