Python中读取大文件的懒惰方法?

378

我有一个非常大的4GB文件,当我尝试读取它时,我的计算机会卡住。因此,我想逐块读取它,并在处理完每个块后将已处理的块存储到另一个文件中,然后再读取下一块。

是否有一种方法可以使用 yield 来分块读取?

我希望有一种 懒加载的方法

12个回答

552
要编写一个懒惰函数,只需使用yield
def read_in_chunks(file_object, chunk_size=1024):
    """Lazy function (generator) to read a file piece by piece.
    Default chunk size: 1k."""
    while True:
        data = file_object.read(chunk_size)
        if not data:
            break
        yield data


with open('really_big_file.dat') as f:
    for piece in read_in_chunks(f):
        process_data(piece)

另一种选择是使用iter和帮助函数:

f = open('really_big_file.dat')
def read1k():
    return f.read(1024)

for piece in iter(read1k, ''):
    process_data(piece)

如果文件是基于行的,那么文件对象已经是一组懒加载的行生成器:
for line in open('really_big_file.dat'):
    process_data(line)

11
为了与使用不支持Posix的Windows系统的同事兼容,最好使用open('really_big_file.dat', 'rb')来处理大型文件。 - Tal Weiss
11
如@Tal Weiss所提到的,缺少rb参数;同时也缺少一个file.close()语句(可以使用with open('really_big_file.dat', 'rb') as f:实现同样的效果);点击这里获取另一个简洁实现的代码 - cod3monk3y
8
@cod3monk3y说,文本文件和二进制文件是不同的东西。两种类型都有用,但用途不同。在这里,默认的(文本)模式可能很有用,也就是说,'rb'不是缺失的。 - jfs
3
@j-f-sebastian说得对,原帖没有明确指出读取的是文本数据还是二进制数据。但如果他在Windows上使用Python 2.7并且正在读取二进制数据,那么值得注意的是,如果他忘记使用'b'参数,他的数据将很可能被破坏。根据官方文档的说明——"在Windows上,Python区分文本文件和二进制文件; [...] 它会破坏像JPEG或EXE文件中的二进制数据。 在读写这些文件时一定要非常小心,使用二进制模式。" - cod3monk3y
3
这里有一个返回1k块数据的生成器:buf_iter = (x for x in iter(lambda: buf.read(1024), ''))。然后使用for chunk in buf_iter:循环遍历这些数据块。 - berto
显示剩余22条评论

48

file.readlines()方法接受一个可选的size参数,它大概估算出读取的行数。

bigfile = open('bigfilename','r')
tmp_lines = bigfile.readlines(BUF_SIZE)
while tmp_lines:
    process([line for line in tmp_lines])
    tmp_lines = bigfile.readlines(BUF_SIZE)

1
这是一个非常棒的想法,特别是当它与defaultdict结合使用时,可以将大数据拆分成较小的数据。 - Frank Wang
9
我建议使用.read()而不是.readlines()。如果文件是二进制的,它将没有换行符。 - Myers Carpenter
1
如果文件是一个巨大的字符串,该怎么办? - MattSom
1
这个解决方案有漏洞。如果其中一行大于您的BUF_SIZE,您将会处理不完整的行。@MattSom 是正确的。 - user48678
@MyersCarpenter 那一行会被重复两次吗? tmp_lines = bigfile.readlines(BUF_SIZE) - user1
混乱的解决方案。你将文件分成 BUF_SIZE 块,然后不必要地将 这些 BUF_SIZE 块拆分为一个 list。为什么不直接使用 file.readline(BUF_SIZE) 呢?(显然,那也很丑陋——只是不会那么丑陋...) - JamesTheAwesomeDude

46

已经有很多好的回答了,但如果您的整个文件只有一行,并且仍然想处理“行”(而不是固定大小的块),这些答案将无法帮助您。

99% 的时间,可以逐行处理文件。然后,如此回答中建议的那样,您可以将文件对象本身用作延迟生成器:

with open('big.csv') as f:
    for line in f:
        process(line)

然而,当行分隔符不是'\n'时(一个常见的例子是'|'),可能会遇到非常大的文件。

  • 在处理之前将'|'转换为'\n'可能不是一个选项,因为它可能会搞乱那些可能包含'\n'的字段(例如:自由文本用户输入)。
  • 使用csv库也不可行,因为至少在该库的早期版本中,它被硬编码为逐行读取输入

对于这种情况,我创建了以下代码片段[更新于2021年5月,适用于Python 3.8+]:

def rows(f, chunksize=1024, sep='|'):
    """
    Read a file where the row separator is '|' lazily.

    Usage:

    >>> with open('big.csv') as f:
    >>>     for r in rows(f):
    >>>         process(r)
    """
    row = ''
    while (chunk := f.read(chunksize)) != '':   # End of file
        while (i := chunk.find(sep)) != -1:     # No separator found
            yield row + chunk[:i]
            chunk = chunk[i+1:]
            row = ''
        row += chunk
    yield row

[对于旧版本的Python]:

def rows(f, chunksize=1024, sep='|'):
    """
    Read a file where the row separator is '|' lazily.

    Usage:

    >>> with open('big.csv') as f:
    >>>     for r in rows(f):
    >>>         process(r)
    """
    curr_row = ''
    while True:
        chunk = f.read(chunksize)
        if chunk == '': # End of file
            yield curr_row
            break
        while True:
            i = chunk.find(sep)
            if i == -1:
                break
            yield curr_row + chunk[:i]
            curr_row = ''
            chunk = chunk[i+1:]
        curr_row += chunk

我能够成功地使用它来解决各种问题。它经过广泛测试,具有不同的块大小。以下是我使用的测试套件,供需要自行验证的人使用:

test_file = 'test_file'

def cleanup(func):
    def wrapper(*args, **kwargs):
        func(*args, **kwargs)
        os.unlink(test_file)
    return wrapper

@cleanup
def test_empty(chunksize=1024):
    with open(test_file, 'w') as f:
        f.write('')
    with open(test_file) as f:
        assert len(list(rows(f, chunksize=chunksize))) == 1

@cleanup
def test_1_char_2_rows(chunksize=1024):
    with open(test_file, 'w') as f:
        f.write('|')
    with open(test_file) as f:
        assert len(list(rows(f, chunksize=chunksize))) == 2

@cleanup
def test_1_char(chunksize=1024):
    with open(test_file, 'w') as f:
        f.write('a')
    with open(test_file) as f:
        assert len(list(rows(f, chunksize=chunksize))) == 1

@cleanup
def test_1025_chars_1_row(chunksize=1024):
    with open(test_file, 'w') as f:
        for i in range(1025):
            f.write('a')
    with open(test_file) as f:
        assert len(list(rows(f, chunksize=chunksize))) == 1

@cleanup
def test_1024_chars_2_rows(chunksize=1024):
    with open(test_file, 'w') as f:
        for i in range(1023):
            f.write('a')
        f.write('|')
    with open(test_file) as f:
        assert len(list(rows(f, chunksize=chunksize))) == 2

@cleanup
def test_1025_chars_1026_rows(chunksize=1024):
    with open(test_file, 'w') as f:
        for i in range(1025):
            f.write('|')
    with open(test_file) as f:
        assert len(list(rows(f, chunksize=chunksize))) == 1026

@cleanup
def test_2048_chars_2_rows(chunksize=1024):
    with open(test_file, 'w') as f:
        for i in range(1022):
            f.write('a')
        f.write('|')
        f.write('a')
        # -- end of 1st chunk --
        for i in range(1024):
            f.write('a')
        # -- end of 2nd chunk
    with open(test_file) as f:
        assert len(list(rows(f, chunksize=chunksize))) == 2

@cleanup
def test_2049_chars_2_rows(chunksize=1024):
    with open(test_file, 'w') as f:
        for i in range(1022):
            f.write('a')
        f.write('|')
        f.write('a')
        # -- end of 1st chunk --
        for i in range(1024):
            f.write('a')
        # -- end of 2nd chunk
        f.write('a')
    with open(test_file) as f:
        assert len(list(rows(f, chunksize=chunksize))) == 2

if __name__ == '__main__':
    for chunksize in [1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024]:
        test_empty(chunksize)
        test_1_char_2_rows(chunksize)
        test_1_char(chunksize)
        test_1025_chars_1_row(chunksize)
        test_1024_chars_2_rows(chunksize)
        test_1025_chars_1026_rows(chunksize)
        test_2048_chars_2_rows(chunksize)
        test_2049_chars_2_rows(chunksize)

41
如果您的计算机、操作系统和Python都是64位的,则可以使用mmap模块将文件内容映射到内存中,并使用索引和切片访问它。这是文档中的一个示例:
import mmap
with open("hello.txt", "r+") as f:
    # memory-map the file, size 0 means whole file
    map = mmap.mmap(f.fileno(), 0)
    # read content via standard file methods
    print map.readline()  # prints "Hello Python!"
    # read content via slice notation
    print map[:5]  # prints "Hello"
    # update content using slice notation;
    # note that new content must have same size
    map[6:] = " world!\n"
    # ... and read again using standard file methods
    map.seek(0)
    print map.readline()  # prints "Hello  world!"
    # close the map
    map.close()

如果你的电脑、操作系统或Python是32位的,那么mmap映射大文件可能会占用大量地址空间,导致程序内存不足。请注意保留""和""以及HTML标签。

9
这应该如何工作?如果我有一个32GB的文件怎么办?如果我在只有256MB内存的VM上怎么办?映射这样一个巨大的文件真的不是一个好主意。 - Savino Sguera
5
这个回答应该得到负十二票。这会让使用它处理大文件的人死掉。 - Phyo Arkar Lwin
28
即使是处理大文件,这个方法也可以适用于64位Python。尽管该文件被映射到内存中,但它并没有被读入内存,因此物理内存的使用量可以比文件大小小得多。 - pts
1
@SavinoSguera,使用mmap映射文件时,物理内存的大小是否很重要? - Nick T
18
我尝试在64位Python上使用mmap映射32GB的文件。它可以正常工作(我的RAM小于32GB):我可以使用序列和文件接口访问文件的开头、中间和结尾。 - jfs
不认为这对于大文件非常实用,而且由于数据结构的不同,你可能会在寻找正确的项目以检索时迷失方向。这只是我的直觉感觉。 - Andrea Moro

14
在 Python 3.8+ 中,您可以在 while 循环中使用 .read()
with open("somefile.txt") as f:
    while chunk := f.read(8192):
        do_something(chunk)

当然,你可以使用任何块大小,不必使用8192(2 ** 13)字节。除非文件大小恰好是块大小的倍数,否则最后一个块将小于块大小。

你有任何想法为什么你的代码出现了这个错误吗:回溯(最近的调用最先): 文件“C:\ Users \ DKMK01256 \ AppData \ Local \ Programs \ Python \ Python310 \ lib \ tkinter \ __ init __。py”,第1921行,在__call__中, 返回self.func(* args) 文件“C:\ Users \ DKMK01256 \ OneDrive - WSP O365 \ Python \ coordinatesystem changer gui.py”,第66行,在转换中 while data := f.readlines(): ValueError:在关闭的文件上进行了I / O操作。 - Mathias
1
while data := f.readlines()与您正在评论的答案中的代码非常不同,它使用了不同的函数。 readlines()会读取整个文件,因此在while循环中多次执行它几乎肯定是一个错误。 - user3064538

14
f = ... # file-like object, i.e. supporting read(size) function and 
        # returning empty string '' when there is nothing to read

def chunked(file, chunk_size):
    return iter(lambda: file.read(chunk_size), '')

for data in chunked(f, 65536):
    # process the data

更新:最佳解释请参见https://dev59.com/oG455IYBdhLWcg3wD_sB#4566523


1
这对于二进制大对象(blobs)效果很好,但对于分行内容(如CSV、HTML等需要逐行处理的内容)可能不太适用。 - cgseller
请问,f的值是多少? - user1
@user1,可以使用open('文件名')。 - myroslav

7

请参考Python官方文档https://docs.python.org/3/library/functions.html#iter

也许这种方法更符合Python风格:

"""A file object returned by open() is a iterator with
read method which could specify current read's block size
"""
with open('mydata.db', 'r') as f_in:
    block_read = partial(f_in.read, 1024 * 1024)
    block_iterator = iter(block_read, '')

    for index, block in enumerate(block_iterator, start=1):
        block = process_block(block)  # process your block data

        with open(f'{index}.txt', 'w') as f_out:
            f_out.write(block)

Bruce是正确的。我使用functools.partial来解析视频流。使用py;py3,我可以每秒解析超过1GB的数据。for pkt in iter(partial(vid.read, PACKET_SIZE ), b""): - Leroy Scandal

5
我认为我们可以这样写:
def read_file(path, block_size=1024): 
    with open(path, 'rb') as f: 
        while True: 
            piece = f.read(block_size) 
            if piece: 
                yield piece 
            else: 
                return

for piece in read_file(path):
    process_piece(piece)

2

由于我的声望较低,我无法发表评论,但SilentGhosts的解决方案应该更容易实现,使用file.readlines([sizehint])即可。

Python文件方法

编辑:SilentGhost是正确的,但这比以下方法更好:

s = "" 
for i in xrange(100): 
   s += file.next()

好的,抱歉,您绝对是正确的。但也许这个解决方案会让您更加开心 ;)s = "" for i in xrange(100): s += file.next() - sinzi
3
糟糕的解决方案,这意味着每行都需要在内存中创建一个新的字符串,并将读取的整个文件数据复制到新字符串中。性能和内存使用都非常差。 - nosklo
为什么要将整个文件数据复制到一个新字符串中?来自Python文档: 为了使for循环成为在文件的行上循环的最有效方法(这是非常常见的操作),next()方法使用了一个隐藏的预读缓冲区。 - sinzi
3
@sinsi说:"s +="或者字符串拼接每次都会创建一个新的字符串副本,因为字符串是不可变的。所以你每次都在创建一个新的字符串。" - nosklo
1
@nosklo:这些是实现的细节,可以使用列表推导式来代替。 - SilentGhost

1

我处于相似的情况。不清楚你知道以字节为单位的块大小;我通常不知道,但需要的记录数(行数)已知:

def get_line():
     with open('4gb_file') as file:
         for i in file:
             yield i

lines_required = 100
gen = get_line()
chunk = [i for i, j in zip(gen, range(lines_required))]

更新:感谢nosklo。这就是我的意思。它几乎可以工作,除了在“块”之间丢失了一行。

chunk = [next(gen) for i in range(lines_required)]

这个方法可以不丢失任何行,但是看起来不是很好看。


1
这是伪代码吗?它不会起作用。而且也没有必要让人困惑,你应该将行数作为 get_line 函数的可选参数。 - nosklo

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接