在numpy块上并行运行一个重复循环

3
我需要遍历一个巨大的numpy数组,以构建三个列表,根据耗时的C库调用的结果而定,该库接受标量值并且无法进行矢量化(或者至少我不知道如何做到)。这个循环可能需要几个小时甚至几天时间,并且我可以看到性能会随着时间推移而降低(我记录了进度并且可以看到末尾时速度明显变慢),也许是由于列表大小的增长(??)。
代码看起来像这样(我省略了与打印进度和一些微观优化相关的代码):
import numpy as np
import swig_c_lib


def build_indexes(large_numpy_array_1, large_numpy_array_2):
    xs = []
    ys = []
    idxs = []

    for (x, y), value in np.ndenumerate(large_numpy_array_1):

        if not (value <= -1.0e+10):
            try:
                index = swig_c_lib.slow_computation(np.asscalar(large_numpy_array_2[x, y]), np.asscalar(large_numpy_array_1[x, y]))
            except swig_lib.InternalError:
                pass
            else:
                xs.append(x)
                ys.append(y)
                idxs.append(index)
    return np.asarray(xs), np.asarray(ys), np.asarray(idxs)

也许,一种解决方法是将大型输入numpy数组分成4个子数组,并使用多进程(但我不确定如何合并结果)。有人可以在这里提供帮助吗?

1
如果您知道 large_numpy_array_1 的大小,您可以预设 xs 和 ys 的大小:xs = [None]*size_of_large_numpy_array_1。然后使用索引赋值:xs[i] = x。这将显著提高大型数组的速度。 - screenpaver
1个回答

7
这是一个可以使用 dask 模块帮助解决的问题。
让我们先创建两个数组 a1a2。它们可以有任意形状,但在这个例子中,我们将它们设置为 nn 列,其中 n=30。我们将这些数组展平并堆叠在一起,形成一个形状为 (2,900) 的单个大数组。沿着 axis=1 维度的每对元素都是在 a1a2 上相同位置的一对元素:
In[1]:
import numpy as np
n = 30
a1 = np.random.rand(n, n)
a2 = np.random.rand(n, n)
a = np.stack((a1.flat, a2.flat))
a.shape

Out[1]:
(2, 900)

然后我们将数组分成块。我们选择250个块:
In[2]:
chunks = np.array_split(a, 250, axis=1)
len(chunks)

Out[2]:
250


In[3]:
chunks[0]

Out[3]:
array([[ 0.54631022,  0.8428954 ,  0.11835531,  0.59720379],
   [ 0.51184696,  0.64365038,  0.74471553,  0.67035977]])

现在我们将定义一个名为 slow_function 的函数,它将扮演问题中描述的缓慢计算的角色。我们还定义了一种使用 numpy 将该函数应用于其中一个块的方法。
In[4]:
def slow_function(pair):
    return np.asscalar(pair[0]) + np.asscalar(pair[1])

def apply_on_chunk(chunk):
    return np.apply_along_axis(slow_function, 0, chunk)

apply_on_chunk(chunks[0])

Out[4]:
array([ 1.05815717,  1.48654578,  0.86307085,  1.26756356])

在上面,请注意apply_on_chunk无论块中的axis=1大小如何都可以工作。换句话说,我们可以直接调用apply_on_chunk(a)来计算整个初始数组的结果。


使用dask.bag并行化

现在我们展示如何使用dask.bag对象的map方法沿着块并行化计算:

In[5]:
import dask.bag as db
mybag = db.from_sequence(chunks)

In[6]:
%time myresult = mybag.map(apply_on_chunk)

Out[6]:
CPU times: user 4 ms, sys: 0 ns, total: 4 ms
Wall time: 1.62 ms

此时我们还没有计算任何东西。我们已经向dask描述了我们想要计算结果的方式。这一步骤相对较快,大约需要1.6毫秒。
接下来触发实际计算,我们调用myresult上的compute方法。
In[7]:
%time myresult = myresult.compute()


Out[7]:
CPU times: user 256 ms, sys: 24 ms, total: 280 ms
Wall time: 362 ms

以上代码需要运行1/3秒左右的时间。我们得到一个数组列表,对应于在中每个元素上调用的结果。我们检查其中的前五个:
In[8]:
myresult[:5]

Out[8]:
[array([ 1.05815717,  1.48654578,  0.86307085,  1.26756356]),
 array([ 1.48913909,  1.25028145,  1.36707112,  1.04826167]),
 array([ 0.90069768,  1.24921559,  1.23146726,  0.84963409]),
 array([ 0.72292347,  0.87069598,  1.35893143,  1.02451637]),
 array([ 1.16422966,  1.35559156,  0.9071381 ,  1.17786002])]

如果我们真的想要最终形式的结果,我们必须调用np.concatenate来将所有块的结果合并在一起。我们在下面执行此操作,并显示计算性能:
In[9]:
%time myresult = np.concatenate(\
    db.from_sequence(\
        np.array_split(np.stack((a1.flat, a2.flat)), 250, axis=1)\
    ).map(apply_on_chunk).compute())

Out[9]:
CPU times: user 232 ms, sys: 40 ms, total: 272 ms
Wall time: 342 ms

完整的计算将结果存储在单个数组中,需要略微超过1/3秒的时间运行。
假设我们直接对整个数组进行计算,而不使用任何并行化技术,那会怎样呢?
In[10]:
%time myresult_ = np.apply_along_axis(slow_function, 0, np.stack((a1.flat, a2.flat)))

Out[10]:
CPU times: user 12 ms, sys: 0 ns, total: 12 ms
Wall time: 12.9 ms

直接计算速度要快得多。但这是因为此时的“slow_function”并不真的很慢。它只是两个元素的求和,所以根本不需要太多时间。我们在“dask.bag”计算中看到的迟缓是并行化带来的开销。
让我们再试一次,但这次使用一个真正缓慢的函数,每个调用大约需要20毫秒:
In[11]:
n = 30
a1 = np.random.rand(n, n)
a2 = np.random.rand(n, n)

import time

def slow_function(pair):
    time.sleep(0.02)
    return np.asscalar(pair[0]) + np.asscalar(pair[1])

def apply_on_chunk(chunk):
    return np.apply_along_axis(slow_function, 0, chunk)

让我们比较一下 dask 能做什么,与直接在整个数组上运行计算的区别:
In[12]:
%time myresult = np.concatenate(\
    db.from_sequence(\
        np.array_split(np.stack((a1.flat, a2.flat)), 250, axis=1)\
    ).map(apply_on_chunk).compute())

Out[12]:
CPU times: user 236 ms, sys: 20 ms, total: 256 ms
Wall time: 4.75 s


In[13]:
%time myresult_ = np.apply_along_axis(slow_function, 0, np.stack((a1.flat, a2.flat)))


Out[13]:
CPU times: user 72 ms, sys: 16 ms, total: 88 ms
Wall time: 18.2 s

正如可以看到的那样,dask 利用多进程来加速计算。我们获得了大约4倍的加速效果。
为了完整起见,我们展示了使用 dask 和直接计算得出的结果是一致的:
In[14]:
np.testing.assert_array_equal(myresult, myresult_)
print("success")

Out[14]:
success

请注意,问题中的函数返回一个元组。
np.asarray(xs), np.asarray(ys), np.asarray(idxs)

我们所描述的只涉及计算 np.asarray(idxs)。如果知道原始数组 a1a2 的形状,可以很容易地获得返回元组中的前两个项。

感谢您提供的出色答案。它应该被包含在dask文档中作为示例。我已经实现了它,它可以加速至少3倍的处理速度。不过,我有一个小问题,如何以高效的方式从块中重构结果数组。目前,我使用以下代码,假设块长度为250:xs = np.concatenate([result[i][0] for i in xrange(250)]) ys = np.concatenate([result[i][1] for i in xrange(250)]) idxs = np.concatenate([result[i][2] for i in xrange(250)])是否有一种numpythonic的方法来完成这个任务? - BangTheBank
字面上不起作用,不确定为什么会获得赞同。 - v0rtex20k

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接