我尝试过将for循环并行化以加速某些代码。考虑以下代码:
from multiprocessing import Pool
results = []
def do_stuff(str):
print str
results.append(str)
p = Pool(4)
p.map(do_stuff, ['str1','str2','str3',...]) # many strings here ~ 2000
p.close()
print results
我有一些调试信息,显示了程序在死亡之前运行的进度。但是似乎每次都会在不同的地方停止运行。例如,它会打印'str297',然后就停止运行了。我会看到所有的CPU停止工作,程序就这样卡住了。应该是发生了一些错误,但没有错误消息显示。有人知道如何调试这个问题吗?
更新
我尝试重新编写代码。不再使用map函数,而是像这样使用apply_async函数:
pool = Pool(5)
results = pool.map(do_sym, underlyings[0::10])
results = []
for sym in underlyings[0::10]:
r = pool.apply_async(do_sym, [sym])
results.append(r)
pool.close()
pool.join()
for result in results:
print result.get(timeout=1000)
这个方法和
map
函数一样好用,但却以同样的方式挂起。它永远无法到达for循环的打印结果处。在更进一步地研究并尝试了一些调试日志(如unutbu所建议的)后,我将在这里提供更多信息。问题非常奇怪。看起来进程池停滞不前,无法关闭并继续程序。我使用PyDev环境测试我的程序,但我想尝试在控制台中运行Python。在控制台中,我得到了相同的行为,但当我按下Ctrl+C杀死程序时,我会得到一些输出,这可能可以解释问题所在:
> KeyboardInterrupt ^CProcess PoolWorker-47: Traceback (most recent call
> last): File "/usr/lib/python2.7/multiprocessing/process.py", line
> 258, in _bootstrap Process PoolWorker-48: Traceback (most recent call
> last): File "/usr/lib/python2.7/multiprocessing/process.py", line
> 258, in _bootstrap Process PoolWorker-45: Process PoolWorker-46:
> Process PoolWorker-44:
> self.run() File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
> self._target(*self._args, **self._kwargs) File "/usr/lib/python2.7/multiprocessing/pool.py", line 102, in worker
> Traceback (most recent call last): Traceback (most recent call last):
> Traceback (most recent call last): File
> "/usr/lib/python2.7/multiprocessing/process.py", line 258, in
> _bootstrap File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap File
> "/usr/lib/python2.7/multiprocessing/process.py", line 258, in
> _bootstrap
> task = get() File "/usr/lib/python2.7/multiprocessing/queues.py", line 374, in get
> self.run() File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
> racquire()
> self._target(*self._args, **self._kwargs) File "/usr/lib/python2.7/multiprocessing/pool.py", line 102, in worker
> KeyboardInterrupt
> task = get() File "/usr/lib/python2.7/multiprocessing/queues.py", line 374, in get
> self.run()
> self.run() File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
> self.run() File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run File
> "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
> self._target(*self._args, **self._kwargs) File "/usr/lib/python2.7/multiprocessing/pool.py", line 102, in worker
> self._target(*self._args, **self._kwargs)
> self._target(*self._args, **self._kwargs) File "/usr/lib/python2.7/multiprocessing/pool.py", line 102, in worker
> racquire() File "/usr/lib/python2.7/multiprocessing/pool.py", line 102, in worker KeyboardInterrupt
> task = get() File "/usr/lib/python2.7/multiprocessing/queues.py", line 374, in get
> task = get()
> task = get() File "/usr/lib/python2.7/multiprocessing/queues.py", line 376, in get
> File "/usr/lib/python2.7/multiprocessing/queues.py", line 374, in get
> racquire()
> return recv()
> racquire() KeyboardInterrupt KeyboardInterrupt KeyboardInterrupt
实际上,该程序从未停止。我最终不得不关闭终端窗口才能杀死它。
更新2
我在运行池中的函数中找到了问题,原来是一个MySQL数据库事务引起的问题。我之前使用的是MySQLdb
包。现在我将其切换为pandas.read_sql
函数进行处理,现在它可以正常工作了。