如何使用多进程对2D numpy数组应用函数

4
假设我有以下函数:
def f(x,y):
    return x*y

如何使用多进程模块将函数应用于NxM 2D numpy数组中的每个元素?使用串行迭代,代码可能如下所示:

import numpy as np
N = 10
M = 12
results = np.zeros(shape=(N,M))
for x in range(N):
    for y in range(M):
        results[x,y] = f(x,y)

2
我假设这只是一个玩具模型,你需要做的事情更加复杂,但是numpy有高效的函数可以执行你在代码中编写的操作。 - Julien Spronck
我目前正在查看文档,但还没有找到如何将一个函数应用于数组中的每个元素。有什么指导吗? - PTTHomps
@JulienSpronck 看起来你忘记发布你提到的答案了。 - aensm
1
Julien是对的 - 当您尝试优化numpy代码时,多进程是最后一个可以使用的工具。Python中的多进程速度较慢且繁琐,通常使用[广播](http://docs.scipy.org/doc/numpy/user/basics.broadcasting.html)和BLAS优化的线性代数计算可以获得更大的效率提升。除此之外,还有[Cython](http://cython.org/),[numba](http://numba.pydata.org/)和[numexpr](https://code.google.com/p/numexpr/)。由于您还没有展示出要优化的代码,很难给出更具体的建议。 - ali_m
我的真实世界应用程序在大量数据上运行机器学习例程。运行它针对2D numpy数组的目的是允许我识别SVM中RBF内核的gamma和C的理想值。除此之外,这是一个有用的示例,可以帮助更好地理解多处理模块。 - PTTHomps
显示剩余2条评论
2个回答

8
以下是使用 multiprocessing 并行化您的示例函数的方法。我还包括了一个几乎完全相同的纯 Python 函数,它使用非并行的 for 循环,以及一个可以实现相同结果的 numpy 一行代码:
import numpy as np
from multiprocessing import Pool


def f(x,y):
    return x * y

# this helper function is needed because map() can only be used for functions
# that take a single argument (see https://dev59.com/dG035IYBdhLWcg3wZvJg)
def splat_f(args):
    return f(*args)

# a pool of 8 worker processes
pool = Pool(8)

def parallel(M, N):
    results = pool.map(splat_f, ((i, j) for i in range(M) for j in range(N)))
    return np.array(results).reshape(M, N)

def nonparallel(M, N):
    out = np.zeros((M, N), np.int)
    for i in range(M):
        for j in range(N):
            out[i, j] = f(i, j)
    return out

def broadcast(M, N):
    return np.prod(np.ogrid[:M, :N])

现在让我们来看一下性能:
%timeit parallel(1000, 1000)
# 1 loops, best of 3: 1.67 s per loop

%timeit nonparallel(1000, 1000)
# 1 loops, best of 3: 395 ms per loop

%timeit broadcast(1000, 1000)
# 100 loops, best of 3: 2 ms per loop

非并行的纯Python版本比并行版本快约4倍,而使用numpy数组广播的版本绝对压倒性优于其他两个版本。

问题在于启动和停止Python子进程会带来相当多的开销,并且您的测试函数如此微不足道,以至于每个工作线程只花费很少一部分的寿命来执行有用的工作。只有在每个线程被杀死之前有大量的工作要做时,多处理才有意义。例如,您可以给每个工作线程一个更大的输出数组块来计算(尝试调整pool.map()chunksize=参数),但是基于这样微不足道的示例,我怀疑您不会看到很大的改进。

我不知道您实际的代码是什么样子的——也许您的函数足够大且昂贵,值得使用多处理。然而,我敢打赌,有更好的方法来提高其性能。


我相信还有其他方法可以提高性能,但目前一次运行大约需要10分钟,并且仅使用单个逻辑核心。它会将核心的使用率最大化,但仅使用一个核心。感谢您的回答 :) - PTTHomps
相信我,我知道想要看到所有内核以最大速度运行的诱惑,但请相信我——这不是优化的正确方式!始终从对代码进行分析(例如使用 line_profiler)开始,并确定瓶颈在哪里,然后着重解决这些问题。尽可能使用 BLAS 和广播,对于任何无法通过广播消除的内部 for 循环,请使用 Cython 或 numba。 - ali_m
一个小提示:可以使用 pool.starmap 代替 splat_fpool.map,因为它在内部执行的操作与 splat_f 相同。 - sophros
@sophros 是的,只要您使用Python 3.3或更新版本(请参见我上面链接的https://dev59.com/dG035IYBdhLWcg3wZvJg),就可以。 - ali_m

0

不确定在您的情况下是否需要多进程。在上面的简单示例中,您可以执行以下操作

X, Y = numpy.meshgrid(numpy.arange(10), numpy.arange(12))
result = X*Y

多进程是我应用程序所需的。这个例子被大大简化了,但确实代表了基本问题。 - PTTHomps
那正是我所想的...那么恐怕我不知道。 - Julien Spronck

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接