Python多进程的替代使用模式,如何避免全局状态的增多?

10

这个(极其简化的)例子在 Python 2.6.6、Debian Squeeze 上运行良好:

from multiprocessing import Pool
import numpy as np

src=None

def process(row):
    return np.sum(src[row])

def main():
    global src
    src=np.ones((100,100))

    pool=Pool(processes=16)
    rows=pool.map(process,range(100))
    print rows

if __name__ == "__main__":
    main()

然而,在被灌输了多年“全局状态很糟糕!!!”的教育后,我的所有直觉都告诉我我真的很想写出更接近以下内容的代码:

from multiprocessing import Pool
import numpy as np

def main():
    src=np.ones((100,100))

    def process(row):
        return np.sum(src[row])

    pool=Pool(processes=16)
    rows=pool.map(process,range(100))
    print rows

if __name__ == "__main__":
    main()

当然,那样做行不通(卡住了,无法pickle某些东西)。

这里的示例很简单,但是当您添加多个“process”函数,并且每个函数都依赖于多个其他输入时...好像它变得有点像30年前用BASIC编写的东西。尝试使用类来聚合具有适当功能的状态似乎是一个明显的解决方案,但在实践中并不那么容易

是否有一些推荐的模式或样式可用于使用multiprocessing.pool,以避免支持我要并行映射的每个函数的全局状态的增殖?

有经验的“多进程专家”如何处理这种情况?

更新:请注意,我实际上对处理更大的数组感兴趣,因此上述的变体 pickle src 每个调用/迭代都不如将其分叉到池的工作进程中。


我并不是一个经验丰富的多进程专家,但我想问一下,为什么你不能简单地使用pool.map(process,product([src],range(100))),并将process函数更改为接受这两个变量作为参数呢?这样做是否也非常低效? - luke14free
@luke14free:是的,这会使每次调用都pickle src数组,而我实际上对比示例代码中更大的数据/数组更感兴趣,所以不是理想的选择。使用进程池,无论在创建池时设置了什么状态,都会被fork到工作进程中,并且可以供它们“免费”读取。这个想法将有助于避免将更多的小的“控制变量”(例如标志)状态放入全局变量中,谢谢。 - timday
1个回答

8
你可以通过传递一个可调用对象来实现共享状态,如下所示:

from multiprocessing import Pool
import numpy as np

class RowProcessor(object):
    def __init__(self, src):
        self.__src = src
    def __call__(self, row):
        return np.sum(self.__src[row])

def main():
    src=np.ones((100,100))
    p = RowProcessor(src)

    pool=Pool(processes=16)
    rows = pool.map(p, range(100))
    print rows

if __name__ == "__main__":
    main()

Yup非常好用,谢谢;再见全局变量。通常我会等待更长时间才接受解决方案,以查看是否还有其他东西出现,但这是完美的。我之前尝试过使用类来解决这个问题,但没有成功;似乎可调用对象使一切不同。 - timday
2
会不会将可调用对象进行序列化,然后回到原点? - abc def foo bar
1
@abc:如果在创建进程池之前创建可调用对象,那么可调用对象就会被分叉到进程池的工作进程中(这比将其序列化和反序列化以及在每个进程中创建对象副本要便宜得多 - CPU TLB技巧 - 并且更有效率)。只有可调用对象的函数参数会被序列化。 - timday

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接