使用Python和Dask计算欧几里得距离

3
我正在尝试确定欧几里得距离矩阵中落在特定阈值以下的元素。然后,我使用这个搜索的位置参数来比较第二个数组中的元素(为了演示,这个数组是PCA的第一个特征向量,但排序是我问题中最相关的部分)。该应用程序需要适用于未知数量的观测值,并且应该能够有效地处理数百万观测值。
import numpy as np
from scipy.spatial.distance import cdist

threshold = 10
data = np.random.uniform((1, 2, 3), 5000)

searchValues = np.where(cdist(data, data) < threshold)

我的问题有两个方面。

首先,欧几里德距离矩阵很快变得太大,无法简单地应用scipy.spatial.distance.cdist()。为了解决这个问题,我将cdist函数分批应用于数据集,并迭代实现搜索。

cdist(data, data) 

Traceback (most recent call last):
  File "C:\Users\tl928yx\AppData\Local\Continuum\anaconda3\lib\site-packages\IPython\core\interactiveshell.py", line 2862, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "<ipython-input-10-fb93ae543712>", line 1, in <module>
    cdist(data, data)
  File "C:\Users\tl928yx\AppData\Local\Continuum\anaconda3\lib\site-packages\scipy\spatial\distance.py", line 2142, in cdist
    dm = np.zeros((mA, mB), dtype=np.double)
MemoryError

第二个问题是一个运行时问题,由于迭代地构建距离矩阵而导致。当我采用迭代方法时,运行时间呈指数级增长。这并不出乎意料,因为迭代方法的本质如此。

import numpy as np
import dask.array as da
from scipy.spatial.distance import cdist
import itertools
import timeit

threshold = 10
data = np.random.uniform(1, 100, (200000,40))  #Build random data
data = da.asarray(data)

it = round(data.shape[0]/10000)
dataArrays = [data[i*10000:(i+1)*10000] for i in range(0, it)]

comparisons = itertools.combinations(dataArrays, 2)

start = timeit.default_timer()
searchvalues = []
for comparison in comparisons:
    searchvalues.append(np.where(cdist(comparison[0], comparison[1]) < threshold))
time = timeit.default_timer() - start
print(time)

由于问题的本质,这些问题都是可以预料到的。为了尝试抵消这两个问题,我尝试使用dask在python中实现大数据框架,并在批处理中插入并行化。然而,这并没有显著提高计算时间,而且在dask的迭代方法中,我有一个相当严格的内存限制(需要一次输入1000个obs)。

from dask.diagnostics import ProgressBar
import dask.delayed
import dask.bag

@dask.delayed
def eucDist(comparison):
    return da.asarray(cdist(comparison[0], comparison[1]))

@dask.delayed
def findValues(euclideanMatrix):
    return np.where(euclideanMatrix < threshold)

start = timeit.default_timer()
searchvalues = []
test = []
for comparison in comparisons:
    comp = dask.delayed(eucDist)(comparison)
    test.append(comp)

look = []

with ProgressBar():
    for element in test:
        look.append(dask.delayed(findValues)(element).compute())

我希望能够并行比较以提高速度,但是我不确定如何在Python中实现。如果您能提供任何帮助,或者有任何建议可以改进初始比较代码,我将非常感激。

2个回答

1

0

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接