在同长度的一维numpy数组上评估一维函数数组的高效算法

4

我有一个长度为N的由k个不同函数组成的数组,以及一个长度为N的abcissa数组。我想在abcissa处评估这些函数,以返回一个长度为N的ordinates数组,并且至关重要的是,我需要非常快地完成这个操作。

我尝试了以下循环调用np.where,但速度太慢:

创建一些假数据来说明问题:

def trivial_functional(i): return lambda x : i*x
k = 250
func_table = [trivial_functional(j) for j in range(k)]
func_table = np.array(func_table) # possibly unnecessary

我们有一个包含250个不同函数的表格。现在我创建了一个大数组,其中包含许多重复的这些函数,并且还有一组相同长度的点,用于对这些函数进行评估。

Npts = 1e6
abcissa_array = np.random.random(Npts)
function_indices = np.random.random_integers(0,len(func_table)-1,Npts)
func_array = func_table[function_indices]

最后,循环遍历数据使用的每个函数,并在相关点集上对其进行评估:
desired_output = np.zeros(Npts)
for func_index in set(function_indices):
    idx = np.where(function_indices==func_index)[0]
    desired_output[idx] = func_table[func_index](abcissa_array[idx])

这个循环在我的笔记本电脑上需要大约0.35秒,是代码中最大的瓶颈。有没有人看到如何避免对np.where的盲目查找调用?是否可以巧妙地使用numba来加速此循环?


可能希望将此内容发布到codereview.stackexchange.com。 - Romain Braun
如果你跳过对 np.where 的调用并使用布尔索引,即 idx = function_indices == func_index,那么你可以使它更快,其他的一切保持不变。 - Jaime
你的每个函数只能一次处理一个横坐标点吗?没有一次性处理多个的可能性吗? - hpaulj
1
所以是对where的重复调用让你崩溃了。你需要一些可以组织索引并在循环中快速访问的排序或分组方式。 - hpaulj
是的,@hpaulj,就是这样!分组方法正是所需!我很快会发布我的答案,它比where循环快20倍。非常感谢您的建议! - aph
显示剩余3条评论
3个回答

4
这几乎与你(优秀的!)自我回答做了同样的事情,但少了一些复杂操作。在我的机器上似乎稍微快一点——根据一个粗略的测试,大约快了30毫秒。 test
def apply_indexed_fast(array, func_indices, func_table):
    func_argsort = func_indices.argsort()
    func_ranges = list(np.searchsorted(func_indices[func_argsort], range(len(func_table))))
    func_ranges.append(None)
    out = np.zeros_like(array)
    for f, start, end in zip(func_table, func_ranges, func_ranges[1:]):
        ix = func_argsort[start:end]
        out[ix] = f(array[ix])
    return out

这段代码将argsort索引序列拆分成多个子块,每个子块对应于func_table中的一个函数。然后使用每个子块来选择相应函数的输入和输出索引。为了确定子块边界,它使用np.searchsorted而不是np.unique--这里searchsorted(a, b)可以看作是返回给定值或值在b中的第一个等于或大于a中值的二分搜索算法。

然后,zip函数简单地并行迭代其参数,从每个参数中返回一个元素,并将这些元素收集到一个元组中,再将它们串联成一个列表(例如zip([1, 2, 3], ['a', 'b', 'c'], ['b', 'c', 'd'])返回[(1, 'a', 'b'), (2, 'b', 'c'), (3, 'c', 'd')])。这个函数和for语句自带的“解包”元组的功能,允许以简洁但富有表现力的方式并行迭代多个序列。

在这种情况下,我将其用于遍历func_tables中的函数和两个不同步的func_ranges副本。这确保了end变量中的func_ranges项始终比start变量中的项多一步。通过将None附加到func_ranges,我确保最后一个块能够得到优雅地处理——当任何一个参数用完时,zip就会停止迭代,这截断了序列中的最后一个值。方便的是,None值也作为开放式切片索引!

另一种可以做到同样效果的技巧需要更多的代码,但内存开销较小,特别是与zipitertools等效函数izip一起使用时。

range_iter_a = iter(func_ranges)   # create generators that iterate over the 
range_iter_b = iter(func_ranges)   # values in `func_ranges` without making copies
next(range_iter_b, None)           # advance the second generator by one
for f, start, end in itertools.izip(func_table, range_iter_a, range_iter_b):
    ...

然而,这些低开销的基于生成器的方法有时会比普通列表慢一些。此外,请注意在Python 3中,zip的行为更像izip


非常好。我可以通过自己的独立测试确认你的时间表:你的解决方案比我的快约20%,你的语法更加简洁,我们的代码甚至在一些棘手的边缘情况下也是一致的。这太棒了! - aph
@aph,很高兴听到测试结果一致 - 我本打算运行几个测试,但突然有事离开了。我也会为未来的访问者添加一些解释。 - senderle
如果您不介意的话,能否详细解释一下一些事情会很好。特别是您使用的zip函数,我并不觉得它很直观,但它看起来很优雅,所以我想在这样一个实际应用中学习它的语法。 - aph
1
又被分心了,但是我刚刚找到时间来发布。如果你有更多问题,请告诉我! - senderle
这真是超出了 "职责范围",senderle。但是,哇,非常感谢您抽出时间。这对我来说在几个话题上都非常有指导意义,包括关于itertools和生成器的额外部分。我已经写了多年自己的groupby计算(虽然不是用python),从未见过如此完美的计算,所以对我来说这是一个真正开眼界的SO答案。如果我能够给多个点赞,我会的。干杯! - aph

2
感谢hpaulj的建议,采用了groupby方法。虽然有很多现成的程序可用于此操作,例如Pandas DataFrames,但它们都带有数据结构初始化的开销,这是一次性的,但如果仅用于单个计算,则可能代价高昂。
下面是我纯numpy解决方案,比我之前使用的where循环快13倍。要点总结是我使用np.argsort和np.unique以及一些花式索引技巧。
首先,我们对函数索引进行排序,然后找到排序数组中每个新索引开始的元素。
idx_funcsort = np.argsort(function_indices)
unique_funcs, unique_func_indices = np.unique(function_indices[idx_funcsort], return_index=True)

现在不再需要盲目查找,因为我们确切地知道排序数组的哪个部分对应于每个唯一函数。因此,我们仍然循环遍历每个被调用的函数,但不调用where:

for func_index in range(len(unique_funcs)-1):
    idx_func = idx_funcsort[unique_func_indices[func_index]:unique_func_indices[func_index+1]]
    func = func_table[unique_funcs[func_index]]
    desired_output[idx_func] = func(abcissa_array[idx_func])

这里已经涵盖了除最后一个索引之外的所有内容,但由于Python索引约定的原因,我们需要单独调用它,有点令人讨厌:

func_index = len(unique_funcs)-1
idx_func = idx_funcsort[unique_func_indices[func_index]:]
func = func_table[unique_funcs[func_index]]
desired_output[idx_func] = func(abcissa_array[idx_func])

这个循环的结果与使用where循环的结果完全相同(用于核对),但是这个循环的运行时间为0.027秒,比我最初的计算快了13倍。


1
一个小建议。x[a:] 等同于 x[a:None] -- 所以,你可以将 unique_func_indices 转换为纯列表并附加一个 None 值,而不是分解最后一次调用。 - senderle

0

这是一个很好的函数式编程在Python中被部分模拟的例子。

现在,如果你想将你的函数应用于一组点,我建议使用numpyufunc框架,它将允许你创建非常快速的向量化版本的函数。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接