使用多进程工作池

10
我编写了以下代码以使我的第二个懒惰的CPU核心工作。该代码的基本功能是首先在目录层次结构中查找所需的"sea"文件,然后执行一组外部脚本来处理这些二进制"sea"文件,以生成数量在50到100之间的文本和二进制文件。如问题标题所示,以并行方式增加处理速度。
这个问题起源于我们在IPython用户列表上进行的长时间讨论,标题为"Cannot start ipcluster"。从我的IPython并行处理功能实验开始。
问题在于我无法正确运行此代码。如果包含"sea"文件的文件夹只包含"sea"文件,则脚本在不完全执行外部脚本运行的情况下完成其执行。(假设我有30-50个要运行的外部脚本,但我的多处理启用的脚本仅在执行这些外部脚本链中的第一个脚本后耗尽。)有趣的是,如果我在已经处理过的文件夹上运行此脚本(即"sea"文件已经被处理并且输出文件已经在该文件夹中),那么它会运行,但这一次我得到的速度提升约为线性处理时间的2.4到2.7倍。这并不是很预期,因为我的笔记本电脑只有一个Core 2 Duo 2.5 GHz的CPU。虽然我有一个CUDA加速的GPU,但它与我的当前并行计算斗争无关 :)
您认为这个问题的根源是什么?
感谢所有的评论和建议。
#!/usr/bin/env python

from multiprocessing import Pool
from subprocess import call
import os


def find_sea_files():

   file_list, path_list = [], []
   init = os.getcwd()

   for root, dirs, files in os.walk('.'):
      dirs.sort()
      for file in files:
          if file.endswith('.sea'):
              file_list.append(file)
              os.chdir(root)
              path_list.append(os.getcwd())
              os.chdir(init)

   return file_list, path_list


def process_all(pf):
   os.chdir(pf[0])
   call(['postprocessing_saudi', pf[1]])


if __name__ == '__main__':
   pool = Pool(processes=2)              # start 2 worker processes
   files, paths = find_sea_files()
   pathfile = [[paths[i],files[i]] for i in range(len(files))]
   pool.map(process_all, pathfile)

当我尝试调用后处理沙特的一部分,即process_raw和execute子脚本时,而不是调用顶部外部脚本,我遇到了一个神秘的错误:以下只是错误的一部分。如所示,IDL执行混乱,无法获得正确的结果。[gsever@ccn partest]$ python proall3.py PID: 17722 PID: 17723 IDL版本7.1(linux x86 m32)。 (c)2009年,ITT视觉信息解决方案 IDL版本7.1(linux x86 m32)。 (c)2009年,ITT视觉信息解决方案 %无法获取文件状态。 单位:0,文件:<stdin> 坏文件描述符 - Gökhan Sever
2个回答

6

我建议先更好地了解工作进程的情况。如果需要,可以使用multiprocessing模块为其子进程提供日志记录。由于您已经简化了代码以缩小问题范围,因此我建议使用一些打印语句进行调试,例如(或者您可以使用PrettyPrint pf数组):


def process_all(pf):
   print "PID: ", os.getpid()
   print "Script Dir: ", pf[0]
   print "Script: ", pf[1]
   os.chdir(pf[0])
   call(['postprocessing_saudi', pf[1]])


if __name__ == '__main__':
   pool = Pool(processes=2)
   files, paths = find_sea_files()
   pathfile = [[paths[i],files[i]] for i in range(len(files))]
   pool.map(process_all, pathfile, 1) # Ensure the chunk size is 1
   pool.close()
   pool.join()

我用的Python版本是2.6.4,实现了这个功能。

脚本访问第一个目录,并在不启动外部处理进程的情况下转到第二个目录,尝试从其中处理第一个文件[gsever@ccn partest]$ python proall3.py PID: 10723 Script Dir: /home/gsever/Desktop/partest/20090317_131342/PostProcessing Script: 09_03_17_13_13_42.sea PID: 10724 Script Dir: /home/gsever/Desktop/partest/20090318_075533/PostProcessing Script: 09_03_18_07_55_33.sea 正在处理 09_03_18_07_55_33.sea 文件 ....................... 正在处理 09_03_17_13_13_42.sea 文件 ....................... 完成 - Gökhan Sever
执行失败,甚至没有正确地在每个海洋文件上应用外部脚本链。我仍然认为问题与Python的多进程模块有关。我很乐意听取更多意见,以找出确切的问题。 - Gökhan Sever

3

我可以想到以下几点:

1)你是否打印出路径文件?你确定它们都是正确生成的吗?

a)我这么问是因为你的os.walk有点奇怪;dirs.sort()应该没问题,但似乎很不必要。一般情况下不应使用os.chdir();还原应该是没问题的,但一般情况下你只需将root附加到init即可。

2)我曾经在python2.6上看到过多进程在生成子进程时出现问题。(我特别编写了一个脚本使用多进程来生成子进程。然后那些子进程无法正确使用多进程(池被锁定))。尝试使用带有mulitprocessing回退的python2.5。

3)尝试使用picloud的cloud.mp模块(它包装了多进程,但处理池的方式略有不同),看看是否有效。

你可以这样做:

cloud.mp.join(cloud.mp.map(process_all, pathfile))

(免责声明:我是PiCloud的开发人员之一)

  1. 是的,路径都是正确的。我使用IPython的并行处理功能测试了相同的路径,主脚本可以浏览文件夹并正确执行提到的脚本链以完全后处理海洋文件。
a)sort()是我Fedora文件系统(ext4)的解决方法。没有它,os.walk()会随意访问文件夹。
  1. 我会看一下2.5版本。现在我使用的是2.6.0版本。
  2. 感谢您的建议。我会尝试一下,但我倾向于不添加外部要求,以便其他人可以轻松地执行脚本。
- Gökhan Sever

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接