在Python中处理大量文件

3

我有大量报告文件(约650个),占用硬盘空间约320M,需要对它们进行处理。每个文件中都有很多条目;我需要根据它们的内容计数和记录它们。其中一些与彼此相关,我还需要找到、记录和计数它们;匹配可能在不同的文件中。我编写了一个简单的脚本来完成这项工作。我使用了Python分析器,仅在一个包含2000行的单个文件上运行脚本所需的时间为0.3秒,而我们需要处理其中一半的行。但是对于整个目录,需要1个半小时才能完成。以下是我的脚本:

# imports

class Parser(object):
    def __init__(self):
        # load some configurations
        # open some log files
        # set some initial values for some variables

    def parse_packet(self, tags):
        # extract some values from line

    def found_matched(self, packet):
        # search in the related list to find matched line

    def save_packet(self, packet):
        # write the line in the appropriate files and increase or decrease some counters

    def parse(self, file_addr):
        lines = [l for index, l in enumerate(open(file_addr, 'r').readlines()) if index % 2 != 0]
        for line in lines:
            packet = parse_packet(line)
            if found_matched(packet):
                # count
            self.save_packet(packet)

    def process_files(self):
        if not os.path.isdir(self.src_dir):
            self.log('No such file or directory: ' + str(self.src_dir))
            sys.exit(1)
        input_dirs = os.walk(self.src_dir)
        for dname in input_dirs:
            file_list = dname[2]
            for fname in file_list:
                self.parse(os.path.join(dname[0], fname))
        self.finalize_process()

    def finalize_process(self):
        # closing files

我希望将执行时间降低至当前时间的10%以下。也许使用multiprocessing可以帮助我,或者对当前脚本进行一些改进也能完成任务。不管怎样,你能帮我吗?

编辑1:

我已经根据@Reut Sharabani的答案修改了我的代码:

def parse(self, file_addr):
    lines = [l for index, l in enumerate(open(file_addr, 'r').readlines()) if index % 2 != 0]
    for line in lines:
        packet = parse_packet(line)
        if found_matched(packet):
            # count
        self.save_packet(packet)

def process_files(self):
    if not os.path.isdir(self.src_dir):
        self.log('No such file or directory: ' + str(self.src_dir))
        sys.exit(1)
    input_dirs = os.walk(self.src_dir)
    for dname in input_dirs:
        process_pool = multiprocessing.Pool(10)
        for fname in file_list:
            file_list = [os.path.join(dname[0], fname) for fname in dname[2]]
            process_pool.map(self.parse, file_list)
    self.finalize_process()

为了避免PicklingError: Can't pickle <type 'instancemethod'>: attribute lookup __builtin__.instancemethod failed错误,我在类定义之前添加了以下行:

import copy_reg
import types

def _pickle_method(m):
    if m.im_self is None:
        return getattr, (m.im_class, m.im_func.func_name)
    else:
        return getattr, (m.im_self, m.im_func.func_name)

copy_reg.pickle(types.MethodType, _pickle_method)

另外一个我在代码中做的事情是,在文件处理期间不保持打开的日志文件;我为每个条目打开和关闭它们以避免 "ValueError: I/O operation on closed file"。现在的问题是,我有一些被多次处理的文件,并且我得到了错误的数据包计数。我做错了什么?我应该在for循环之前放置 "process_pool = multiprocessing.Pool(10)" 吗?请注意,我现在只有一个目录,这似乎不是问题。编辑2:我还尝试过这样使用 "ThreadPoolExecutor":
with ThreadPoolExecutor(max_workers=10) as executor:
    for fname in file_list:
        executor.submit(self.parse, fname)

结果是正确的,但需要一个半小时才能完成。


1
看看celery - 它是一个任务队列。您可以为每个文件排队一个进程,并且任意数量的“工作”进程执行实际工作。但是,如果任务受到I/O限制,多任务处理只有在饱和I/O系统的那一点才会有所帮助。 - Paulo Scardine
1
@PauloScardine 你应该写成一个答案,也许。 - michel-slm
这篇关于 Tim Bray 的 Wide Finder 2 挑战的 Python 实现讨论也可能很有用--http://effbot.org/zone/wide-finder.htm - michel-slm
使用线程池。这可能比CPU阻塞更多地受到I/O阻塞的影响。 - Reut Sharabani
可能不是I/O限制,详见我的回答。 - Dr. Jan-Philip Gehrcke
@Jan-PhilipGehrcke修正了我的答案。感谢您的评论。 - Reut Sharabani
3个回答

3
首先,“大约650个文件,大约320 MB”并不多。考虑到现代硬盘轻松读写100 MB/s,您的系统的I/O性能可能不是瓶颈(也受到“仅需0.3秒即可运行具有2000行的单个文件脚本”的支持,这显然表明了CPU的限制)。但是,您在Python中读取文件的确切方式可能不够高效。
此外,基于简单的multiprocessing体系结构,在普通的多核系统上运行,将使您能够更快地执行分析(无需涉及celery,也无需跨机器边界)。

multiprocessing架构

只需查看multiprocessing,您的架构可能涉及一个管理进程(父进程),该进程定义任务Queue和工作进程Pool。管理器(或饲养员)将任务(例如文件名)放入队列中,而工作者则使用这些任务。完成任务后,工作者让管理器知道,并继续消耗下一个任务。

文件处理方法

这相当低效:
    lines = [l for index, l in enumerate(open(file_addr, 'r').readlines()) if index % 2 != 0]
    for line in lines:
        ...

readlines()函数在列表推导式被执行之前会将整个文件读取完毕。只有在此之后,你才会再次遍历所有行。因此,你需要遍历三次数据。将所有内容合并到一个循环中,这样你就只需要遍历一次所有行。


1
你应该在这里使用线程。如果后面被CPU阻塞了,你可以使用进程。
为了解释这个问题,我首先创建了一万个文件(0.txt ... 9999.txt),每个文件的行数都等于文件名(+1),使用以下命令:
for i in `seq 0 999`; do for j in `seq 0 $i`; do echo $i >> $i.txt; done ; done

接下来,我使用了一个 ThreadPool 和 10 个线程创建了一个 Python 脚本,以计算所有具有偶数值的文件的行数:

#!/usr/bin/env python

from multiprocessing.pool import ThreadPool
import time
import sys

print "creating %s threads" % sys.argv[1]
thread_pool = ThreadPool(int(sys.argv[1]))

files = ["%d.txt" % i for i in range(1000)]

def count_even_value_lines(filename):
    with open(filename, 'r') as f:
        # do some processing
        line_count = 0
        for line in f.readlines():
            if int(line.strip()) % 2 == 0:
                line_count += 1

        print "finished file %s" % filename
        return line_count

start = time.time()
print sum(thread_pool.map(count_even_value_lines, files))
total = time.time() - start
print total

正如您所看到的,这需要很少的时间,而且结果是正确的。 10个文件并行处理,CPU足够快以处理结果。如果您想要更多,可以考虑使用线程进程来利用所有CPU,同时不会让IO阻塞您。

编辑:

正如评论所指出的,我错了,这不是IO阻塞,因此您可以使用多进程加速(CPU阻塞)。因为我使用了ThreadPool,它与Pool具有相同的接口,因此您可以进行最小的编辑并运行相同的代码:
#!/usr/bin/env python

import multiprocessing
import time
import sys


files = ["%d.txt" % i for i in range(2000)]
# function has to be defined before pool is opened and workers are forked
def count_even_value_lines(filename):
    with open(filename, 'r') as f:
        # do some processing
        line_count = 0
        for line in f:
            if int(line.strip()) % 2 == 0:
                line_count += 1

        return line_count


print "creating %s processes" % sys.argv[1]
process_pool = multiprocessing.Pool(int(sys.argv[1]))

start = time.time()
print sum(process_pool.map(count_even_value_lines, files))
total = time.time() - start
print total

结果:

me@EliteBook-8470p:~/Desktop/tp$ python tp.py 1
creating 1 processes
25000000
21.2642059326
me@EliteBook-8470p:~/Desktop/tp$ python tp.py 10
creating 10 processes
25000000
12.4360249043

虽然multiprocessing.ThreadPool可能很好用,但我对在如此简单的问题上使用未记录的功能感到不安。SO不仅是提供解决方案,还包括教育。 - Dr. Jan-Philip Gehrcke
@Jan-PhilipGehrcke 谢谢,我实际测试了一下,你是对的。我编辑了答案并包含了修复。 - Reut Sharabani
如果我是正确的,您可能希望点赞我的答案,其中我解释了为什么这个问题不是I/O受限的:-)。 - Dr. Jan-Philip Gehrcke
关于你的第一条评论,我觉得ThreadPool可以像multiprocessing.Pool一样学习。这是一个需要工作线程的任务,而线程池就是为此而存在的,所以这是正确的方法。如果你不使用ThreadPool,你最终会实现类似的东西,这有点倒退。 - Reut Sharabani
我也觉得这很优雅。然而,从多进程中使用的ThreadPool并不是一个标准的方法,它没有被记录在文档中:https://docs.python.org/2/library/multiprocessing.html。这就是我想说的:将一个未经记录的stdlib部分用于解决这样一个简单的问题,在教学上看起来有些奇怪。 - Dr. Jan-Philip Gehrcke

1
除了使用并行处理外,你的parse方法相当低效,正如@Jan-PhilipGehrcke已经指出的那样。为了进一步说明他的建议:传统的变体:
def parse(self, file_addr):
    with open(file_addr, 'r') as f:
        line_no = 0
        for line in f:
            line_no += 1
            if line_no % 2 != 0:
                packet = parse_packet(line)
                if found_matched(packet):
                    # count
                self.save_packet(packet)

或者使用您自己的风格(假设您使用的是Python 3):

def parse(self, file_addr):
    with open(file_addr, 'r') as f:
        filtered = (l for index,l in enumerate(f) if index % 2 != 0)
        for line in filtered:
            # and so on

需要注意的是,这里使用了迭代器。构建筛选列表(实际上并不是一个列表!!)的所有操作都在迭代器上进行,并返回迭代器,这意味着在任何时候都没有将整个文件加载到列表中。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接