Python多进程设计

18
我编写了一个算法,用于处理地理空间数据并执行一系列步骤。输入数据是多边形的形状文件和大型栅格研究区域(约1.5亿像素)的协变量光栅图。步骤如下:
  1. 从形状文件的多边形中采样点
  2. 对于每个采样点,提取协变量光栅图的值
  3. 在采样点上建立预测模型
  4. 提取目标网格点的协变量
  5. 将预测模型应用于目标网格
  6. 将预测结果写入一组输出网格
整个过程需要进行若干次迭代(比如100次),但每次迭代目前都需要超过1小时才能完成。对于每次迭代,最耗时的部分是步骤4和5。由于目标网格非常大,我一直在按块(比如每次1000行)处理它。
我拥有一个6核CPU和32 Gb RAM,在每次迭代中,我尝试使用Python的multiprocessing模块与Pool对象来同时处理一些块(步骤4和5),然后使用回调函数将输出(预测)写入公共的输出网格集合,该回调函数调用全局的输出写入函数。这似乎可以工作,但速度不比按顺序处理每个块更快(实际上可能更慢)。
因此,我的问题是,是否有更有效的方法来执行此操作?我对multiprocessing模块的Queue类感兴趣,但我不太确定它的工作原理。例如,我想知道是否有一个队列可以执行步骤4和5,然后将结果传递给另一个队列来执行步骤6。或者这甚至是Queue的用途吗?
如果您有任何指针,将不胜感激。

@Sarnold:它无法全部装入内存,这就是问题所在... - hendra
@npo:您可以在运行程序时运行 vmstat 1 工具,以查看您的 bobi 列是否接近磁盘容量限制或大部分时间低于磁盘带宽。 - sarnold
2
如果它无法全部放入内存,那么您可能会受到IO限制。sarnold的建议是运行vmstat并检查交换情况(好坏程度),这可能是个好主意。我喜欢Munin,并想在一次运行期间观察机器 - 它将绘制CPU正在执行的操作(关注iowait - 更多是不好的),内存如何被使用(关注缓存 - 更多是好的)以及交换分区正在承受多少IO(任何使用都是不好的)。根据您发现的情况,您可能希望追求其他选择,但我认为更多的内存和更快的磁盘将对您有很大帮助。 - rbanffy
另外(无法放在前面的消息中),请检查数据磁盘的IO使用量。 - rbanffy
2
我的建议是使用http://numpy.scipy.org/ - 这将有助于减少内存使用和CPU使用。 - matiu
显示剩余5条评论
4个回答

3

Python目前的多进程能力在CPU密集型处理方面并不出色。我很遗憾地告诉你,使用模块也不能让它运行更快,而且问题也不在于你使用了。

真正的问题在于Python仍然受全局解释器锁(GIL)的限制(我强烈建议查看幻灯片)。在绕过GIL方面已经有了一些令人兴奋的理论和实验性进展。Python 3.2甚至包含了一个新的GIL,它解决了某些问题,但也引入了其他问题。

目前来看,使用单个串行线程执行许多Python进程比尝试在一个进程中运行许多线程更快。这将使你避免在线程之间获取GIL的问题(通过有效地拥有更多的GIL)。然而,只有当你的Python进程之间的IPC开销不超过处理效益时,它才是有益的。

Eli Bendersky写了一篇不错的概述文章,介绍了他尝试使用多进程使CPU密集型进程运行更快的经验。

值得注意的是,PEP 371希望通过引入multiprocessing模块(以前是一个名为pyProcessing的非标准包)来“规避”GIL。然而,GIL在Python解释器中似乎仍然扮演着太大的角色,无法很好地处理CPU密集型算法。许多不同的人已经致力于消除/重写GIL,但没有任何一个取得足够的进展,能够成为Python发布的一部分。


我猜想这可能是情况。我一定会查看你的链接,谢谢。稍微换个方向说:我猜测Parallel Python模块也会受到同样类型的问题影响。但如果使用PP将任务分配给多台计算机呢? - hendra

1

由于Python并不适合进行大量的数字计算,因此我通常会将Python程序中时间关键部分转换为C/C++,从而大大提高速度。

此外,Python的多线程处理能力也不是很好。Python会为各种事情使用全局信号量。因此,即使您使用Python提供的线程,事情也不会变得更快。线程对于那些需要等待诸如IO之类的事物的应用程序非常有用。

在创建C模块时,您可以在处理数据时手动释放全局信号量(然后,当然,不再访问Python值)。

使用C API需要一些练习,但它的结构清晰,比如Java本机API易于使用。

请参阅Python文档中的“扩展和嵌入”。

这样,您就可以将C/C++用于时间关键部分,将更慢的部分与更快的编程工作结合起来使用Python...


1

我认为python.org上的一些多进程示例不是很清晰,很容易就会从错误的设计开始。这里是我制作的一个简单示例,帮助我开始一个项目:

import os, time, random, multiprocessing
def busyfunc(runseconds):
    starttime = int(time.clock())
    while 1:
        for randcount in range(0,100):
            testnum = random.randint(1, 10000000)
            newnum = testnum / 3.256
        newtime = int(time.clock())
        if newtime - starttime > runseconds:
            return

def main(arg):
    print 'arg from init:', arg
    print "I am " + multiprocessing.current_process().name

    busyfunc(15)

if __name__ == '__main__':

    p = multiprocessing.Process(name = "One", target=main, args=('passed_arg1',))
    p.start()

    p = multiprocessing.Process(name = "Two", target=main, args=('passed_arg2',))
    p.start()

    p = multiprocessing.Process(name = "Three", target=main, args=('passed_arg3',))
    p.start()

    time.sleep(5)

这应该在15秒内使用3个处理器。很容易修改为更多。也许这可以帮助调试您当前的代码,并确保您真正生成了多个独立进程。

如果由于RAM限制必须共享数据,则建议使用以下方法: http://docs.python.org/library/multiprocessing.html#sharing-state-between-processes


0

我建议您首先检查代码中哪些方面花费了最多的时间,因此您需要对其进行分析,我已经成功地使用http://packages.python.org/line_profiler/#line-profiler,但它需要cython。

至于队列,它们主要用于共享数据/同步线程,尽管我很少使用它。我一直在使用多进程。

我大多数时候都遵循map reduce的哲学,这是简单而干净的,但它有一些重大的开销,因为在应用映射函数时,值必须打包到字典中并复制到每个进程中...

您可以尝试将文件分段并将算法应用于不同的集合。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接