在Python中将多个文件流合并为一个可读对象

7
我有一个函数,使用file.read(len)方法处理来自文件的二进制数据。然而,我的文件非常大,被分成许多每个50 MBytes的小文件。有没有一些包装类可以将许多文件馈入缓冲流,并提供一个read()方法? fileinput.FileInput类可以做到这一点,但它仅支持逐行读取(使用无参数的readline()方法),并且没有指定要读取的字节数的read(len)方法。

你正在寻找标准库内的某些东西吗?写这个应该不太难。 - loopbackbee
4个回答

6

不必像其他答案那样将流列表转换为生成器,而是可以将这些流链接在一起,然后使用文件接口:

def chain_streams(streams, buffer_size=io.DEFAULT_BUFFER_SIZE):
    """
    Chain an iterable of streams together into a single buffered stream.
    Usage:
        def generate_open_file_streams():
            for file in filenames:
                yield open(file, 'rb')
        f = chain_streams(generate_open_file_streams())
        f.read()
    """

    class ChainStream(io.RawIOBase):
        def __init__(self):
            self.leftover = b''
            self.stream_iter = iter(streams)
            try:
                self.stream = next(self.stream_iter)
            except StopIteration:
                self.stream = None

        def readable(self):
            return True

        def _read_next_chunk(self, max_length):
            # Return 0 or more bytes from the current stream, first returning all
            # leftover bytes. If the stream is closed returns b''
            if self.leftover:
                return self.leftover
            elif self.stream is not None:
                return self.stream.read(max_length)
            else:
                return b''

        def readinto(self, b):
            buffer_length = len(b)
            chunk = self._read_next_chunk(buffer_length)
            while len(chunk) == 0:
                # move to next stream
                if self.stream is not None:
                    self.stream.close()
                try:
                    self.stream = next(self.stream_iter)
                    chunk = self._read_next_chunk(buffer_length)
                except StopIteration:
                    # No more streams to chain together
                    self.stream = None
                    return 0  # indicate EOF
            output, self.leftover = chunk[:buffer_length], chunk[buffer_length:]
            b[:len(output)] = output
            return len(output)

    return io.BufferedReader(ChainStream(), buffer_size=buffer_size)

然后像使用其他文件/流一样使用它:

f = chain_streams(open_files_or_chunks)
f.read(len)

看起来,根据这个写法,self.leftover 永远不会被使用,因为 len(chunk) <= buffer_length 总是成立的。 - Michael Hadam
再次查看代码,我同意我的代码过于防御性,可以简化。 - Hardbyte

5

使用itertools.chain很容易拼接可迭代对象:

from itertools import chain

def read_by_chunks(file_objects, block_size=1024):
    readers = (iter(lambda f=f: f.read(block_size), '') for f in file_objects)
    return chain.from_iterable(readers)

您可以执行以下操作:

for chunk in read_by_chunks([f1, f2, f3, f4], 4096):
    handle(chunk)

按顺序处理文件,同时按4096字节块读取。

如果您需要提供一个具有 read 方法的对象,因为某些其他函数期望这样做,您可以编写一个非常简单的包装器:

class ConcatFiles(object):
    def __init__(self, files, block_size):
        self._reader = read_by_chunks(files, block_size)

    def __iter__(self):
        return self._reader

    def read(self):
        return next(self._reader, '')

然而,这种方法只使用固定的块大小。可以通过以下方式支持read函数的block_size参数:

def read(self, block_size=None):
    block_size = block_size or self._block_size
    total_read = 0
    chunks = []

    for chunk in self._reader:
        chunks.append(chunk)
        total_read += len(chunk)
        if total_read > block_size:
            contents = ''.join(chunks)
            self._reader = chain([contents[block_size:]], self._reader)
            return contents[:block_size]
    return ''.join(chunks)

注意:如果您正在以二进制模式阅读,请用空字节b''替换代码中的空字符串''

@mahkitah 你为什么这么认为?迭代会产生 block_size 大小的块。但请注意,当 _reader 被消耗完时,循环会被跳过,最终得到的是 ''.join(chunks)。此外,即使最后一部分不是完全大小,read_by_chunks 也会返回所有内容。 - undefined
@mahkitah 例如 f1 = StringIO("a"*13); f2 = StringIO("b"*7); f3 = StringIO("c"*4); c = ConcatFiles([f1,f2,f3], block_size=5) 如果你多次执行 c.read() 你会得到 aaaaa, aaaaa, aaabb, bbbbb, cccc 然后是空字符串,这是正确的。在这里你可以看到所有的文件大小都不是块大小的倍数,甚至总大小也不是块大小的倍数 - undefined
只有在使用包装器和修改了读取方法时,它才能像这样工作。关于可以使用read_by_chunks生成器“按顺序处理文件,同时按block_size字节的块进行读取”的说法是不正确的。当生成器到达每个文件的末尾时,它将产生剩余的任何大小。 - undefined
@mahkitah 是的,事实上它并不如此。你看到我带有3个示例文件的评论了吗?你可以看到ConcatFiles只会生成最后一个小于5的块。 - undefined
我不是在谈论ConcatFiles。我在谈论的是read_by_chunks以及它的描述如何给人虚假的期望。它说read_by_chunks会按照4096字节的块读取文件,但它也会读取更小的块,而不仅仅是最后一个块。 - undefined
显示剩余4条评论

3

我不熟悉标准库中任何执行该功能的内容,因此,如果没有:

try:
    from cStringIO import StringIO
except ImportError:
    from StringIO import StringIO

class ConcatenatedFiles( object ):
    def __init__(self, file_objects):
        self.fds= list(reversed(file_objects))

    def read( self, size=None ):
        remaining= size
        data= StringIO()
        while self.fds and (remaining>0 or remaining is None):
            data_read= self.fds[-1].read(remaining or -1)
            if len(data_read)<remaining or remaining is None: #exhausted file
                self.fds.pop()
            if not remaining is None:
                remaining-=len(data_read)
            data.write(data_read)
        return data.getvalue()

可以。为什么你要使用StringIO而不是普通的字符串来表示“data”? - xivaxy
1
@xivaxy 因为根据文件对象的数量和文件大小,字符串拼接可能会效率低下。如果您计划在少量小文件中使用此功能,则可以改用字符串。有关详细信息,请参见此文章 - loopbackbee
@goncalopp 注意,该文章已经十年过去了。在当前版本的Python中,事情肯定已经发生了变化(尽管我相信使用StringIOjoin仍然是最快的方法)。 - Bakuriu

0
另一种方法是使用生成器:
def read_iter(streams, block_size=1024):
    for stream in streams:
        for chunk in stream.read(block_size):
            yield chunk

# open file handles
file1 = open('f1.txt', 'r')
file2 = open('f2.txt', 'r')
fileOut = open('out.txt', 'w')

# concatenate files 1 & 2
for chunk in read_iter([file1, file2]):
    # process chunk (in this case, just concatenate to output)
    fileOut.write(chunk)

# close files
file1.close()
file2.close()
fileOut.close()

这个过程不应该占用除基础脚本和块大小所需的内存之外的任何内存。它会将每个块直接从一个文件阅读器传递到另一个写入器,然后重复此过程,直到所有流都完成。

如果您需要在类中使用此行为,那么可以像Bakuriu描述的那样轻松地构建成一个容器类。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接