加速核估计的采样

11

这是我使用的一个庞大代码的MWE示例。基本上,它对位于某个阈值以下的所有值执行了一个KDE(核密度估计)的Monte Carlo积分(该积分方法建议在此问题中使用:Integrate 2D kernel density estimate)。

import numpy as np
from scipy import stats
import time

# Generate some random two-dimensional data:
def measure(n):
    "Measurement model, return two coupled measurements."
    m1 = np.random.normal(size=n)
    m2 = np.random.normal(scale=0.5, size=n)
    return m1+m2, m1-m2

# Get data.
m1, m2 = measure(20000)
# Define limits.
xmin = m1.min()
xmax = m1.max()
ymin = m2.min()
ymax = m2.max()

# Perform a kernel density estimate on the data.
x, y = np.mgrid[xmin:xmax:100j, ymin:ymax:100j]
values = np.vstack([m1, m2])
kernel = stats.gaussian_kde(values)

# Define point below which to integrate the kernel.
x1, y1 = 0.5, 0.5

# Get kernel value for this point.
tik = time.time()
iso = kernel((x1,y1))
print 'iso: ', time.time()-tik

# Sample from KDE distribution (Monte Carlo process).
tik = time.time()
sample = kernel.resample(size=1000)
print 'resample: ', time.time()-tik

# Filter the sample leaving only values for which
# the kernel evaluates to less than what it does for
# the (x1, y1) point defined above.
tik = time.time()
insample = kernel(sample) < iso
print 'filter/sample: ', time.time()-tik

# Integrate for all values below iso.
tik = time.time()
integral = insample.sum() / float(insample.shape[0])
print 'integral: ', time.time()-tik

输出结果看起来像这样:
iso:  0.00259208679199
resample:  0.000817060470581
filter/sample:  2.10829401016
integral:  4.2200088501e-05

这句话的意思是,很明显过滤器/样本调用几乎占用了代码运行所需的所有时间。我必须迭代地运行这个代码块数千次,所以它可能非常耗时。
有没有办法加快过滤/采样过程的速度?

添加

这是一个稍微更加真实的MWE,其中包含了Ophion的多线程解决方案:

import numpy as np
from scipy import stats
from multiprocessing import Pool

def kde_integration(m_list):

    m1, m2 = [], []
    for item in m_list:
        # Color data.
        m1.append(item[0])
        # Magnitude data.
        m2.append(item[1])

    # Define limits.
    xmin, xmax = min(m1), max(m1)
    ymin, ymax = min(m2), max(m2)

    # Perform a kernel density estimate on the data:
    x, y = np.mgrid[xmin:xmax:100j, ymin:ymax:100j]
    values = np.vstack([m1, m2])
    kernel = stats.gaussian_kde(values)

    out_list = []

    for point in m_list:

        # Compute the point below which to integrate.
        iso = kernel((point[0], point[1]))

        # Sample KDE distribution
        sample = kernel.resample(size=1000)

        #Create definition.
        def calc_kernel(samp):
            return kernel(samp)

        #Choose number of cores and split input array.
        cores = 4
        torun = np.array_split(sample, cores, axis=1)

        #Calculate
        pool = Pool(processes=cores)
        results = pool.map(calc_kernel, torun)

        #Reintegrate and calculate results
        insample_mp = np.concatenate(results) < iso

        # Integrate for all values below iso.
        integral = insample_mp.sum() / float(insample_mp.shape[0])

        out_list.append(integral)

    return out_list


# Generate some random two-dimensional data:
def measure(n):
    "Measurement model, return two coupled measurements."
    m1 = np.random.normal(size=n)
    m2 = np.random.normal(scale=0.5, size=n)
    return m1+m2, m1-m2

# Create list to pass.
m_list = []
for i in range(60):
    m1, m2 = measure(5)
    m_list.append(m1.tolist())
    m_list.append(m2.tolist())

# Call KDE integration function.
print 'Integral result: ', kde_integration(m_list)

Ophion 提供的解决方案对我提供的原始代码非常有效,但在此版本中失败并显示以下错误:

Integral result: Exception in thread Thread-3:
Traceback (most recent call last):
  File "/usr/lib/python2.7/threading.py", line 551, in __bootstrap_inner
    self.run()
  File "/usr/lib/python2.7/threading.py", line 504, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 319, in _handle_tasks
    put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

我尝试移动calc_kernel函数,因为这个问题的一个答案Multiprocessing: How to use Pool.map on a function defined in a class?中指出:"您提供给map()的函数必须通过导入您的模块来访问";但我仍然无法使此代码工作。

非常感谢任何帮助。


加2

实现 Ophion 的建议,移除 calc_kernel 函数并直接使用:

results = pool.map(kernel, torun)

我的工作是为了解决PicklingError,但现在我发现,如果我创建一个初始的m_list,其中包含超过62-63个项目,我会遇到这个错误:

Traceback (most recent call last):
  File "~/gauss_kde_temp.py", line 67, in <module>
    print 'Integral result: ', kde_integration(m_list)
  File "~/gauss_kde_temp.py", line 38, in kde_integration
    pool = Pool(processes=cores)
  File "/usr/lib/python2.7/multiprocessing/__init__.py", line 232, in Pool
    return Pool(processes, initializer, initargs, maxtasksperchild)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 161, in __init__
    self._result_handler.start()
  File "/usr/lib/python2.7/threading.py", line 494, in start
    _start_new_thread(self.__bootstrap, ())
thread.error: can't start new thread

由于我实际代码中的列表最多可以有2000个项目,所以这个问题使得代码无法使用。第38行是这一行:

pool = Pool(processes=cores)

我可以帮助你翻译。这段文字是关于编程的内容。它询问了与使用的核心数量有关的问题。下面是需要翻译的内容:

所以显然这与我使用的核心数量有关?

这个问题 "Python中无法启动新线程错误" 建议使用:

threading.active_count()

当我遇到错误时,我希望你能帮我检查正在运行的线程数。我已经检查过,每当达到 374 个线程时程序就会崩溃。如何在编码时解决这个问题?


这是与最后一个问题有关的新问题: 线程错误:无法启动新线程


2
运行 kernel(sample) < iso 总共需要 2 秒钟,其中 kernel(sample) 占用了 99.99% 的时间。 - Daniel
1
你可能需要重新表述一下,将“kernel(sample)”过程作为最消耗资源的部分。Scipy函数是相当优化的,除非有一些诡计可以避免完全调用它,否则很难做到太多改进。 - Daniel
你说得对。无论底层进程是什么,我都想让它变得更快 :) - Gabriel
2
你正在使用的核密度估计实现在 scipy/stats/kde.py 中。看起来主要步骤是将逆协方差乘以数据点上的度量。你可以尝试在Cython中重新实现这个方法(evaluate),但如果np.dot真的是瓶颈,那可能不会有太大帮助。或者,将数据矩阵分成较小的块,并在这些较小的块上调用kernel(block) - lmjohns3
1
实际上,我刚刚对较小的块的想法进行了分析,结果更糟糕了。 :) 取消掉吧。 - lmjohns3
显示剩余4条评论
2个回答

6

可能最简单的加快速度的方法是将kernel(sample)并行化:

这是代码片段:

tik = time.time()
insample = kernel(sample) < iso
print 'filter/sample: ', time.time()-tik
#filter/sample:  1.94065904617

将此更改为使用multiprocessing

from multiprocessing import Pool
tik = time.time()

#Create definition.
def calc_kernel(samp):
    return kernel(samp)

#Choose number of cores and split input array.
cores = 4
torun = np.array_split(sample, cores, axis=1)

#Calculate
pool = Pool(processes=cores)
results = pool.map(calc_kernel, torun)

#Reintegrate and calculate results
insample_mp = np.concatenate(results) < iso

print 'multiprocessing filter/sample: ', time.time()-tik
#multiprocessing filter/sample:  0.496874094009

请确保它们返回相同的答案:

print np.all(insample==insample_mp)
#True

4个核心带来了3.9倍的性能提升。不确定您在运行什么,但在使用超过6个处理器后,您的输入数组大小并不足以获得显着的收益。例如,使用20个处理器只会快大约5.8倍。

太棒了!我在我的代码上尝试了一下,可以确认在我的4个核心上有约3.4倍的提升。我会再等一段时间看看是否有人能超过你的答案,否则我会将其标记为已接受。非常感谢! - Gabriel
我已经接受了你的答案,因为它与我发布的原始代码完美地配合,但如果你能看一下我编辑到问题中的新的稍微修改过的代码,我会非常感激。我可以在原始代码上完美地使用你的解决方案,但无法在我的实际代码的更现实版本上使其正常工作。很抱歉我没有从一开始就呈现这个版本,我没有预见到这样的问题。 - Gabriel
你应该能够直接将“sample”传递给内核。针对这个问题的一个简单解决方案是删除“calc_kernel”,并将“pool.map”更改为“pool.map(kernel, torun)”。如果这不起作用,那请告诉我,我会进一步研究它。我不知道在定义中放置多进程会产生这样的影响。 - Daniel
我会尝试并告诉您结果的。感谢您的耐心等待。 - Gabriel
我现在遇到了一个线程问题,但最好还是开一个新的问题,否则原来的问题会被新的编辑所掩盖。再次感谢你的帮助!这是我用50个点数最好的选择! :) - Gabriel

2

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接