处理大文件的最快方法是什么?

40

我有多个3 GB的tab分隔文件,每个文件中有2000万行。所有行都必须独立处理,任何两行之间没有关系。我的问题是,哪种方法更快?

  1. 逐行读取?

with open() as infile:
    for line in infile:
  • 将文件分块读入内存并处理,例如每次处理250 MB的数据?

  • 处理过程并不复杂,我只是将第一列的值放入List1,第二列的值放入List2等等,可能需要将某些列的值加在一起。

    我正在使用在拥有30GB内存的linux系统上的Python 2.7。文本编码为ASCII。

    有没有任何方法可以加快并行处理速度?目前我正在使用前面提到的方法,但进程速度非常缓慢。使用任何CSVReader模块会有所帮助吗? 如果可以使用其他语言或数据库进行操作,则欢迎提出想法。


    4
    你的代码是I/O限制还是CPU限制?换句话说,处理时间比读取时间更长吗?如果是这样,你可能可以通过多进程来加快速度;如果不是,那么你的后台进程将花费大部分时间等待下一个读取操作,你将得不到任何好处。 - abarnert
    1
    @abarnert 最多也只能算是“尚可”。如果他/她有足够的内存并且不在意3GB的开销,可以使用for line in infile.readlines():,这样迭代速度比文件对象本身要快得多。 - the_constant
    2
    @Vincenzzzochi 实际上,我个人在使用Python处理“大数据”方面有很多经验,如果您正确设计解决方案,它的表现相当不错;再次取决于问题的性质,是CPU绑定还是I/O绑定或两者兼而有之。Python 并不真的很慢 :) - James Mills
    1
    @Reise45,语言选择并不是你的问题所在;而是你如何管理I/O以及如何委派工作(CPU绑定部分)。 - James Mills
    1
    @JamesMills:当然,在科学界,你通常是对事物进行对角处理而不是顺序处理的,所以说“我们有18GB的数据?那就来32GB的内存吧”可能是可以接受的,因为除了其他大数据用途外,你并不需要规划数十台服务器,只需要一台工作站... 但当然,Python也被广泛应用于许多大数据服务器类型的用途,所以你的观点绝对是正确的。 - abarnert
    显示剩余22条评论
    2个回答

    41

    看起来你的代码受到了I/O的限制。这意味着多进程不会起作用——如果你花费90%的时间从磁盘读取数据,那么让额外的7个进程等待下一次读取是没有任何帮助的。

    虽然使用CSV读取模块(无论是stdlib的csv还是类似NumPy或Pandas的模块)可能是为了简单起见,但它对性能的影响不大。

    尽管如此,仍然值得检查一下你是否真的受到I/O的限制,而不仅仅是猜测。运行你的程序,看看你的CPU使用率是接近0%还是接近100%或一个核心。像Amadan在评论中建议的那样,运行只有pass处理的程序,看看是否能减少5%或70%的时间。你甚至可以尝试使用os.openos.read(1024*1024)循环进行比较,看看是否更快。


    由于你使用的是Python 2.x,Python依靠C stdio库来猜测每次缓冲的大小,所以强制使其缓冲更多可能是值得的。最简单的方法是使用readlines(bufsize)并设置一个大的bufsize。(你可以尝试不同的数字并测量它们,看看哪个峰值最高。根据我的经验,通常64K-8MB之间的任何数字都差不多,但是根据你的系统情况可能会有所不同——特别是如果你从具有很高吞吐量但延迟非常糟糕的网络文件系统中读取数据,这些延迟可能会超过物理驱动器和操作系统缓存的吞吐量与延迟之间的平衡点。)

    例如:

    bufsize = 65536
    with open(path) as infile: 
        while True:
            lines = infile.readlines(bufsize)
            if not lines:
                break
            for line in lines:
                process(line)
    

    同时,假设您使用的是64位系统,您可以尝试使用mmap来代替首先读取文件。这并不一定保证会更好,但它可能会更好,这取决于您的系统。例如:

    with open(path) as infile:
        m = mmap.mmap(infile, 0, access=mmap.ACCESS_READ)
    

    Python中的mmap是一种奇怪的对象——它既像str,又像file,因此你可以手动迭代扫描换行符,也可以像操作文件那样调用readline。与按行遍历文件或批量读取readlines相比,这两种方法都会给Python带来更多的处理负担(因为在C中循环现在变为了纯Python……尽管也许你可以通过使用re或简单的Cython扩展来解决这个问题?)……但是,由于操作系统知道你正在执行映射操作,因此I/O优势可能会超过CPU劣势。

    不幸的是,Python没有公开madvise调用,你无法像在C中那样进行优化(例如,明确设置MADV_SEQUENTIAL而不是让内核猜测,或者强制使用透明大页面),但实际上可以从libcctypes该函数。


    我在 Linux 服务器上有 30 GB 的内存。使用 readlines() 将整个文件读入内存会有什么问题吗? - Reise45
    @Reise45:这取决于你所说的“问题”是什么意思。它应该可以正常工作;在一个3GB的文件上使用readlines应该不到4GB,在内存中将所有行预处理为值列表,这不应该超过12GB,因此您仍然在舒适的限制范围内。但这意味着您必须首先完成所有阅读工作,以便操作系统无法帮助流水线化您的I/O等待和CPU工作;您会浪费时间在malloc和缓存故障上等。如果有一些好处(例如,它让您使用NumPy加速缓慢的处理循环),那可能是值得的,但如果没有,为什么要这样做呢? - abarnert
    @Reise45:如果你有很多这些文件,每个文件的处理时间都需要25分钟,那么尝试用另一种方式处理其中一个文件,看看是否能在15分钟内完成或者需要在一个小时后取消它,这将比你猜测得到更多信息。 - abarnert
    我正在使用缓冲区大小进行读取,但是当内存使用率达到100%时,脚本仍然会被终止。我该如何防止这种情况发生?我需要修复读取数据的数据结构吗? - Reise45
    假设我的内存为16G,文件大小为18G,我知道可以创建块并处理它,但是否有更好的方法来实现相同的目的? - Puneet Tripathi
    显示剩余2条评论

    8

    我知道这个问题已经很老了;但是我想做类似的事情,我创建了一个简单的框架来帮助您并行读取和处理大型文件。在此留下我的尝试作为答案。

    以下是代码,最后给出一个示例:

    def chunkify_file(fname, size=1024*1024*1000, skiplines=-1):
        """
        function to divide a large text file into chunks each having size ~= size so that the chunks are line aligned
    
        Params : 
            fname : path to the file to be chunked
            size : size of each chink is ~> this
            skiplines : number of lines in the begining to skip, -1 means don't skip any lines
        Returns : 
            start and end position of chunks in Bytes
        """
        chunks = []
        fileEnd = os.path.getsize(fname)
        with open(fname, "rb") as f:
            if(skiplines > 0):
                for i in range(skiplines):
                    f.readline()
    
            chunkEnd = f.tell()
            count = 0
            while True:
                chunkStart = chunkEnd
                f.seek(f.tell() + size, os.SEEK_SET)
                f.readline()  # make this chunk line aligned
                chunkEnd = f.tell()
                chunks.append((chunkStart, chunkEnd - chunkStart, fname))
                count+=1
    
                if chunkEnd > fileEnd:
                    break
        return chunks
    
    def parallel_apply_line_by_line_chunk(chunk_data):
        """
        function to apply a function to each line in a chunk
    
        Params :
            chunk_data : the data for this chunk 
        Returns :
            list of the non-None results for this chunk
        """
        chunk_start, chunk_size, file_path, func_apply = chunk_data[:4]
        func_args = chunk_data[4:]
    
        t1 = time.time()
        chunk_res = []
        with open(file_path, "rb") as f:
            f.seek(chunk_start)
            cont = f.read(chunk_size).decode(encoding='utf-8')
            lines = cont.splitlines()
    
            for i,line in enumerate(lines):
                ret = func_apply(line, *func_args)
                if(ret != None):
                    chunk_res.append(ret)
        return chunk_res
    
    def parallel_apply_line_by_line(input_file_path, chunk_size_factor, num_procs, skiplines, func_apply, func_args, fout=None):
        """
        function to apply a supplied function line by line in parallel
    
        Params :
            input_file_path : path to input file
            chunk_size_factor : size of 1 chunk in MB
            num_procs : number of parallel processes to spawn, max used is num of available cores - 1
            skiplines : number of top lines to skip while processing
            func_apply : a function which expects a line and outputs None for lines we don't want processed
            func_args : arguments to function func_apply
            fout : do we want to output the processed lines to a file
        Returns :
            list of the non-None results obtained be processing each line
        """
        num_parallel = min(num_procs, psutil.cpu_count()) - 1
    
        jobs = chunkify_file(input_file_path, 1024 * 1024 * chunk_size_factor, skiplines)
    
        jobs = [list(x) + [func_apply] + func_args for x in jobs]
    
        print("Starting the parallel pool for {} jobs ".format(len(jobs)))
    
        lines_counter = 0
    
        pool = mp.Pool(num_parallel, maxtasksperchild=1000)  # maxtaskperchild - if not supplied some weird happend and memory blows as the processes keep on lingering
    
        outputs = []
        for i in range(0, len(jobs), num_parallel):
            print("Chunk start = ", i)
            t1 = time.time()
            chunk_outputs = pool.map(parallel_apply_line_by_line_chunk, jobs[i : i + num_parallel])
    
            for i, subl in enumerate(chunk_outputs):
                for x in subl:
                    if(fout != None):
                        print(x, file=fout)
                    else:
                        outputs.append(x)
                    lines_counter += 1
            del(chunk_outputs)
            gc.collect()
            print("All Done in time ", time.time() - t1)
    
        print("Total lines we have = {}".format(lines_counter))
    
        pool.close()
        pool.terminate()
        return outputs
    

    例如,我有一个文件,想要计算每行中单词的数量,那么每行的处理过程如下:

    def count_words_line(line):
        return len(line.strip().split())
    

    然后像这样调用函数:

    parallel_apply_line_by_line(input_file_path, 100, 8, 0, count_words_line, [], fout=None)
    

    使用这种方法,在一个大小约为20GB的样本文件中,与普通逐行阅读相比,我可以获得约8倍的速度提升,并对每行进行一些适度复杂的处理。


    这种方法是否存在这样一种情况:一行被分成了100字节的块,而另一行被计算为不同的行?当你将文件分成字节块时,你永远不知道当前行会在哪里被分割以满足空间要求。 - edo101
    1
    有一个“readline()”函数可以将文件指针定位到行末,以便获取对齐的行块。 - Deepak Saini
    如果您以二进制方式读取文件,块大小是否重要?如果使用'rb',那么不是会抵消\n吗?如果是这样,您仍然需要担心文件块被切断的问题吗? - edo101

    网页内容由stack overflow 提供, 点击上面的
    可以查看英文原文,
    原文链接