使用多进程读取多个文件

5
我需要读取一些非常大的文本文件(100+ Mb),使用正则表达式处理每一行,并将数据存储到一个结构中。我的结构继承自defaultdict,它有一个read(self)方法来读取self.file_name文件。
看看这个非常简单(但不是真实的)例子,我没有使用正则表达式,而是分割了行:

import multiprocessing
from collections import defaultdict

def SingleContainer():
    return list()

class Container(defaultdict):
    """
    this class store odd line in self["odd"] and even line in self["even"].
    It is stupid, but it's only an example. In the real case the class
    has additional methods that do computation on readen data.
    """
    def __init__(self,file_name):
        if type(file_name) != str:
            raise AttributeError, "%s is not a string" % file_name
        defaultdict.__init__(self,SingleContainer)
        self.file_name = file_name
        self.readen_lines = 0
    def read(self):
        f = open(self.file_name)
        print "start reading file %s" % self.file_name
        for line in f:
            self.readen_lines += 1
            values = line.split()
            key = {0: "even", 1: "odd"}[self.readen_lines %2]
            self[key].append(values)
        print "readen %d lines from file %s" % (self.readen_lines, self.file_name)

def do(file_name):
    container = Container(file_name)
    container.read()
    return container.items()

if __name__ == "__main__":
    file_names = ["r1_200909.log", "r1_200910.log"]
    pool = multiprocessing.Pool(len(file_names))
    result = pool.map(do,file_names)
    pool.close()
    pool.join()
    print "Finish"      

最后,我需要将每个结果合并到一个容器中。保留行的顺序非常重要。我的方法在返回值时速度太慢。有更好的解决方案吗?我使用的是Linux上的Python 2.6。

3个回答

5
您可能遇到了两个问题。
其中一个已经被提到:您同时读取多个文件。这些读取将交错进行,导致磁盘抖动。您需要一次性读取整个文件,然后仅在数据上进行多线程计算。
第二个问题是您遇到了Python的多进程模块的开销。它实际上没有使用线程,而是通过管道启动多个进程并序列化结果。对于大量数据,这非常慢--实际上似乎比您在线程中执行的工作还要慢(至少在示例中)。这就是GIL引起的现实问题。
如果我将do()修改为返回None而不是container.items()以禁用额外数据的复制,则此示例比单个线程快,只要文件已缓存:
两个线程:0.36elapsed 168%CPU
一个线程(用map替换pool.map):0:00.52elapsed 98%CPU
不幸的是,GIL问题是根本性的,不能从Python内部解决。

是的,这就是问题:返回数据。我正在使用多进程而不是多线程,因为有GIL。但是我想使用CPU的所有核心来优化我的程序!如果我从“开始读取文件”和“读取%d行”(忽略返回时间)的时间进行测量,多进程版本比单进程版本快2倍(我有2个核心)。现在:共享内存怎么样?我看了multiprocess.Manager类,但我想共享比字典更复杂的结构。 - Ruggero Turra
我没有使用过Manager,但它看起来像是代理数据操作,所以我怀疑它甚至更慢。你可以使用共享内存来共享简单的内存块,但不能共享本地Python类型。你可能想寻找其他优化方法,但如果没有实际代码,我无法提出任何建议。 - Glenn Maynard
也许一个解决方案是:重写C++中的读取函数,并使用真正的多线程处理?采用这种方法,我可以避免在进程之间共享数据(管道)时出现的问题。 - Ruggero Turra
在构建Python数据结构时,您始终需要持有GIL,并且在C中执行解析和构造结果比在Python中要更费力(由您自己完成)。我无法说这是否是一个好主意,但听起来很混乱。 - Glenn Maynard

0

多进程适用于CPU或内存密集型进程,因为旋转驱动器的寻道时间在文件切换时会降低性能。要么将日志文件加载到快速闪存驱动器或某种内存磁盘(物理或虚拟)中,要么放弃多进程。


我的问题是CPU受限而不是IO受限。在这个例子中,我正在拆分行,但在实际情况下,我正在使用复杂且长的正则表达式,并且IO时间(寻找,...)比CPU时间短得多。 - Ruggero Turra

0

您正在使用与文件数目相同数量的工人创建池。这可能太多了。通常,我会让工人数量与核心数大致相同。

简单的事实是,您的最终步骤将是将所有结果合并为一个进程。考虑到您的问题描述,无法避免这种情况。这被称为屏障同步:所有任务必须在任何任务继续之前到达相同点。

您应该多次运行此程序或在循环中传递不同的值给multiprocessing.Pool(),从1开始到核心数。计时每个运行并查看哪个工作人员计算效果最好。

结果将取决于任务有多少是CPU密集型(与磁盘密集型相对)。如果您的任务大约半数是CPU和半数是磁盘,则即使在8核机器上,2个核心也可能最佳。


是的,我已经做过了。我的选择不是随机的选择。我尝试在没有返回行的情况下计算时间,并且最好的选择是当进程数等于文件数,即使进程数大于核心数。 - Ruggero Turra
那我不知道你如何能做得更好了。致命问题在于:“保留行的顺序很重要。”即使您已经对每个文件进行了预处理,但这只能一次输入一个文件处理。您的另一个选择是让每个工人生成带有后缀的文件,并且让读取这些文件的任何程序按顺序读取它们,以此消除合并。 - Mike DeSimone

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接