使用多进程处理for循环?

129

我有一个数组(名为data_inputs),其中包含数百个天文图像文件的名称。然后对这些图像进行操作。我的代码有效,每处理一张图片需要几秒钟。但是,它只能一次处理一张图片,因为我正在通过for循环运行该数组:

for name in data_inputs:
    sci=fits.open(name+'.fits')
    #image is manipulated

没有必要在其他图像之前修改图像,因此是否可能利用我的计算机上的所有4个核心,使每个核心通过for循环处理不同的图像?

我已经阅读了关于multiprocessing模块的资料,但不确定如何在我的情况下实现它。 我渴望让multiprocessing工作,因为最终我将不得不运行这个程序在10,000多张图像上。

4个回答

145
你可以简单地使用multiprocessing.Pool:
from multiprocessing import Pool

def process_image(name):
    sci=fits.open('{}.fits'.format(name))
    <process>

if __name__ == '__main__':
    pool = Pool()                         # Create a multiprocessing Pool
    pool.map(process_image, data_inputs)  # process data_inputs iterable with pool

25
使用以下代码可能更好: pool = Pool(os.cpu_count())这是一种更通用的使用多进程的方式。 - Lior Magen
2
注意:os.cpu_count()是在Python 3.4中添加的。对于Python 2.x,请使用multiprocessing.cpu_count() - dwj
38
"Pool()"与"Pool(os.cpu_count())"是一样的。 - Tim
22
进一步解释 @Tim 的评论 - 在调用Pool()时如果没有给processes赋值,不管你使用的是Python 3还是Python 2版本,它都等同于Pool(processes=cpu_count()) - 所以在任何一个版本中最好的做法都是使用Pool()。 https://docs.python.org/2/library/multiprocessing.html - Kyle Pittman
12
如果我没记错的话,使用Pool(os.cpu_count())会导致操作系统在处理结束前被冻结,因为你没有给操作系统留下任何可用的核心。对于许多用户而言,Pool(os.cpu_count() - 1)可能是更好的选择。 - shayelk
显示剩余4条评论

35

你可以使用multiprocessing.Pool

from multiprocessing import Pool
class Engine(object):
    def __init__(self, parameters):
        self.parameters = parameters
    def __call__(self, filename):
        sci = fits.open(filename + '.fits')
        manipulated = manipulate_image(sci, self.parameters)
        return manipulated

try:
    pool = Pool(8) # on 8 processors
    engine = Engine(my_parameters)
    data_outputs = pool.map(engine, data_inputs)
finally: # To make sure processes are closed in the end, even if errors happen
    pool.close()
    pool.join()

3
我无法理解这里的"data_inputs"是什么意思。你还没有定义它。我应该给它什么值? - Abhishek dot py
2
它实际上源于alko的回答,我引用了他的评论(请参见代码块):“使用池处理数据输入可迭代对象”。因此,data_inputs是一个可迭代对象(就像标准的map函数中一样)。 - ponadto
Python的文档(https://docs.python.org/3/library/multiprocessing.html#module-multiprocessing.pool)仅显示可以将函数传递给 pool.map(func, iterable[, chunksize])。当传递一个对象时,这个对象会被所有进程共享吗?因此,我能让所有进程写入对象中相同的列表self.list_吗? - Philipp

10
另外,或者说作为另一种选择,

with Pool() as pool: 
    pool.map(fits.open, [name + '.fits' for name in datainput])

TypeError: 'Pool' object is not callable - chris
4
抱歉,我的错误,应该是"pool.map"而不仅仅是"pool"。我已经进行了修正。 - Spas

4

如果您只使用for循环遍历迭代器,我建议使用带有chunksizeimap_unordered。 它会在计算出每个循环的结果时立即返回结果。map等待所有结果都被计算完毕,因此具有阻塞性。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接