将函数应用于ndarray的每一行

13

我有一个函数用于计算向量x到均值的平方马氏距离:

def mahalanobis_sqdist(x, mean, Sigma):
   '''
    Calculates squared Mahalanobis Distance of vector x 
    to distibutions' mean 
   '''
   Sigma_inv = np.linalg.inv(Sigma)
   xdiff = x - mean
   sqmdist = np.dot(np.dot(xdiff, Sigma_inv), xdiff)
   return sqmdist

我有一个形状为(25, 4)的numpy数组。因此,我想在不使用for循环的情况下将该函数应用于数组的所有25行。那么,基本上,我该如何编写这个循环的向量化形式:

for r in d1:
    mahalanobis_sqdist(r[0:4], mean1, Sig1)

其中 mean1Sig1 是:

>>> mean1
array([ 5.028,  3.48 ,  1.46 ,  0.248])
>>> Sig1 = np.cov(d1[0:25, 0:4].T)
>>> Sig1
array([[ 0.16043333,  0.11808333,  0.02408333,  0.01943333],
       [ 0.11808333,  0.13583333,  0.00625   ,  0.02225   ],
       [ 0.02408333,  0.00625   ,  0.03916667,  0.00658333],
       [ 0.01943333,  0.02225   ,  0.00658333,  0.01093333]])

我已经尝试了以下方法,但没有成功:
>>> vecdist = np.vectorize(mahalanobis_sqdist)
>>> vecdist(d1, mean1, Sig1)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/lib/python2.7/dist-packages/numpy/lib/function_base.py", line 1862, in __call__
    theout = self.thefunc(*newargs)
  File "<stdin>", line 6, in mahalanobis_sqdist
  File "/usr/lib/python2.7/dist-packages/numpy/linalg/linalg.py", line 445, in inv
    return wrap(solve(a, identity(a.shape[0], dtype=a.dtype)))
IndexError: tuple index out of range

3
scipy.spatial.distance模块也可以为您完成所有这些操作。例如,代码将是cdist(d1, mean1[None], 'mahalanobis')**2。如果mean1不是点的实际平均值,则应单独计算协方差和逆,并进行cdist(d1, mean1[None], 'mahalanobis', VI=Sigma_inv)**2 - user2379410
4个回答

20
为了对数组的每一行应用一个函数,您可以使用以下代码:
np.apply_along_axis(mahalanobis_sqdist, 1, d1, mean1, Sig1)    

在这种情况下,有一种更好的方法。您不必对每一行应用函数。相反,您可以将NumPy操作应用于整个d1数组以计算相同的结果。np.einsum可以替代for循环和两个调用np.dot:
def mahalanobis_sqdist2(d, mean, Sigma):
   Sigma_inv = np.linalg.inv(Sigma)
   xdiff = d - mean
   return np.einsum('ij,im,mj->i', xdiff, xdiff, Sigma_inv)

这里有一些基准测试结果:
import numpy as np
np.random.seed(1)

def mahalanobis_sqdist(x, mean, Sigma):
   '''
   Calculates squared Mahalanobis Distance of vector x 
   to distibutions mean 
   '''
   Sigma_inv = np.linalg.inv(Sigma)
   xdiff = x - mean
   sqmdist = np.dot(np.dot(xdiff, Sigma_inv), xdiff)
   return sqmdist

def mahalanobis_sqdist2(d, mean, Sigma):
   Sigma_inv = np.linalg.inv(Sigma)
   xdiff = d - mean
   return np.einsum('ij,im,mj->i', xdiff, xdiff, Sigma_inv)

def using_loop(d1, mean, Sigma):
    expected = []
    for r in d1:
        expected.append(mahalanobis_sqdist(r[0:4], mean1, Sig1))
    return np.array(expected)

d1 = np.random.random((25,4))
mean1 = np.array([ 5.028,  3.48 ,  1.46 ,  0.248])
Sig1 = np.cov(d1[0:25, 0:4].T)

expected = using_loop(d1, mean1, Sig1)
result = np.apply_along_axis(mahalanobis_sqdist, 1, d1, mean1, Sig1)
result2 = mahalanobis_sqdist2(d1, mean1, Sig1)
assert np.allclose(expected, result)
assert np.allclose(expected, result2)

In [92]: %timeit mahalanobis_sqdist2(d1, mean1, Sig1)
10000 loops, best of 3: 31.1 µs per loop
In [94]: %timeit using_loop(d1, mean1, Sig1)
1000 loops, best of 3: 569 µs per loop
In [91]: %timeit np.apply_along_axis(mahalanobis_sqdist, 1, d1, mean1, Sig1)
1000 loops, best of 3: 806 µs per loop

因此,mahalanobis_sqdist2比使用for-loop快约18倍,比使用np.apply_along_axis快约26倍。
请注意,np.apply_along_axisnp.vectorizenp.frompyfunc是Python的实用函数。在底层,它们使用for-while-loop。这里没有真正的“向量化”操作。它们可以提供语法上的帮助,但不要指望它们能使你的代码比你自己编写的for-loop运行得更好。

我尝试了这个代码:np.apply_along_axis(mahalanobis_sqdist, axis=1, arr=d1, args=(mean1, Sig1))但是我遇到了以下错误:Traceback (most recent call last): File "<stdin>", line 1, in <module> TypeError: apply_along_axis() got an unexpected keyword argument 'args' - Vahid Mirjalili
我的错误。args 不是关键字参数。我已经在上面进行了更正。 - unutbu
2
加一条注:注意np.apply_along_axis、np.vectorize和np.frompyfunc是Python的实用函数。在底层,它们使用for-或while循环。这里没有真正的“向量化”操作。- 我不知道这一点,知道了很好,谢谢! - sparc_spread

9

@unutbu的答案非常适用于将任何函数应用于数组的行。

在这种特殊情况下,如果您正在使用大型数组,可以使用一些数学对称性来加快处理速度。

这是函数的修改版本:

def mahalanobis_sqdist3(x, mean, Sigma):
    Sigma_inv = np.linalg.inv(Sigma)
    xdiff = x - mean
    return (xdiff.dot(Sigma_inv)*xdiff).sum(axis=-1)

如果您使用任何类型的大型Sigma,请建议缓存Sigma_inv并将其作为参数传递给您的函数。在这个示例中,它是4x4大小,所以这并不重要。我将展示如何处理大型Sigma,以便其他人遇到此类问题时也能参考。
如果您不会重复使用相同的Sigma,则无法缓存它。因此,您可以使用不同的方法来解决线性系统,而不是对矩阵求逆。这里我将使用内置于SciPy中的LU分解。只有当x的列数相对于其行数很大时,才会提高时间效率。
以下是一个展示该方法的函数:
from scipy.linalg import lu_factor, lu_solve
def mahalanobis_sqdist4(x, mean, Sigma):
    xdiff = x - mean
    Sigma_inv = lu_factor(Sigma)
    return (xdiff.T*lu_solve(Sigma_inv, xdiff.T)).sum(axis=0)

这里是一些时间。我将包括其他答案中提到的使用einsum的版本。
import numpy as np
Sig1 = np.array([[ 0.16043333,  0.11808333,  0.02408333,  0.01943333],
                 [ 0.11808333,  0.13583333,  0.00625   ,  0.02225   ],
                 [ 0.02408333,  0.00625   ,  0.03916667,  0.00658333],
                 [ 0.01943333,  0.02225   ,  0.00658333,  0.01093333]])
mean1 = np.array([ 5.028,  3.48 ,  1.46 ,  0.248])
x = np.random.rand(25, 4)
%timeit np.apply_along_axis(mahalanobis_sqdist, 1, x, mean1, Sig1)
%timeit mahalanobis_sqdist2(x, mean1, Sig1)
%timeit mahalanobis_sqdist3(x, mean1, Sig1)
%timeit mahalanobis_sqdist4(x, mean1, Sig1)

提供:

1000 loops, best of 3: 973 µs per loop
10000 loops, best of 3: 36.2 µs per loop
10000 loops, best of 3: 40.8 µs per loop
10000 loops, best of 3: 83.2 µs per loop

然而,改变涉及数组的大小会改变时间结果。例如,让x = np.random.rand(2500, 4),这些时间如下:

10 loops, best of 3: 95 ms per loop
1000 loops, best of 3: 355 µs per loop
10000 loops, best of 3: 131 µs per loop
1000 loops, best of 3: 337 µs per loop

当我们定义x = np.random.rand(1000, 1000)Sigma1 = np.random.rand(1000, 1000),以及mean1 = np.random.rand(1000)时,下面是时间统计结果:

1 loops, best of 3: 1min 24s per loop
1 loops, best of 3: 2.39 s per loop
10 loops, best of 3: 155 ms per loop
10 loops, best of 3: 99.9 ms per loop

编辑:我注意到其他答案中使用了Cholesky分解。 鉴于Sigma是对称且正定的,我们实际上可以比我以上的结果做得更好。 通过SciPy可以使用BLAS和LAPACK提供的一些良好的例程来处理对称正定矩阵。 以下是两个更快的版本。

from scipy.linalg.fblas import dsymm
def mahalanobis_sqdist5(x, mean, Sigma_inv):
    xdiff = x - mean
    Sigma_inv = la.inv(Sigma)
    return np.einsum('...i,...i->...',dsymm(1., Sigma_inv, xdiff.T).T, xdiff)
from scipy.linalg.flapack import dposv
def mahalanobis_sqdist6(x, mean, Sigma):
    xdiff = x - mean
    return np.einsum('...i,...i->...', xdiff, dposv(Sigma, xdiff.T)[1].T)

第一个函数仍然反转Sigma。 如果您预先计算并重复使用它,速度会更快(1000x1000的情况在我的机器上需要35.6毫秒)。 我还使用了einsum来取乘积,然后沿最后一轴进行求和。 这最终比执行 (A * B).sum(axis=-1)之类的操作稍微快一些。 这两个函数的计时如下:
10000 loops, best of 3: 55.3 µs per loop
100000 loops, best of 3: 14.2 µs per loop

第二个测试用例:

10000 loops, best of 3: 121 µs per loop
10000 loops, best of 3: 79 µs per loop

第三个测试用例:
10 loops, best of 3: 92.5 ms per loop
10 loops, best of 3: 48.2 ms per loop

非常好!我不需要为每个数据点计算Sigma_inv!真的很有趣的讨论!我喜欢它。 - Vahid Mirjalili

5

刚在reddit上看到一个非常好的评论,可能会使事情更快一点:

对于经常使用numpy的人来说,这并不奇怪。在python中使用循环速度非常慢。实际上,einsum也相当慢。如果您有大量向量(500个四维向量足以使此版本比我的机器上的einsum更快),则以下版本更快:

def no_einsum(d, mean, Sigma):
    L_inv = np.linalg.inv(numpy.linalg.cholesky(Sigma))
    xdiff = d - mean
    return np.sum(np.dot(xdiff, L_inv.T)**2, axis=1)

如果你的点也是高维的,那么计算它的逆矩阵会很慢(而且通常也不是一个好主意),你可以通过直接解决这个系统来节省时间(在我的机器上,500个250维向量足以使该版本成为最快的)。
def no_einsum_solve(d, mean, Sigma):
    L = numpy.linalg.cholesky(Sigma)
    xdiff = d - mean
    return np.sum(np.linalg.solve(L, xdiff.T)**2, axis=0)

1
很好,Sigma是对称正定的。我之前没有注意到这一点。这使得Cholesky分解成为一个可行的选择。 - IanH

0
问题在于 np.vectorize 对所有参数进行向量化,但你只需要对第一个参数进行向量化。你需要使用 excluded 关键字参数来进行向量化:

np.vectorize(mahalanobis_sqdist, excluded=[1, 2])

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接