Python multiprocessing pool's map function dies silently


I am trying to parallelize a for loop to speed up some code. Consider the following:

from multiprocessing import Pool

results = []

def do_stuff(str):
    print str
    results.append(str)

p = Pool(4)
p.map(do_stuff, ['str1','str2','str3',...]) # many strings here ~ 2000
p.close()

print results

I have some debug output that shows the program's progress before it dies, but it seems to stop in a different place every time. For example, it will print 'str297' and then just stop running: I can see all the CPUs go idle and the program simply hangs there. Some error must be occurring, but no error message is shown. Does anyone know how to debug a problem like this?
Update

I tried rewriting the code. Instead of using the map function, I now use apply_async, like this:
pool = Pool(5)
# results = pool.map(do_sym, underlyings[0::10])   # the old map version, replaced below
results = []
for sym in underlyings[0::10]:
    r = pool.apply_async(do_sym, [sym])
    results.append(r)

pool.close()
pool.join()

for result in results:
    print result.get(timeout=1000)

This works just as well as the map function, but it ends up hanging in the same way: it never reaches the for loop that prints the results.
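For what it's worth, a short per-result timeout at least makes a stuck task visible instead of blocking forever. A minimal sketch, not the code I actually ran; it reuses do_sym and underlyings from above, and the 60-second timeout is an arbitrary choice:

import multiprocessing as mp

pool = mp.Pool(5)
async_results = [(sym, pool.apply_async(do_sym, [sym]))
                 for sym in underlyings[0::10]]
pool.close()

for sym, r in async_results:
    try:
        print r.get(timeout=60)   # raises mp.TimeoutError if this task hangs
    except mp.TimeoutError:
        print 'task for %s appears to be hung' % sym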

After digging further and trying some debug logging (as suggested by unutbu), I can give more information here. The problem is very strange. It looks like the pool just stalls: it can never close and let the program continue. I test my program in the PyDev environment, but I wanted to try running Python in a console as well. In the console I get the same behavior, but when I press Ctrl+C to kill the program, I get some output that might explain where the problem lies:
KeyboardInterrupt
^CProcess PoolWorker-47:
Traceback (most recent call last):
  File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
Process PoolWorker-48:
Traceback (most recent call last):
  File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
Process PoolWorker-45:
Process PoolWorker-46:
Process PoolWorker-44:
    self.run()
  File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 102, in worker
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
  File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
  File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
  File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
    task = get()
  File "/usr/lib/python2.7/multiprocessing/queues.py", line 374, in get
    self.run()
  File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
    racquire()
    self._target(*self._args, **self._kwargs)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 102, in worker
KeyboardInterrupt
    task = get()
  File "/usr/lib/python2.7/multiprocessing/queues.py", line 374, in get
    self.run()
    self.run()
  File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
    self.run()
  File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
  File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 102, in worker
    self._target(*self._args, **self._kwargs)
    self._target(*self._args, **self._kwargs)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 102, in worker
    racquire()
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 102, in worker
KeyboardInterrupt
    task = get()
  File "/usr/lib/python2.7/multiprocessing/queues.py", line 374, in get
    task = get()
    task = get()
  File "/usr/lib/python2.7/multiprocessing/queues.py", line 376, in get
  File "/usr/lib/python2.7/multiprocessing/queues.py", line 374, in get
    racquire()
    return recv()
    racquire()
KeyboardInterrupt
KeyboardInterrupt
KeyboardInterrupt

In fact, the program never stops on its own; I eventually had to close the terminal window to kill it.

Update 2

I tracked the problem down inside the function being run by the pool: it turned out to be a MySQL database transaction. I had been using the MySQLdb package; after switching the query over to pandas.read_sql, everything works fine.
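For illustration only (the connection details, table, and query below are invented), the shape of the change was to give each worker its own fresh connection rather than reusing one created in the parent; a MySQL connection inherited across a fork and then used concurrently is a plausible cause of exactly this kind of silent hang:

import MySQLdb
import pandas as pd

def do_sym(sym):
    # Open the connection inside the worker: a connection created in the
    # parent before the fork is inherited by every worker, and concurrent
    # use of that single socket can block without any error message.
    conn = MySQLdb.connect(host='localhost', user='user',
                           passwd='secret', db='marketdata')  # hypothetical credentials
    try:
        # hypothetical table and query
        return pd.read_sql('SELECT * FROM quotes WHERE sym = %s', conn, params=[sym])
    finally:
        conn.close()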


`results` is not shared between the processes. Also, each process will re-import the module, create a new `Pool`, and map the function over it. You need to put that setup inside a `__main__` guard. See the [documentation](https://docs.python.org/2/library/multiprocessing.html#windows). - Peter Wood
The results don't need to be shared right away, as long as they all get appended eventually. That should work, shouldn't it? - jeffery_the_wind
1 Answer


pool.map returns a list, so instead of calling results.append in the concurrently-running processes (which cannot work, since each process gets its own independent copy of results), assign results to the value returned by pool.map in the main process:

import multiprocessing as mp

def do_stuff(text):
    return text

if __name__ == '__main__':
    p = mp.Pool(4)
    tasks = ['str{}'.format(i) for i in range(2000)]
    results = p.map(do_stuff, tasks)
    p.close()

    print(results)

which yields
['str0', 'str1', 'str2', 'str3', ...]
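If the workers really did need to append into one shared list, the supported route would be a manager-backed list, since manager proxies can be passed to workers and genuinely are shared. A minimal sketch, with the caveat that the ordering of the shared list then depends on worker scheduling:

import multiprocessing as mp

def do_stuff(args):
    text, shared = args
    shared.append(text)           # the proxy forwards this to the manager process

if __name__ == '__main__':
    manager = mp.Manager()
    shared = manager.list()       # the real list lives in a separate manager process
    p = mp.Pool(4)
    p.map(do_stuff, [('str{}'.format(i), shared) for i in range(2000)])
    p.close()
    p.join()
    print(list(shared))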

One way to debug scripts that use multiprocessing is to add logging statements. For this purpose, the multiprocessing module provides a helper function, mp.log_to_stderr. For example:

import multiprocessing as mp
import logging

logger = mp.log_to_stderr(logging.DEBUG)

def do_stuff(text):
    logger.info('Received {}'.format(text))
    return text

if __name__ == '__main__':
    p = mp.Pool(4)
    tasks = ['str{}'.format(i) for i in range(2000)]
    results = p.map(do_stuff, tasks)
    p.close()

    logger.info(results)

This produces logging output such as:

[DEBUG/MainProcess] created semlock with handle 139824443588608
[DEBUG/MainProcess] created semlock with handle 139824443584512
[DEBUG/MainProcess] created semlock with handle 139824443580416
[DEBUG/MainProcess] created semlock with handle 139824443576320
[DEBUG/MainProcess] added worker
[INFO/PoolWorker-1] child process calling self.run()
[DEBUG/MainProcess] added worker
[INFO/PoolWorker-2] child process calling self.run()
[DEBUG/MainProcess] added worker
[INFO/PoolWorker-3] child process calling self.run()
[DEBUG/MainProcess] added worker
[INFO/PoolWorker-4] child process calling self.run()
[INFO/PoolWorker-1] Received str0
[INFO/PoolWorker-2] Received str125
[INFO/PoolWorker-3] Received str250
[INFO/PoolWorker-4] Received str375
[INFO/PoolWorker-3] Received str251
...
[INFO/PoolWorker-4] Received str1997
[INFO/PoolWorker-4] Received str1998
[INFO/PoolWorker-4] Received str1999
[DEBUG/MainProcess] closing pool
[INFO/MainProcess] ['str0', 'str1', 'str2', 'str3', ...]
[DEBUG/MainProcess] worker handler exiting
[DEBUG/MainProcess] task handler got sentinel
[INFO/MainProcess] process shutting down
[DEBUG/MainProcess] task handler sending sentinel to result handler
[DEBUG/MainProcess] running all "atexit" finalizers with priority >= 0
[DEBUG/MainProcess] finalizing pool
[DEBUG/MainProcess] task handler sending sentinel to workers
[DEBUG/MainProcess] helping task handler/workers to finish
[DEBUG/MainProcess] result handler got sentinel
[DEBUG/PoolWorker-3] worker got sentinel -- exiting
[DEBUG/MainProcess] removing tasks from inqueue until task handler finished
[DEBUG/MainProcess] ensuring that outqueue is not full
[DEBUG/MainProcess] task handler exiting
[DEBUG/PoolWorker-3] worker exiting after 2 tasks
[INFO/PoolWorker-3] process shutting down
[DEBUG/MainProcess] result handler exiting: len(cache)=0, thread._state=0
[DEBUG/PoolWorker-3] running all "atexit" finalizers with priority >= 0
[DEBUG/MainProcess] joining worker handler
[DEBUG/MainProcess] terminating workers
[DEBUG/PoolWorker-3] running the remaining "atexit" finalizers
[DEBUG/MainProcess] joining task handler
[DEBUG/MainProcess] joining result handler
[DEBUG/MainProcess] joining pool workers
[DEBUG/MainProcess] cleaning up worker 4811
[DEBUG/MainProcess] running the remaining "atexit" finalizers

Note that each line indicates which process emitted the log record, so the output serializes, to some extent, the ordering of events across your concurrent processes.
With judicious placement of logging.info calls, you should be able to narrow down where your script is "dying silently" (or, at least, it won't be quite so silent as it dies).
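For example, bracketing a suspect call makes the hang self-locating: the last "starting" line without a matching "finished" line names the task that hung. (run_query below is a hypothetical stand-in for whatever the worker actually does; logger is the mp.log_to_stderr logger from above.)

def do_sym(sym):
    logger.info('starting query for %s', sym)
    rows = run_query(sym)         # hypothetical stand-in for the real work
    logger.info('finished query for %s', sym)
    return rows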

Thanks, this is a good insight and I have changed my code to work this way; unfortunately the map function still dies silently and I'm not sure exactly why. - jeffery_the_wind
Wow, this is great, thank you so much. I think the problem is that the program never even reaches the point where the logging would print, though. What I suspect is happening is that the map function believes some process is still working and keeps waiting for it. - jeffery_the_wind
Sorry, yesterday I didn't fully understand your answer. I have now implemented the logging the way you suggest and it works well. Unfortunately it doesn't provide any extra debugging information, and the program still gets stuck. Worst of all, the problem isn't even consistent: I found one place in the data where it hangs most of the time, but sometimes it gets past it, only to hang later anyway. Please see my updated question. - jeffery_the_wind
Note that if the computation is CPU-bound, there is no benefit to using more processes than your machine has processors. If you use Pool() with no argument, the multiprocessing module determines the number of available processors and creates a pool of that size, so there is usually no need to pass a number when initializing Pool(). - unutbu
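A minimal sketch of the behavior that comment describes:

import multiprocessing as mp

print(mp.cpu_count())   # the number of processors multiprocessing detects
p = mp.Pool()           # with no argument, the pool defaults to cpu_count() workers
p.close()
p.join()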
Using the approach you suggested, I narrowed the problem down to a MySQL database transaction. Once I removed it, the pool no longer hangs. Before, I was using the MySQLdb package; I have now switched to pandas.read_sql and so far it works well. Thanks for your help! - jeffery_the_wind
