在numpy行上并行化循环

5
我需要将相同的函数应用于numpy数组中的每一行,并将结果再次存储在numpy数组中。
# states will contain results of function applied to a row in array
states = np.empty_like(array)

for i, ar in enumerate(array):
    states[i] = function(ar, *args)

# do some other stuff on states

function执行一些非常复杂的数据过滤操作,当条件为True或False时,返回一个数组。 function可以是纯Python或Cython编译的。行过滤操作很复杂,可以依赖于行中以前的值,这意味着我不能按元素遍历整个阵列。

例如,是否有一种方法在Dask中实现这样的操作?


还是不太明白。i 是从哪里来的?你是想调用 enumerate 吗? - Neil G
你的函数只接受当前行还是也可以接受其他行? - hellpanderr
该函数接受任何1D numpy数组。它不关心该数组来自哪里。 - Max Linke
2个回答

7

Dask 解决方案

您可以通过将数组按行分块,调用 map_blocks,然后计算结果来使用 dask.array 进行处理。

ar = ...
x = da.from_array(ar, chunks=(1, arr.shape[1]))
x.map_blocks(function, *args)
states = x.compute()

默认情况下,它将使用线程,您可以按照以下方式使用进程。
from dask.multiprocessing import get
states = x.compute(get=get)

线程池解决方案

然而,对于像这样的简单并行计算,dask 可能过于复杂了,您可以使用线程池来完成。

from multiprocessing.pool import ThreadPool
pool = ThreadPool()

ar = ...
states = np.empty_like(array)

def f(i):
    states[i] = function(ar[i], *args)

pool.map(f, range(len(ar)))

您可以通过以下更改切换到进程

from multiprocessing import Pool
pool = Pool()

0

我无法以逐个元素的方式操作数组。单行的过滤取决于先前的值。 - Max Linke
@kain88:你在for循环中没有正确使用i,而且似乎也没有将其传递给函数。这毫无意义。 - Neil G

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接