如何使用Python的多进程池映射(Pool.map)在for循环中填充numpy数组

14
我希望填充一个2D-numpy数组,使用for循环并通过使用多进程来加速计算。
import numpy
from multiprocessing import Pool


array_2D = numpy.zeros((20,10))
pool = Pool(processes = 4)

def fill_array(start_val):
    return range(start_val,start_val+10)

list_start_vals = range(40,60)
for line in xrange(20):
    array_2D[line,:] = pool.map(fill_array,list_start_vals)
pool.close()

print array_2D

执行它的效果是Python运行4个子进程并占用4个CPU核心,但是执行不会完成,数组也没有打印出来。如果我尝试将该数组写入磁盘,则什么也不会发生。

有人能告诉我为什么吗?


你还记得你是怎么运行这段代码的吗?是在命令行、Jupyter还是脚本中? - pylang
3个回答

5
以下内容可行。首先,为了避免奇怪的副作用,将代码主要部分保护在main块中是一个好主意。 pool.map() 的结果是一个包含每个值的评估的列表,该值在迭代器 list_start_vals 中,因此您不必先创建 array_2D
import numpy as np
from multiprocessing import Pool

def fill_array(start_val):
    return list(range(start_val, start_val+10))

if __name__=='__main__':
    pool = Pool(processes=4)
    list_start_vals = range(40, 60)
    array_2D = np.array(pool.map(fill_array, list_start_vals))
    pool.close() # ATTENTION HERE
    print array_2D

也许你在使用 pool.close() 时会遇到问题,根据 @hpaulj 的评论,如果出现问题,你只需删除此行即可...

使用更大的数组时,我会收到错误Exception RuntimeError: RuntimeError('cannot join current thread',) in <Finalize object, dead> ignored。但是,apply_async不会出现此警告。 - hpaulj
没有 pool.close() 命令,我就会得到这个 Error - hpaulj
@hpaulj 感谢您的反馈... 我尝试生成一个 10000 X 10000 的数组,没有问题,将60改为10040,10改为10000... - Saullo G. P. Castro
也许这是机器尺寸和速度的问题。我的相对较旧。 - hpaulj
经过进一步测试,如果映射速度太慢,则pool.join()更为重要。 - hpaulj

1
如果您仍然想使用数组填充,可以使用pool.apply_async代替pool.map。根据Saullo的回答进行操作:
import numpy as np
from multiprocessing import Pool

def fill_array(start_val):
    return range(start_val, start_val+10)

if __name__=='__main__':
    pool = Pool(processes=4)
    list_start_vals = range(40, 60)
    array_2D = np.zeros((20,10))
    for line, val in enumerate(list_start_vals):
        result = pool.apply_async(fill_array, [val])
        array_2D[line,:] = result.get()
    pool.close()
    print array_2D

这个运行比 map 慢一点。但是它不会像我测试的 map 版本一样产生运行时错误:Exception RuntimeError: RuntimeError('cannot join current thread',) in <Finalize object, dead> ignored


0
问题是由于在for循环中运行pool.map,map()方法的结果在功能上等同于内置的map(),只是单个任务并行运行。 因此,在您的情况下,pool.map(fill_array,list_start_vals)将被调用20次,并且在for循环的每次迭代中开始并行运行。以下代码应该可以工作。

代码:

#!/usr/bin/python

import numpy
from multiprocessing import Pool

def fill_array(start_val):
    return range(start_val,start_val+10)

if __name__ == "__main__":
    array_2D = numpy.zeros((20,10))
    pool = Pool(processes = 4)    
    list_start_vals = range(40,60)

    # running the pool.map in a for loop is wrong
    #for line in xrange(20):
    #    array_2D[line,:] = pool.map(fill_array,list_start_vals)

    # get the result of pool.map (list of values returned by fill_array)
    # in a pool_result list 
    pool_result = pool.map(fill_array,list_start_vals)

    # the pool is processing its inputs in parallel, close() and join() 
    #can be used to synchronize the main process 
    #with the task processes to ensure proper cleanup.
    pool.close()
    pool.join()

    # Now assign the pool_result to your numpy
    for line,result in enumerate(pool_result):
        array_2D[line,:] = result

    print array_2D

感谢您的回复。不幸的是,效果是一样的。Python启动子进程并占用PC,但没有任何反应。我在Windows 7机器上运行代码(双核CPU带超线程=>实际上是四核),使用Python 2.7.5 32位,并使用SpyderLib作为编程接口。 - MoTSCHIGGE
@MoTSCHIGGE,我在Windows环境下运行了我发布的代码,看起来它是正常工作的。我认为你在没有加上if "main"==__name__:的情况下运行代码,如果是这种情况,代码将会在Windows中无限运行,请参考Stack Overflow链接,了解在Windows中if条件语句的重要性https://dev59.com/ZGIj5IYBdhLWcg3weFCq - Ram
我刚刚尝试运行了上面的示例代码,包括 "if name == "main": ",但什么也没有发生。我不知道出了什么问题... - MoTSCHIGGE

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接