在Python多进程池中共享NumPy数组

7
我正在处理一些代码,对大量问题(数万到数十万个数值积分)进行了一些相当重的数值计算。幸运的是,这些积分可以被轻松地并行处理,因此可以使用Pool.map()将工作分配给多个核心。
现在,我有一个程序,其基本工作流程如下:
#!/usr/bin/env python
from multiprocessing import Pool
from scipy import *
from my_parser import parse_numpy_array
from my_project import heavy_computation

#X is a global multidimensional numpy array
X = parse_numpy_array("input.dat")
param_1 = 0.0168
param_2 = 1.505

def do_work(arg):
  return heavy_computation(X, param_1, param_2, arg)

if __name__=='__main__':
  pool = Pool()
  arglist = linspace(0.0,1.0,100)
  results = Pool.map(do_work,arglist)
  #save results in a .npy file for analysis
  save("Results", [X,results])

自 X, param_1 和 param_2 对于池中的每个进程都是硬编码并以完全相同的方式初始化,因此这一切都可以正常工作。现在我已经让代码运行起来了,我想让文件名、param_1 和 param_2 能够在运行时由用户输入,而不是被硬编码。
应该注意的一件事是,在进行工作时,X、param_1 和 param_2 没有被修改。因为我不会修改它们,所以我可以在程序开始时像这样做:
import sys
X = parse_numpy_array(sys.argv[1])
param_1 = float(sys.argv[2])
param_2 = float(sys.argv[3])

“那样做就可以了,但由于大多数使用这个代码的用户都是在 Windows 计算机上运行代码,我不想采用命令行参数的方式。”
“我真正想要做的是像这样:”
X, param_1, param_2 = None, None, None

def init(x,p1, p2)
  X = x
  param_1 = p1
  param_2 = p2

if __name__=='__main__':
  filename = raw_input("Filename> ")
  param_1 = float(raw_input("Parameter 1: "))
  param_2 = float(raw_input("Parameter 2: "))
  X = parse_numpy_array(filename)
  pool = Pool(initializer = init, initargs = (X, param_1, param_2,))
  arglist = linspace(0.0,1.0,100)
  results = Pool.map(do_work,arglist)
  #save results in a .npy file for analysis
  save("Results", [X,results])

当然,这种方法失败了,当池的map调用发生时X/param_1/param_2全部都是空的。我对多进程编程还不是很熟悉,所以我不确定为什么初始化调用会失败。有没有办法做到我想做的事情呢?或者完全换一种方式去解决这个问题?我也看过使用共享数据的方法,但根据文档的理解,那只适用于包括numpy数组在内的ctypes。非常感谢任何关于此的帮助。

根据这里的内容,可以让Numpy与ctypes良好地协作。 - Ken
你应该看看Stack Overflow的内容,而不是文档 :) - senderle
@senderle 我不确定你是否应该鼓励人们不阅读文档。我同意在 Stack Overflow 上搜索通常更有帮助。 - Ken
@Ken,是的,我在开玩笑。不过你说得对。 - senderle
2个回答

5
我遇到了类似的问题。如果你只是想看我的解决方案,请跳过一些行 :) 我必须做到:
- 在不同部分操作的线程之间共享一个numpy.array,并且... - 传递Pool.map一个具有多个参数的函数。
我注意到:
- numpy.array的数据被正确读取,但... - 对numpy.array的更改并未保持。 - Pool.map在处理lambda函数时存在问题,或者对我来说是这样(如果您不清楚这一点,请忽略它)。
我的解决方案是:
- 使目标函数的唯一参数成为列表。 - 使目标函数返回修改后的数据,而不是直接尝试写入numpy.array。
我理解你的do_work函数已经返回计算出的数据,因此你只需要修改to_work函数以接受一个包含X,param_1,param_2和arg的列表作为参数,并在将其传递给Pool.map之前以这种格式打包目标函数的输入。
以下是示例实现:
def do_work2(args):
    X,param_1,param_2,arg = args
    return heavy_computation(X, param_1, param_2, arg)

现在你需要在调用do_work函数之前对输入进行打包。你的主函数应该变成:
if __name__=='__main__':
   filename = raw_input("Filename> ")
   param_1 = float(raw_input("Parameter 1: "))
   param_2 = float(raw_input("Parameter 2: "))
   X = parse_numpy_array(filename)
   # now you pack the input arguments
   arglist = [[X,param1,param2,n] for n in linspace(0.0,1.0,100)]
   # consider that you're not making 100 copies of X here. You're just passing a reference to it
   results = Pool.map(do_work2,arglist)
   #save results in a .npy file for analysis
   save("Results", [X,results])

1
multiprocessing导入的所有内容(而不是threading)都使用pickle传递参数给函数。由于无法将lambda函数进行pickle,所以Pool.map不能将其用作传递给函数的参数。这就是为什么Pool.map在处理lambda函数时会出现问题。 - xolodec

-2
为使你的最后一个想法奏效,我认为你可以在 if 语句内部修改之前使用 global 关键字将 X、param_1 和 param_2 变量简单地设置为全局变量。因此请添加以下内容:
global X
global param_1
global param_2

if __name__ == '__main__'之后直接添加。


1
我认为这没有任何作用。if语句在全局命名空间中,因此Xparam_1param_2已经是全局的了。无论如何,全局性不是问题所在;这是一个特定于multiprocessing的问题。 - senderle
很遗憾那不是问题所在。我很少使用multiprocessing,因为我的问题几乎从来不是尴尬的并行问题。然而,if语句内部的变量根据我的直觉和实验,并不在全局命名空间中。 - Ken
我不确定你做了什么实验,但如果你运行这个脚本:if __name__ == '__main__': a = 5; print globals()['a'],Python会打印出 '5'。所以我非常肯定 a 是在全局命名空间中的。 - senderle

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接