Python多进程。长时间执行后,进程池(Pool)被卡住了。

4

我正在开发一款分析大文件的工具。为了更快地进行分析,我引入了多进程技术,并且一切似乎都运行得很好。为此,我使用multiprocessing.pool创建了N个线程,并且它们处理之前我创建的不同工作块。

pool = Pool(processes=params.nthreads)
for chunk in chunk_list:
    pool.apply_async(__parallel_quant, [filelist, chunk, outfilename])

pool.close()
pool.join()

如您所见,这是标准的池执行,没有特殊用途。

最近我在运行大量数据时遇到了一个问题。标准执行需要16个线程约2小时,但我有一个特殊情况,需要大约8个小时,因为文件数量和大小都非常大。

问题是,最近我发现当我执行这种情况时,执行一直到结束都很好,除了其中一个子进程被卡住了。

<built-in method recv of _multiprocessing.Connection object at remote 0x3698db0>

由于子进程未完成,父进程不会被唤醒,执行就会停止。

这种情况只会在输入文件非常大时发生,因此我想知道是否有任何默认超时可能导致此问题。

我正在使用Python 2.7和multiprocessing 0.70a1,在CentOS 7上进行(32个核心,64GB RAM)。

感谢您的帮助。

Jordi


我认为问题在于您没有保留结果对象并调用result.get()从队列中获取结果消息。 - tdelaney
我没有任何结果。我将结果放在一个pickle文件中,然后父进程读取并收集它们。 - jvaquero
apply_async始终返回一个ApplyResult对象,无论您的目标函数是否返回值。我猜测结果队列填满了,所以子进程被阻塞了。如果您将结果添加到列表中,然后执行for result in results: result.get(),它会刷新队列。我不是100%确定,所以没有将其写成答案。 - tdelaney
我尝试了那个更改,但它没有解决问题。 - jvaquero
1个回答

2

来自多进程编程指南:

Avoid shared state

As far as possible one should try to avoid shifting large amounts of data between processes.
如果您需要将文件处理分成几个进程,最好是指示它们如何检索文件块而不是发送文件块本身。尝试将块偏移量和块大小传递给子进程。它可以使用open()和seek()从文件中检索块。您会注意到性能提高和内存占用减少。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接