我有一个长度为 N 的大数组,包含 k 个不同的函数,以及一个长度为 N 的 abcissa 数组。由于 @senderle 提供的巧妙解决方案(Efficient algorithm for evaluating a 1-d array of functions on a same-length 1d numpy array),我可以使用快速的基于 numpy 的算法,在 abcissa 上评估这些函数,返回一个长度为 N 的 ordinates 数组。
def apply_indexed_fast(abcissa, func_indices, func_table):
""" Returns the output of an array of functions evaluated at a set of input points
if the indices of the table storing the required functions are known.
Parameters
----------
func_table : array_like
Length k array of function objects
abcissa : array_like
Length Npts array of points at which to evaluate the functions.
func_indices : array_like
Length Npts array providing the indices to use to choose which function
operates on each abcissa element. Thus func_indices is an array of integers
ranging between 0 and k-1.
Returns
-------
out : array_like
Length Npts array giving the evaluation of the appropriate function on each
abcissa element.
"""
func_argsort = func_indices.argsort()
func_ranges = list(np.searchsorted(func_indices[func_argsort], range(len(func_table))))
func_ranges.append(None)
out = np.zeros_like(abcissa)
for i in range(len(func_table)):
f = func_table[i]
start = func_ranges[i]
end = func_ranges[i+1]
ix = func_argsort[start:end]
out[ix] = f(abcissa[ix])
return out
我现在尝试使用多进程来并行化此函数内部的for循环。为了清晰起见,我将简要概述@senderle开发的算法。如果您可以立即阅读上面的代码并理解它,请跳过下一段文字。
首先,我们找到对输入func_indices进行排序的索引数组,用它来定义长度为k的整数数组func_ranges。 func_ranges的整数条目控制应用于输入abcissa的适当子数组的函数,其工作方式如下。让f是输入func_table中的第i个函数。然后,我们应该将函数f应用于的abcissa的切片是slice(func_ranges [i],func_ranges [i + 1])。因此,一旦计算出func_ranges,我们只需在输入func_table上运行简单的for循环,并依次将每个函数对象应用于适当的切片,填充输出数组。请参见下面的代码,以了解此算法在操作中的最小示例。
def trivial_functional(i):
def f(x):
return i*x
return f
k = 250
func_table = np.array([trivial_functional(j) for j in range(k)])
Npts = 1e6
abcissa = np.random.random(Npts)
func_indices = np.random.random_integers(0,len(func_table)-1,Npts)
result = apply_indexed_fast(abcissa, func_indices, func_table)
所以我的目标现在是使用多进程来并行计算。我认为使用线程的尴尬并行循环的常规技巧应该很简单。但是,我以下的尝试引发了一个我不理解的异常。
from multiprocessing import Pool, cpu_count
def apply_indexed_parallelized(abcissa, func_indices, func_table):
func_argsort = func_indices.argsort()
func_ranges = list(np.searchsorted(func_indices[func_argsort], range(len(func_table))))
func_ranges.append(None)
out = np.zeros_like(abcissa)
num_cores = cpu_count()
pool = Pool(num_cores)
def apply_funci(i):
f = func_table[i]
start = func_ranges[i]
end = func_ranges[i+1]
ix = func_argsort[start:end]
out[ix] = f(abcissa[ix])
pool.map(apply_funci, range(len(func_table)))
pool.close()
return out
result = apply_indexed_parallelized(abcissa, func_indices, func_table)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed
我在 Stack Overflow 上看到过这个问题:如何在类中使用 Pool.map 进行多进程处理?。我已经尝试了那里提出的每种方法;在所有情况下,我都会得到一个“打开的文件太多”的错误,因为线程从未关闭,或者适应的算法只是挂起。这似乎应该有一个简单的解决方案,因为这只是线程化一个并行的 for 循环。
__main__
命名空间访问的函数(即将在另一个函数内部定义或者超出主要范围的函数传递给池)。 - Joe Kingtonsharedmem
,https://github.com/sturlamolden/sharedmem-numpy。但它只能节省内存,而不能节省时间。 - hpaulj