Python class for merging sorted files: how can it be improved?


Background:

I am cleaning large (cannot fit in memory) tab-delimited files. As I clean the input file, I build up a list in memory; when it reaches 1,000,000 entries (about 1 GB in memory), I sort it (using the default key below) and write the list out to a file. This class is for putting the sorted files back together. It works on the files I have encountered so far. My largest case to date has been merging 66 sorted files.
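
The chunk-producing phase isn't shown in the question; here is a minimal sketch of what it could look like (sort_chunks, _flush, and the chunk file names are hypothetical; the 1,000,000-line threshold comes from the description above):

def sort_chunks(input_path, keyfunc, chunk_size=1000000):
    """ Split a huge file into sorted chunk files; returns their paths. """
    chunk_paths = []
    buffer = []
    for line in open(input_path):
        buffer.append(line)           # cleaning of the line would happen here
        if len(buffer) >= chunk_size:
            chunk_paths.append(_flush(buffer, len(chunk_paths), keyfunc))
            buffer = []
    if buffer:                        # don't forget the final partial chunk
        chunk_paths.append(_flush(buffer, len(chunk_paths), keyfunc))
    return chunk_paths

def _flush(buffer, index, keyfunc):
    """ Sort one in-memory chunk and write it to its own file. """
    buffer.sort(key=keyfunc)
    path = 'chunk_%02d.txt' % index
    chunk_file = open(path, 'w')
    chunk_file.writelines(buffer)
    chunk_file.close()
    return path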

Questions:

  1. Are there holes in my logic (where is it fragile)?
  2. Have I implemented the merge-sort algorithm correctly?
  3. Are there any obvious improvements that could be made?

Sample data:

Here is an abstract representation of a line from one of the files:

'hash_of_SomeStringId\tSome String Id\t\t\twww.somelink.com\t\tOtherData\t\n'

Importantly, I use 'SomeStringId'.lower().replace(' ', '') as my sort key.
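
Applied to the sample line above, that key extraction works out as follows (a quick illustration using the default key from the class docstring below):

key = lambda line: line.split('\t')[1].lower().replace(' ', '')
line = 'hash_of_SomeStringId\tSome String Id\t\t\twww.somelink.com\t\tOtherData\t\n'
print(key(line))  # -> 'somestringid'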

Original code:

import os

class SortedFileMerger():
    """ A one-time use object that merges any number of smaller sorted 
        files into one large sorted file.

        ARGS:
            paths - list of paths to sorted files
            output_path - string path to desired output file
            dedup - (boolean) remove lines with duplicate keys, default = True
            key - used to override the sort key, default = "line.split('\t')[1].lower().replace(' ', '')"
                  will be prepended by "lambda line: ".  This should be the same 
                  key that was used to sort the files being merged!
    """
    def __init__(self, paths, output_path, dedup=True, key="line.split('\t')[1].lower().replace(' ', '')"):
        self.key = eval("lambda line: %s" % key)
        self.dedup = dedup
        self.handles = [open(path, 'r') for path in paths]
        # holds one line from each file
        self.lines = [file_handle.readline() for file_handle in self.handles]
        self.output_file = open(output_path, 'w')
        self.lines_written = 0
        self._mergeSortedFiles() #call the main method

    def __del__(self):
        """ Clean-up file handles.
        """
        for handle in self.handles:
            if not handle.closed:
                handle.close()
        if self.output_file and (not self.output_file.closed):
            self.output_file.close()

    def _mergeSortedFiles(self):
        """ Merge the small sorted files to 'self.output_file'. This can 
            and should only be called once.
            Called from __init__().
        """
        previous_comparable = ''
        min_line = self._getNextMin()
        while min_line:
            index = self.lines.index(min_line)
            comparable = self.key(min_line)
            if not self.dedup:                      
                #not removing duplicates
                self._writeLine(index)
            elif comparable != previous_comparable: 
                #removing duplicates and this isn't one
                self._writeLine(index)
            else:                                   
                #removing duplicates and this is one
                self._readNextLine(index)
            previous_comparable = comparable
            min_line = self._getNextMin()
        #finished merging
        self.output_file.close()

    def _getNextMin(self):
        """ Returns the next "smallest" line in sorted order.
            Returns None when there are no more values to get.
        """
        while '' in self.lines:
            index = self.lines.index('')
            if self._isLastLine(index):
                # file.readline() is returning '' because 
                # it has reached the end of a file.
                self._closeFile(index)
            else:
                # an empty line got mixed in
                self._readNextLine(index)
        if len(self.lines) == 0:
            return None
        return min(self.lines, key=self.key)

    def _writeLine(self, index):
        """ Write line to output file and update self.lines
        """
        self.output_file.write(self.lines[index])
        self.lines_written += 1
        self._readNextLine(index)

    def _readNextLine(self, index):
        """ Read the next line from handles[index] into lines[index]
        """
        self.lines[index] = self.handles[index].readline()

    def _closeFile(self, index):
        """ If there are no more lines to get in a file, it 
            needs to be closed and removed from 'self.handles'.
            Its entry in 'self.lines' also needs to be removed.
        """
        handle = self.handles.pop(index)
        if not handle.closed:
            handle.close()
        # remove entry from self.lines to preserve order
        _ = self.lines.pop(index)

    def _isLastLine(self, index):
        """ Check that handles[index] is at the eof.
        """
        handle = self.handles[index]            
        if handle.tell() == os.path.getsize(handle.name):
            return True
        return False
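
For reference, the class does all its work from the constructor, so a hypothetical invocation (chunk file names invented for illustration) looks like this:

merger = SortedFileMerger(['chunk_00.txt', 'chunk_01.txt'],
                          'merged.txt', dedup=True)  # the merge runs inside __init__
print(merger.lines_written)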

Edit: Following Brian's suggestion, I came up with the solution below:

Second edit: Updated the code per John Machin's suggestion:

import heapq26 #heapq.py copied from a Python 2.6 install, as described in the accepted answer below

def decorated_file(f, key):
    """ Yields an easily sortable (key, line) tuple for each line of f.
    """
    for line in f:
        yield (key(line), line)

def standard_keyfunc(line):
    """ The standard key function in my application.
    """
    return line.split('\t', 2)[1].replace(' ', '').lower()

def mergeSortedFiles(paths, output_path, dedup=True, keyfunc=standard_keyfunc):
    """ Does the same thing the SortedFileMerger class does.
    """
    files = map(open, paths) #open defaults to mode='r'
    output_file = open(output_path, 'w')
    lines_written = 0
    previous_comparable = ''
    for comparable, line in heapq26.merge(*[decorated_file(f, keyfunc) for f in files]):
        if not dedup or comparable != previous_comparable:
            output_file.write(line)
            lines_written += 1
        previous_comparable = comparable
    #close all handles before reporting the count
    output_file.close()
    for f in files:
        f.close()
    return lines_written
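
Combined with the hypothetical sort_chunks sketch from the background section above, a full run would look like this ('huge_input.txt' is an invented name):

paths = sort_chunks('huge_input.txt', standard_keyfunc)
written = mergeSortedFiles(paths, 'merged.txt', keyfunc=standard_keyfunc)
print('%d lines written' % written)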

Preliminary testing

Using the same input files (2.2 GB of data):

  • The SortedFileMerger class took 51 minutes (3068.4 seconds)
  • Brian's solution took 40 minutes (2408.5 seconds)
  • After adding John Machin's suggestions, the solution code took 36 minutes (2214.0 seconds)

decorated_file is equivalent to ((key(line), line) for line in f). - John La Rooy
@gnibbler, will that speed things up, or does it just get rid of the function? - tgray
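
Concretely, that comment means decorated_file could be inlined as a generator expression inside mergeSortedFiles (the names below are the ones from the function above; the behavior is identical, and whether it is measurably faster would need profiling):

# inside mergeSortedFiles, replacing the decorated_file calls
# (the dedup bookkeeping is omitted for brevity):
decorated = [((keyfunc(line), line) for line in f) for f in files]
for comparable, line in heapq26.merge(*decorated):
    output_file.write(line)
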
2 Answers


Note that in Python 2.6, heapq has a new merge function which will do this for you.

To handle a custom key function, you just need to wrap the file iterators with something that decorates each line so that it compares by the key, and strip the decoration off afterwards:

import heapq

def decorated_file(f, key):
    for line in f:
        yield (key(line), line)

# keyfunc is the same key-extraction function that was used to sort the files
filenames = ['file1.txt','file2.txt','file3.txt']
files = map(open, filenames)
outfile = open('merged.txt', 'w')

for line in heapq.merge(*[decorated_file(f, keyfunc) for f in files]):
    outfile.write(line[1])

[Edit] Even in earlier versions of Python, it's probably worthwhile to grab the implementation of merge from the later heapq module. It's written in pure Python, runs unmodified in Python 2.5, and since it uses a heap to obtain the next minimum, it should be very efficient when merging a large number of files.

You should be able to simply copy heapq.py from a Python 2.6 installation into your source tree as "heapq26.py" and use "from heapq26 import merge"; there are no 2.6-specific features used in it. Alternatively, you could just copy the merge function (rewriting the heappop etc. calls to reference the Python 2.5 heapq module).
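
For reference, here is a simplified sketch of that heap-based k-way merge, written only against heapq primitives that already exist in Python 2.5 (this is not the verbatim 2.6 implementation, which carries some extra optimizations):

from heapq import heapify, heappop, heapreplace

def merge(*iterables):
    """ Yield all items from several sorted iterables as one sorted stream. """
    heap = []
    for order, it in enumerate(map(iter, iterables)):
        try:
            heap.append((it.next(), order, it))  # it.next() is the Python 2.5 spelling of next(it)
        except StopIteration:
            pass
    heapify(heap)
    while heap:
        value, order, it = heap[0]   # smallest current head
        yield value
        try:
            heapreplace(heap, (it.next(), order, it))  # refill from the same source
        except StopIteration:
            heappop(heap)            # that source is exhausted

The order index breaks ties between equal values so the heap never has to compare the iterator objects themselves, and it keeps the merge stable.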


Actually, I'm still using Python 2.5. - tgray
This is a great answer that I couldn't find in weeks of googling. - tgray


<< This "answer" is a comment on the original questioner's code >>

Suggestion: using eval() is bad, and what you are doing restricts the caller to a lambda; key extraction may require more than a one-liner, and in any case, don't you need the same function for the preliminary sort?

So, replace this:

def mergeSortedFiles(paths, output_path, dedup=True, key="line.split('\t')[1].lower().replace(' ', '')"):
    keyfunc = eval("lambda line: %s" % key)

with this:

def my_keyfunc(line):
    return line.split('\t', 2)[1].replace(' ', '').lower()
    # minor tweaks may speed it up a little

def mergeSortedFiles(paths, output_path, keyfunc, dedup=True):    
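
Illustrating that last point: the very same function then serves both the preliminary in-memory sort and the merge (names hypothetical):

chunk.sort(key=my_keyfunc)                         # preliminary sort of one in-memory chunk
mergeSortedFiles(paths, 'merged.txt', my_keyfunc)  # ... and the final merge, with the same key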

Thanks, eval() felt weird to me too, but I didn't know of an alternative. I got that approach from this recipe: http://code.activestate.com/recipes/576755/ - tgray
That recipe supplies the eval() dodge only as an optional feature, for those brave enough to type the source of their key-extraction function on the command line when kicking off a multi-gigabyte sort. You'll notice it is cleanly separated: the merge and sort functions each require a function as the key argument, not a string. - John Machin
