Python使用multiprocessing.pool时无法分配内存

14

我的代码(遗传优化算法的一部分)并行运行几个进程,等待它们全部完成,然后读取输出,并使用不同的输入重复此过程。当我进行60次重复测试时,一切都很正常。既然如此,我决定使用更现实的重复次数200。但是我收到了以下错误:

File "/usr/lib/python2.7/threading.py", line 551, in __bootstrap_inner
 self.run()
File "/usr/lib/python2.7/threading.py", line 504, in run
 self.__target(*self.__args, **self.__kwargs)
File "/usr/lib/python2.7/multiprocessing/pool.py", line 302, in _handle_workers
 pool._maintain_pool()
File "/usr/lib/python2.7/multiprocessing/pool.py", line 206, in _maintain_pool
 self._repopulate_pool()
File "/usr/lib/python2.7/multiprocessing/pool.py", line 199, in _repopulate_pool
 w.start()
File "/usr/lib/python2.7/multiprocessing/process.py", line 130, in start
 self._popen = Popen(self)
File "/usr/lib/python2.7/multiprocessing/forking.py", line 120, in __init__
 self.pid = os.fork()
OSError: [Errno 12] Cannot allocate memory

这里是使用池的代码片段:

def RunMany(inputs):
from multiprocessing import cpu_count, Pool
proc=inputs[0]
pool=Pool(processes = proc) 
results=[]
for arg1 in inputs[1]:
    for arg2 in inputs[2]:
        for arg3 in inputs[3]:
            results.append(pool.apply_async(RunOne, args=(arg1, arg2, arg3)))
casenum=0
datadict=dict()
for p in results:
    #get results of simulation once it has finished
    datadict[casenum]=p.get() 
    casenum+=1
return datadict

RunOne函数会创建一个我所创造的类的对象,使用计算密集型的Python包来解决一个化学问题,大约需要30秒钟时间,并返回带有化学求解器输出的对象。我的代码以串行方式调用RunMany,并在其中并行调用RunOne。在我的测试中,我使用了10个处理器(计算机有16个)和20个对RunOne的调用池来调用RunOne。换句话说,len(arg1)* len(arg2)* len(arg3)= 20。当我的代码调用RunMany 60次时,一切都正常,但当我调用它200次时,我就没有内存了。
这是否意味着某个过程没有正确清理自己?我有内存泄漏吗?如何确定是否存在内存泄漏,如何找到泄漏的原因?我的200次循环中唯一增长的项是一个数字列表,它的大小从0增长到200。我有一个自定义类的对象字典,但其长度限制为50个条目 - 每次循环执行时,它会删除字典中的一个项目并用另一个项目替换它。
编辑:以下是调用RunMany代码的片段
for run in range(nruns):
    #create inputs object for RunMany using genetic methods. 
    #Either use starting "population" or create "child" inputs from successful previous runs
    datadict = RunMany(inputs)

    sumsquare=0
    for i in range(len(datadictsenk)): #input condition
        sumsquare+=Compare(datadict[i],Target[i]) #compare result to target

    with open(os.path.join(mainpath,'Outputs','output.txt'),'a') as f:
        f.write('\t'.join([str(x) for x in [inputs.name, sumsquare]])+'\n')

    Objective.append(sumsquare) #add sum of squares to list, to be plotted outside of loop
    population[inputs]=sumsquare #add/update the model in the "population", using the inputs object as a key, and it's objective function as the value
    if len(population)>initialpopulation:
        population = PopulationReduction(population) #reduce the "population" by "killing" unfit "genes"
    avgtime=(datetime.datetime.now()-starttime2)//(run+1)
    remaining=(nruns-run-1)*avgtime
    print(' Finished '+str(run+1)+' / ' +str(nruns)+'. Elapsed: '+str(datetime.datetime.now().replace(microsecond=0)-starttime)+' Remaining: '+str(remaining)+' Finish at '+str((datetime.datetime.now()+remaining).replace(microsecond=0))+'~~~', end="\r")

1
现在,“results”将会非常快地增长,当这种情况发生时 - 您将会因为从未关闭打开的进程池而耗尽内存。 - Tymoteusz Paul
1
它应该按照那种方式工作,但有时Python在清理自己之后会出现问题。请在此处查看类似的问题https://dev59.com/QIHba4cB1Zd3GeqPOTcV#24564983 - Tymoteusz Paul
1
小问题,但我认为你应该在这里使用xrange而不是range,因为你只需要索引而不是列表。 - shuttle87
1
@Jeff,我不确定你在哪里添加了它,但似乎你正在尝试从其中加入一个线程,这是无效的操作。 - Tymoteusz Paul
1
@Puciek:你是正确的。现在我已经在RunMany中正确地添加了pool.close()pool.join()(在两个for循环之间),一切似乎都很好。谢谢! - Jeff
显示剩余5条评论
1个回答

17

正如在我的问题评论中所示,答案来自Puciek。

解决方案是在完成后关闭进程池。我认为它会自动关闭,因为results变量是RunMany的局部变量,并且会在RunMany完成后被删除。但是,Python并不总是按照预期工作。

修复后的代码如下:

def RunMany(inputs):
from multiprocessing import cpu_count, Pool
proc=inputs[0]
pool=Pool(processes = proc) 
results=[]
for arg1 in inputs[1]:
    for arg2 in inputs[2]:
        for arg3 in inputs[3]:
            results.append(pool.apply_async(RunOne, args=(arg1, arg2, arg3)))
#new section
pool.close()
pool.join()    
#end new section
casenum=0
datadict=dict()
for p in results:
    #get results of simulation once it has finished
    datadict[casenum]=p.get() 
    casenum+=1
return datadict

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接