Python 多进程中的 Pool.map 调用了 acquire 方法?

12
我有一个形状为640x480的numpy.array图像,每个图像都包含630个像素。因此,总数组大小为630x480x640。 我想生成一个平均图像,并计算所有630个图像中每个像素的标准偏差。
这可以通过以下方式轻松完成:
avg_image = numpy.mean(img_array, axis=0)
std_image = numpy.std(img_array, axis=0)

然而,由于我要对50个这样的数组进行操作,并且有一台8核/16线程的工作站,所以我认为可以使用multiprocessing.Pool并行处理。

因此,我做了以下操作:

def chunk_avg_map(chunk):
    #do the processing
    sig_avg = numpy.mean(chunk, axis=0)
    sig_std = numpy.std(chunk, axis=0)
    return([sig_avg, sig_std])

def chunk_avg(img_data):

    #take each row of the image
    chunks = [img_data[:,i,:] for i in range(len(img_data[0]))]

    pool = multiprocessing.Pool()
    result = pool.map(chunk_avg_map, chunks)
    pool.close()
    pool.join()
    return result

然而,我只看到了一点点的加速。通过在chunk_avg_map中添加打印语句,我能够确定每次只有一个或两个进程被启动,而不是16个(正如我所期望的那样)。

然后我在iPython中使用cProfile运行了我的代码:

%prun current_image_anal.main()

结果显示,迄今为止花费最多时间的是对acquire的调用:

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
     1527  309.755    0.203  309.755    0.203 {built-in method acquire}

我理解这似乎与锁有关,但我不明白为什么我的代码会这样做。有人有什么想法吗?

[编辑] 根据要求,这里是一个可运行的脚本,用来演示问题。您可以通过任何手段对其进行分析,但是当我尝试时,我发现大部分时间都花在了获取调用上,而不是像我预期的那样花在mean或std上。

#!/usr/bin/python
import numpy
import multiprocessing

def main():
    fake_images = numpy.random.randint(0,2**14,(630,480,640))
    chunk_avg(fake_images)

def chunk_avg_map(chunk):
    #do the processing
    sig_avg = numpy.mean(chunk, axis=0)
    sig_std = numpy.std(chunk, axis=0)
    return([sig_avg, sig_std])

def chunk_avg(img_data):

    #take each row of the image
    chunks = [img_data[:,i,:] for i in range(len(img_data[0]))]

    pool = multiprocessing.Pool()
    result = pool.map(chunk_avg_map, chunks)
    pool.close()
    pool.join()
    return result

if __name__ == "__main__":
    main()

1
map()调用中指定一个chunksize是否会改变任何内容? - llasram
chunks = [img_data[:,i,:] for i ... 替换为 chunks = (img_data[:,i,:].copy() for i ... 会有所帮助吗? - Joe Kington
不,那并没有帮助。不过这是个好主意,我想我会把它加入到我的代码中 :-) - Matthew Lawson
只是好奇:您在chunk_avg_map中放了哪些打印语句?我尝试过 print(multiprocessing.current_process().name),但这并不能准确证明有多少个进程同时运行。 - unutbu
我执行了 x = numpy.random.randint(0,10000) ,然后在 chunk_avg_map 的开头和结尾打印了 x。根据这个,你可以看到每个线程的起点和终点,因为任何两个线程获得相同的随机整数的概率非常小。 - Matthew Lawson
显示剩余7条评论
1个回答

7
我认为问题在于处理每个块所需的CPU时间相对于将输入和输出复制到工作进程所需的时间较少。我修改了您的示例代码,将输出分成16个均匀的块,并打印出chunk_avg_map()运行开始和结束时的CPU时间差异(time.clock())。在我的系统上,每次单独运行需要略低于一秒钟的CPU时间,但进程组的总CPU时间使用量(系统+用户时间)超过38秒。每个块似乎有0.75秒的复制开销,使得您的程序执行计算的速度只比multiprocessing传递数据的速度稍快,导致仅有两个工作进程被同时利用。
如果我修改代码,使“输入数据”只是xrange(16),并在chunk_avg_map()内部构建随机数组,则系统+用户时间下降到约19秒,并且所有16个工作进程同时执行。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接