程序如下:
#!/usr/bin/python
import multiprocessing
def dummy_func(r):
pass
def worker():
pass
if __name__ == '__main__':
pool = multiprocessing.Pool(processes=16)
for index in range(0,100000):
pool.apply_async(worker, callback=dummy_func)
# clean up
pool.close()
pool.join()
我发现内存使用量(包括VIRT和RES)一直增长,直到close()/join()为止,有没有解决方法可以摆脱这个问题? 我尝试过在2.7中使用maxtasksperchild,但也没有帮助。
我有一个更复杂的程序,调用apply_async()约6M次,在约1.5M点时已经达到了6G+ RES,为了避免所有其他因素,我将程序简化到上述版本。
编辑:
结果证明这个版本效果更好,感谢大家的建议:
#!/usr/bin/python
import multiprocessing
ready_list = []
def dummy_func(index):
global ready_list
ready_list.append(index)
def worker(index):
return index
if __name__ == '__main__':
pool = multiprocessing.Pool(processes=16)
result = {}
for index in range(0,1000000):
result[index] = (pool.apply_async(worker, (index,), callback=dummy_func))
for ready in ready_list:
result[ready].wait()
del result[ready]
ready_list = []
# clean up
pool.close()
pool.join()
按照我所阅读的文档,我认为主进程是单线程的(回调更像是一种基于事件驱动的东西),因此我并没有在那里设置任何锁。
我将v1的索引范围改为1,000,000,和v2相同,并进行了一些测试——对我来说,v2甚至比v1快10%左右(33s对37s),也许v1在执行太多的内部列表维护作业。v2在内存使用方面绝对是赢家,它从未超过300M(VIRT)和50M(RES),而v1曾经达到370M/120M,最好的情况是330M/85M。所有数字都只是进行了3~4次测试,仅供参考。
apply_asynch
创建一个AsynchResult
实例。Pool
可能对这些对象有一些引用,因为它们必须能够在计算完成时返回结果,但是在您的循环中,您只是将它们丢弃了。可能您应该在某个时候调用异步结果的get()
或wait()
,也许使用apply_asynch
的callback
参数。 - Bakuriuready_list
时存在竞争条件。有一个线程处理来自AsyncResult
(https://docs.python.org/2/library/multiprocessing.html#multiprocessing.pool.multiprocessing.Pool.apply_async)的结果,并且该线程调用回调函数。它可能更快,只是因为你正在丢弃结果。此外,使用带有小随机延迟的`time.sleep()`模拟工作,并在代码中添加睡眠以捕获竞争条件。 - Javier