Python multiprocessing pool map_async hangs


I have a list of 80,000 strings that I am running through a discourse parser, and to speed up the process I have been trying to use the Python multiprocessing package.

The parser code requires Python 2.7, and I am currently running it over a subset of the strings on a 2-core Ubuntu machine. For short lists, e.g. 20 strings, the process runs fine on both cores, but if I run a list of about 100 strings, both workers freeze, and at different points (so in some cases worker 1 doesn't stop until minutes after the other). This happens before all the strings are finished and before anything is returned. The workers stall at the same points each time with a given mapping function, but the points differ if I try a different mapping function, i.e. map vs map_async vs imap.

I have tried removing the strings at those indices, which had no effect, and those same strings run fine in a shorter list. Based on print statements I included, when the process appears to freeze the current iteration seems to have finished its current string; it just does not move on to the next one. It takes about an hour of run time to reach the point where both workers freeze, and I have not been able to reproduce the problem in less time. The code involving the multiprocessing commands is:

import sys
import csv
import multiprocessing

import pandas as pd

def main(initial_file, chunksize = 2):
    entered_file = pd.read_csv(initial_file)
    entered_file = entered_file.ix[:, 0].tolist()

    pool = multiprocessing.Pool()

    result = pool.map_async(discourse_process, entered_file, chunksize = chunksize)

    pool.close()
    pool.join()

    with open("final_results.csv", 'w') as file:
        writer = csv.writer(file)
        for listitem in result.get():
            writer.writerow([listitem[0], listitem[1]])

if __name__ == '__main__':
    main(sys.argv[1])

The error message I receive when I stop the process with Ctrl-C (which does not always work) is:
^CTraceback (most recent call last):
  File "Combined_Script.py", line 94, in <module>
    main(sys.argv[1])
  File "Combined_Script.py", line 85, in main
    pool.join()
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 474, in join
    p.join()
  File "/usr/lib/python2.7/multiprocessing/process.py", line 145, in join
    res = self._popen.wait(timeout)
  File "/usr/lib/python2.7/multiprocessing/forking.py", line 154, in wait
    return self.poll(0)
  File "/usr/lib/python2.7/multiprocessing/forking.py", line 135, in poll
    pid, sts = os.waitpid(self.pid, flag)
KeyboardInterrupt
Process PoolWorker-1:
Traceback (most recent call last):
  File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 117, in worker
    put((job, i, result))
  File "/usr/lib/python2.7/multiprocessing/queues.py", line 390, in put
    wacquire()
KeyboardInterrupt
^CProcess PoolWorker-2:
Traceback (most recent call last):
  File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 117, in worker
    put((job, i, result))
  File "/usr/lib/python2.7/multiprocessing/queues.py", line 392, in put
    return send(obj)
KeyboardInterrupt
Error in atexit._run_exitfuncs:
Traceback (most recent call last):
  File "/usr/lib/python2.7/atexit.py", line 24, in _run_exitfuncs
    func(*targs, **kargs)
  File "/usr/lib/python2.7/multiprocessing/util.py", line 305, in _exit_function
    _run_finalizers(0)
  File "/usr/lib/python2.7/multiprocessing/util.py", line 274, in _run_finalizers
    finalizer()
  File "/usr/lib/python2.7/multiprocessing/util.py", line 207, in __call__
    res = self._callback(*self._args, **self._kwargs)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 500, in _terminate_pool
    outqueue.put(None)                  # sentinel
  File "/usr/lib/python2.7/multiprocessing/queues.py", line 390, in put
    wacquire()
KeyboardInterrupt
Error in sys.exitfunc:
Traceback (most recent call last):
  File "/usr/lib/python2.7/atexit.py", line 24, in _run_exitfuncs
    func(*targs, **kargs)
  File "/usr/lib/python2.7/multiprocessing/util.py", line 305, in _exit_function
    _run_finalizers(0)
  File "/usr/lib/python2.7/multiprocessing/util.py", line 274, in _run_finalizers
    finalizer()
  File "/usr/lib/python2.7/multiprocessing/util.py", line 207, in __call__
    res = self._callback(*self._args, **self._kwargs)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 500, in _terminate_pool
    outqueue.put(None)                  # sentinel
  File "/usr/lib/python2.7/multiprocessing/queues.py", line 390, in put
    wacquire()
KeyboardInterrupt

While the workers are frozen, watching htop in another terminal shows memory usage never exceeding 3%. This is my first attempt at parallel processing, and I am not sure what else I should be troubleshooting.

2 Answers


I was not able to solve the problem with the multiprocessing pool, but I came across the loky package and was able to run my program with the following code:

import loky

executor = loky.get_reusable_executor(timeout = 200, kill_workers = True)
results = executor.map(discourse_process, entered_file)
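For reference, loky's reusable executor follows the `concurrent.futures.Executor` interface, so the surrounding code keeps the same shape with the standard library's `ProcessPoolExecutor` (Python 3, or the `futures` backport on 2.7). A minimal sketch of the whole pipeline in that style, where `discourse_process` is a stand-in stub for the real parser:

```python
from concurrent.futures import ProcessPoolExecutor
import csv

def discourse_process(text):
    # stand-in stub for the real parser: return the string and its length
    return (text, len(text))

def main(strings, out_path="final_results.csv"):
    # executor.map preserves input order; the timeout applies to the
    # whole iteration, and worker exceptions are re-raised here instead
    # of silently hanging the parent
    with ProcessPoolExecutor() as executor:
        results = list(executor.map(discourse_process, strings, timeout=200))
    with open(out_path, "w") as f:
        writer = csv.writer(f)
        for item in results:
            writer.writerow([item[0], item[1]])
    return results
```

The key difference from `pool.join()` is that a stalled worker surfaces as a `TimeoutError` in the parent rather than an indefinite block.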

You can set a timeout for retrieving the result, so that instead of blocking forever an error is raised:
try:
    result.get(timeout = 1)
except multiprocessing.TimeoutError:
    print("Error while retrieving the result")
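Pulling results one at a time with `imap_unordered` and a per-result timeout goes a step further: it shows roughly how many inputs completed before a worker stalled. A sketch, again with a stand-in `discourse_process`:

```python
import multiprocessing

def discourse_process(text):
    # stand-in stub for the real parser
    return (text, len(text))

def run_with_timeouts(strings, timeout=60):
    pool = multiprocessing.Pool()
    results = []
    it = pool.imap_unordered(discourse_process, strings)
    try:
        for _ in range(len(strings)):
            # next(timeout=...) raises multiprocessing.TimeoutError if a
            # worker stalls, instead of blocking forever in pool.join()
            results.append(it.next(timeout=timeout))
    except multiprocessing.TimeoutError:
        print("a worker stalled after %d results" % len(results))
    finally:
        pool.terminate()
        pool.join()
    return results
```

Results arrive in completion order rather than input order, so re-sort them afterwards if order matters.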

You can also check whether your processes finished successfully:

import time
while True:
    try:
        result.successful()
        break
    except AssertionError:
        print("Result is not yet successful")
        time.sleep(1)
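Note that `AsyncResult.successful()` raises `AssertionError` in Python 2.7 whenever the job is still running, so a cleaner pattern is to poll `ready()` (or block briefly with `wait(timeout)`) and only call `successful()` once the job has finished. A sketch with a trivial stand-in worker:

```python
import multiprocessing

def square(x):
    # trivial stand-in worker
    return x * x

if __name__ == '__main__':
    pool = multiprocessing.Pool(2)
    result = pool.map_async(square, range(10))
    # poll instead of blocking forever in pool.join()
    while not result.ready():
        print("still running...")
        result.wait(timeout=1)  # block at most one second per check
    print(result.successful())  # True only if no worker raised
    pool.close()
    pool.join()
```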

Finally, I found https://docs.python.org/2/library/multiprocessing.html helpful for this kind of programming.
