Python的multiprocessing.pool使用时内存占用不断增长

50

程序如下:

#!/usr/bin/python

import multiprocessing

def dummy_func(r):
    pass

def worker():
    pass

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=16)
    for index in range(0,100000):
        pool.apply_async(worker, callback=dummy_func)

    # clean up
    pool.close()
    pool.join()

我发现内存使用量(包括VIRT和RES)一直增长,直到close()/join()为止,有没有解决方法可以摆脱这个问题? 我尝试过在2.7中使用maxtasksperchild,但也没有帮助。

我有一个更复杂的程序,调用apply_async()约6M次,在约1.5M点时已经达到了6G+ RES,为了避免所有其他因素,我将程序简化到上述版本。

编辑:

结果证明这个版本效果更好,感谢大家的建议:

#!/usr/bin/python

import multiprocessing

ready_list = []
def dummy_func(index):
    global ready_list
    ready_list.append(index)

def worker(index):
    return index

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=16)
    result = {}
    for index in range(0,1000000):
        result[index] = (pool.apply_async(worker, (index,), callback=dummy_func))
        for ready in ready_list:
            result[ready].wait()
            del result[ready]
        ready_list = []

    # clean up
    pool.close()
    pool.join()

按照我所阅读的文档,我认为主进程是单线程的(回调更像是一种基于事件驱动的东西),因此我并没有在那里设置任何锁。

我将v1的索引范围改为1,000,000,和v2相同,并进行了一些测试——对我来说,v2甚至比v1快10%左右(33s对37s),也许v1在执行太多的内部列表维护作业。v2在内存使用方面绝对是赢家,它从未超过300M(VIRT)和50M(RES),而v1曾经达到370M/120M,最好的情况是330M/85M。所有数字都只是进行了3~4次测试,仅供参考。


1
这里只是猜测,但排队一百万个对象会占用空间。也许将它们分批处理会有所帮助。文档并不是最终的,但示例(搜索测试回调)显示即使存在回调,仍在等待apply_async结果。等待可能需要清除结果队列。 - tdelaney
多进程池可能不适合我,因为回调函数实际上并不执行清理工作,是否可以在回调函数中执行清理操作?问题是我不能在apply_async()调用后等待,因为在真实世界中worker()每个请求需要约0.1秒的时间(多个HTTP请求)。 - C.B.
1
猜测:apply_asynch 创建一个 AsynchResult 实例。Pool 可能对这些对象有一些引用,因为它们必须能够在计算完成时返回结果,但是在您的循环中,您只是将它们丢弃了。可能您应该在某个时候调用异步结果的 get()wait(),也许使用 apply_asynchcallback 参数。 - Bakuriu
我认为在编辑版本中,当你覆盖ready_list时存在竞争条件。有一个线程处理来自AsyncResult(https://docs.python.org/2/library/multiprocessing.html#multiprocessing.pool.multiprocessing.Pool.apply_async)的结果,并且该线程调用回调函数。它可能更快,只是因为你正在丢弃结果。此外,使用带有小随机延迟的`time.sleep()`模拟工作,并在代码中添加睡眠以捕获竞争条件。 - Javier
1
“maxtasksperchild” 似乎已经解决了在3.7上由“apply_async”引起的内存泄漏问题。 - laido yagamii
查看这个相似的帖子以及我对它的回答 - Akaisteph7
6个回答

30

最近我遇到了内存问题,因为我多次使用multiprocessing函数,这导致它不断生成进程并将它们留在内存中。

这是我现在使用的解决方案:

def myParallelProcess(ahugearray):
    from multiprocessing import Pool
    from contextlib import closing
    with closing(Pool(15)) as p:
        res = p.imap_unordered(simple_matching, ahugearray, 100)
    return res

3
在这个问题上我花了好几天的时间,但是你的翻译解决了我的问题!非常感谢!我在循环内部创建了一个进程池,结果导致产生了过多的进程,每个进程消耗大量内存而且无法退出。我只需要在循环结束时加上“mypool.close()”即可。 - MohamedEzz
21
with Pool 不是自动关闭吗? - matanster
你能解释一下你使用“simple_matching”和“100”的原因吗? - Bret Hess
这些是imap_unordered的占位符参数。第一个是您想要为数组的每个元素运行的函数,第二个是数组,第三个是块的大小(该方法将可迭代对象切成多个块,然后将其作为单独的任务提交给进程池)。这是imap_unordered的官方文档。 - deddu

11

在循环中创建池,然后使用pool.close()在循环结束时关闭池。


5
创建池虽然代价高昂。 - Elena Pascal

9

为避免过度的内存使用,请使用map_async而不是apply_async

对于您的第一个示例,请更改以下两行:

for index in range(0,100000):
    pool.apply_async(worker, callback=dummy_func)

为了

pool.map_async(worker, range(100000), callback=dummy_func)

在您还没来得及在top中看到它的内存使用情况之前,它就会在瞬间完成。将列表更改为更大的列表以查看差异。但请注意,如果没有__len__方法,map_async将首先将您传递给它的可迭代对象转换为列表以计算其长度。如果您有一个包含大量元素的迭代器,可以使用itertools.islice将其分成较小的块进行处理。
在实际程序中,我遇到了内存问题,其中数据量更大,最终发现罪魁祸首是apply_async
顺便说一句,在内存使用方面,您的两个示例没有明显的区别。

2
你能告诉我们为什么 map_async 不会像 apply_async 一样导致内存问题吗? - MrObjectOriented

8
我有一个非常庞大的3D点云数据集正在处理中。我尝试使用多进程模块来加速处理,但是我开始收到内存错误。经过一些研究和测试,我发现我正在比子进程更快地填充要处理的任务队列。我确定通过分块或使用map_async或其他方法可以调整负载,但我不想对周围逻辑进行重大更改。
我想到了一个愚蠢的解决方案,就是间歇性地检查pool._cache长度,如果缓存太大,则等待队列清空。
在我的主循环中,我已经有一个计数器和状态提示器:
# Update status
count += 1
if count%10000 == 0:
    sys.stdout.write('.')
    if len(pool._cache) > 1e6:
        print "waiting for cache to clear..."
        last.wait() # Where last is assigned the latest ApplyResult

每10k次向池中插入一个任务时,我会检查是否有超过100万个操作排队等待(大约在主进程中使用1G内存)。当队列已满时,我只需等待最后插入的任务完成。

现在,我的程序可以运行数小时而不会耗尽内存。主进程偶尔暂停,而工作线程继续处理数据。

顺便说一下,_cache成员在multiprocessing模块池示例中有文档说明:

#
# Check there are no outstanding tasks
#

assert not pool._cache, 'cache = %r' % pool._cache

7

您可以限制每个子进程的任务数

multiprocessing.Pool(maxtasksperchild=1)

maxtasksperchild是工作进程完成任务后退出并被新的工作进程替换以释放未使用资源的任务数。默认的maxtasksperchild为None,这意味着工作进程将与池一样长时间存在。链接


这是一个很好的答案,1 确保我们在生成新的工作进程方面非常积极。 - Arka Mukherjee

2

我认为这与我发布的问题类似,但我不确定您是否有相同的延迟。我的问题是,我从多进程池中生成结果的速度比我消耗它们的速度快,因此它们在内存中积累。为了避免这种情况,我使用了信号量来限制进入池中的输入,以便它们没有超前于我正在消耗的输出。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接