我有大量报告文件(约650个),占用硬盘空间约320M,需要对它们进行处理。每个文件中都有很多条目;我需要根据它们的内容计数和记录它们。其中一些与彼此相关,我还需要找到、记录和计数它们;匹配可能在不同的文件中。我编写了一个简单的脚本来完成这项工作。我使用了Python分析器,仅在一个包含2000行的单个文件上运行脚本所需的时间为0.3秒,而我们需要处理其中一半的行。但是对于整个目录,需要1个半小时才能完成。以下是我的脚本:
# imports
class Parser(object):
def __init__(self):
# load some configurations
# open some log files
# set some initial values for some variables
def parse_packet(self, tags):
# extract some values from line
def found_matched(self, packet):
# search in the related list to find matched line
def save_packet(self, packet):
# write the line in the appropriate files and increase or decrease some counters
def parse(self, file_addr):
lines = [l for index, l in enumerate(open(file_addr, 'r').readlines()) if index % 2 != 0]
for line in lines:
packet = parse_packet(line)
if found_matched(packet):
# count
self.save_packet(packet)
def process_files(self):
if not os.path.isdir(self.src_dir):
self.log('No such file or directory: ' + str(self.src_dir))
sys.exit(1)
input_dirs = os.walk(self.src_dir)
for dname in input_dirs:
file_list = dname[2]
for fname in file_list:
self.parse(os.path.join(dname[0], fname))
self.finalize_process()
def finalize_process(self):
# closing files
我希望将执行时间降低至当前时间的10%以下。也许使用multiprocessing
可以帮助我,或者对当前脚本进行一些改进也能完成任务。不管怎样,你能帮我吗?
编辑1:
我已经根据@Reut Sharabani的答案修改了我的代码:
def parse(self, file_addr):
lines = [l for index, l in enumerate(open(file_addr, 'r').readlines()) if index % 2 != 0]
for line in lines:
packet = parse_packet(line)
if found_matched(packet):
# count
self.save_packet(packet)
def process_files(self):
if not os.path.isdir(self.src_dir):
self.log('No such file or directory: ' + str(self.src_dir))
sys.exit(1)
input_dirs = os.walk(self.src_dir)
for dname in input_dirs:
process_pool = multiprocessing.Pool(10)
for fname in file_list:
file_list = [os.path.join(dname[0], fname) for fname in dname[2]]
process_pool.map(self.parse, file_list)
self.finalize_process()
为了避免PicklingError: Can't pickle <type 'instancemethod'>: attribute lookup __builtin__.instancemethod failed
错误,我在类定义之前添加了以下行:
import copy_reg
import types
def _pickle_method(m):
if m.im_self is None:
return getattr, (m.im_class, m.im_func.func_name)
else:
return getattr, (m.im_self, m.im_func.func_name)
copy_reg.pickle(types.MethodType, _pickle_method)
另外一个我在代码中做的事情是,在文件处理期间不保持打开的日志文件;我为每个条目打开和关闭它们以避免 "ValueError: I/O operation on closed file"。现在的问题是,我有一些被多次处理的文件,并且我得到了错误的数据包计数。我做错了什么?我应该在for循环之前放置 "process_pool = multiprocessing.Pool(10)" 吗?请注意,我现在只有一个目录,这似乎不是问题。编辑2:我还尝试过这样使用 "ThreadPoolExecutor":
with ThreadPoolExecutor(max_workers=10) as executor:
for fname in file_list:
executor.submit(self.parse, fname)
结果是正确的,但需要一个半小时才能完成。