如何逐行读取大文件?

600

我想要迭代整个文件的每一行。其中一种方法是读取整个文件,将其保存到列表中,然后遍历感兴趣的行。但这种方法会使用大量内存,所以我正在寻找另一种方法。

目前我的代码:

for each_line in fileinput.input(input_file):
    do_something(each_line)

    for each_line_again in fileinput.input(input_file):
        do_something(each_line_again)

执行此代码会出现错误信息:设备已激活

有什么建议吗?

目的是计算逐对字符串相似度,即对于文件中的每一行,我想计算其与所有其他行之间的Levenshtein距离。

2022年11月编辑:在此问题提出8个月后,有一个相关问题有许多有用的答案和评论。为了更深入地了解Python逻辑,请阅读这个相关问题如何逐行读取Python文件?


4
为什么需要针对每一行重新读取整个文件?如果您告诉别人您想要实现的目标,也许有人可以建议更好的方法。 - JJJ
如果文件太大而无法将其先读入内存,那么没有任何O(N^2)算法(即考虑每对行)是实用的。可以尝试避免将整个文件存储在内存中并迭代每对行,但要么仍需要使用O(N)内存来记住行开头的位置,要么涉及一些冗余的读取。 - Karl Knechtel
1
(然后还有一个问题,那就是你要对这些数据做什么...) - Karl Knechtel
如果您需要处理二进制文件,请参阅如何迭代遍历二进制文件的惯用方式? - Karl Knechtel
11个回答

1375

正确的、完全符合Python语言特性的读取文件方式如下:

with open(...) as f:
    for line in f:
        # Do something with 'line'

with语句负责打开和关闭文件,包括在内部块中引发异常的情况。for line in f将文件对象f视为可迭代对象,自动使用缓冲I/O和内存管理,因此您不必担心大文件。

做一件事应该有且最好只有一种明显的方法。


16
没错,这是适用于 Python 2.6 及以上版本的最佳版本。 - Simon Bergot
4
我个人更喜欢使用生成器和协程来处理数据流水线。 - jldupont
4
如果一个文件是一个巨大的文本文件,但只有一行,并且想要处理单词,最好的策略是什么? - mfcabrera
4
能否解释一下 for line in f: 是如何工作的?我的意思是,如何迭代文件对象? - haccks
16
如果您在一个对象上进行迭代,Python会查找对象方法列表中的一个特殊方法,名为__iter__,这个方法告诉Python该如何进行迭代操作。文件对象定义了这个特殊方法,它返回一个可遍历文件行的迭代器。(粗略地说。) - Katriel
显示剩余14条评论

158

两种内存高效的方式,按优先级排序(第一种最好) -

  1. 使用 with - 支持 Python 2.5 及以上版本
  2. 如果你真的想控制读取量,可以使用 yield

1. 使用 with

with 是读取大文件时优雅且高效的 Pythonic 方式。优点 - 1)离开 with 执行块后,文件对象会自动关闭。2)在 with 块内进行异常处理。3)内存友好,for 循环逐行迭代文件对象 f。它内部进行缓存 IO(优化昂贵的 IO 操作)和内存管理。

with open("x.txt") as f:
    for line in f:
        do something with data

2. 使用 yield

有时候,你可能想要更细致地控制每个迭代中读取的量。这时可以使用 iteryield。需要注意的是,使用这种方法时需要在最后显式地关闭文件。

def readInChunks(fileObj, chunkSize=2048):
    """
    Lazy function to read a file piece by piece.
    Default chunk size: 2kB.

    """
    while True:
        data = fileObj.read(chunkSize)
        if not data:
            break
        yield data

f = open('bigFile')
for chunk in readInChunks(f):
    do_something(chunk)
f.close()

陷阱和为了完整性 - 下面的方法并不是读取大文件的最佳或最优雅的方式,但请阅读以获得全面的理解。

在Python中,从文件读取行的最常见方法是执行以下操作:

for line in open('myfile','r').readlines():
    do_something(line)

然而,当使用readlines()函数(对于read()函数同样适用)时,会将整个文件加载到内存中,然后进行迭代。对于大文件来说,略微更好的方法是使用fileinput模块,如下所示:

import fileinput

for line in fileinput.input(['myfile']):
    do_something(line)

fileinput.input()调用按顺序读取行,但在读取后不会将它们保存在内存中,甚至只是因为Python中的file是可迭代的。

参考资料

  1. Python中的with语句

11
这样做for line in open(...).readlines(): <do stuff>基本上从来不是一个好主意。你为什么要这么做呢?你刚刚失去了Python聪明的缓冲迭代IO的所有优势,而没有得到任何好处。 - Katriel
5
@Srikar:在解决问题时,有时需要提供所有可能的解决方案,但教初学者如何进行文件输入并不是合适的时间和场合。在一个充满错误答案的长篇帖子底部埋藏正确答案并不能构成好的教学。 - Katriel
6
@Srikar:如果你把正确的方法放在开头,然后提到readlines并解释为什么不好(因为它会将整个文件读入内存),再解释一下fileinput模块是什么以及为什么你可能想使用它而不是其他方法,然后解释如何对文件进行分块以改善IO性能,并给出一个分块函数的例子(但要提到Python已经为你做了这个所以你不需要),你的文章将会显著地变得更好。但仅仅是给出五种解决简单问题的方法,其中四种在这种情况下是错误的,这是不好的。 - Katriel
2
无论你为了完整性而添加什么内容,都要将其放在最后,而不是最前面。首先展示正确的方式。 - m000
8
@katrielalex重新审视了我的答案,发现需要进行改写。我能明白之前的回答可能会引起混淆。希望这个改动能让未来的用户更加清晰地理解。 - Srikar Appalaraju
显示剩余2条评论

39

去掉换行符:

with open(file_path, 'rU') as f:
    for line_terminated in f:
        line = line_terminated.rstrip('\n')
        ...

使用通用换行符支持后,所有文本文件的行都似乎以'\n'结尾,无论文件中的终止符是'\r''\n'还是'\r\n'

编辑 -指定通用换行符支持:

  • Unix上的Python 2- open(file_path, mode='rU') - 必需 [感谢@Dave]
  • Windows上的Python 2- open(file_path, mode='rU') - 可选
  • Python 3- open(file_path, newline=None) - 可选

newline参数仅在Python 3中受支持,并默认为None。在所有情况下,mode参数默认为'r'。在Python 3中,U已被弃用。在Windows上的Python 2中,似乎有其他机制将\r\n转换为\n

文档:

要保留原生行终止符:

with open(file_path, 'rb') as f:
    with line_native_terminated in f:
        ...

二进制模式仍可以使用in将文件解析为行。每一行将包含文件中的任何终止符。

感谢@katrielalex答案,Python的open()文档和iPython实验。


1
在Python 2.7中,我必须使用open(file_path, 'rU')来启用通用换行符。 - Dave

18

这是Python中读取文件的一种可能方式:

f = open(input_file)
for line in f:
    do_stuff(line)
f.close()

它不会分配完整的列表。它遍历每一行。


3
虽然这个方法可行,但绝不是规范的方式。规范的方式是使用上下文包装器,例如 with open(input_file) as f:。这样可以避免使用 f.close() 并确保您不会意外忘记关闭文件,从而防止内存泄漏,在读取文件时非常重要。 - Mast
1
正如@Mast所说,那不是规范的方式,因此要对其进行负投票。 - azuax

13

介绍一下我的背景,我更喜欢使用像H2O这样的开源工具进行超高性能并行CSV文件读取,但该工具在功能集方面受到限制。因此,在将数据馈送到H2O集群进行监督学习之前,我需要编写大量代码来创建数据科学管道。

对于数据科学目的而言,我使用“multiprocessing”库的池对象和映射函数添加了大量并行处理,从而明显提高了读取8GB HIGGS数据集(来自UCI数据仓库)甚至40GB CSV文件的速度。例如,使用最近邻搜索、DBSCAN和马尔可夫聚类算法进行聚类需要一些并行编程技巧才能避免一些严峻的内存和墙钟时间问题。

通常情况下,我会先使用GNU工具将文件按行分成部分,然后使用glob-filemask在Python程序中查找并并行读取它们。我通常使用超过1000个部分文件。通过这些技巧,可以极大地提高处理速度和内存限制。

pandas dataframe.read_csv是单线程的,因此您可以使用map()进行并行执行以使pandas更快。使用htop可以看到,在顺序进行pandas dataframe.read_csv时,100% CPU占用率只是一个核心中的实际瓶颈,而根本不是磁盘。

我还应该指出,我正在使用SSD和快速视频卡总线,并非SATA6总线上的旋转硬盘,同时具有16个CPU核心。

另外,在某些应用程序中,我发现另一种技术也可以很好地运行,即在一个巨大文件内执行并行CSV文件读取,让每个工作进程从不同的偏移量开始,而不是将一个大文件预先分成多个部分文件。在每个并行工作进程中使用Python的file seek()和tell()读取大型文本文件中的条带,使用不同的字节偏移量开始和结束字节位置,并同时进行处理。您可以对字节进行正则表达式查找,并返回行尾符号的数量。这是一个局部小计。最后,将局部小计相加以在工作完成后映射函数返回时获取全局小计。

下面是使用并行字节偏移技巧的一些示例基准测试:

我使用2个文件:HIGGS.csv为8GB。它来自UCI机器学习存储库。all_bin .csv为40.4 GB,来自我的当前项目。我使用2个程序:Linux中自带的GNU wc程序和我开发的纯Python fastread.py程序。

HP-Z820:/mnt/fastssd/fast_file_reader$ ls -l /mnt/fastssd/nzv/HIGGS.csv
-rw-rw-r-- 1 8035497980 Jan 24 16:00 /mnt/fastssd/nzv/HIGGS.csv

HP-Z820:/mnt/fastssd$ ls -l all_bin.csv
-rw-rw-r-- 1 40412077758 Feb  2 09:00 all_bin.csv

ga@ga-HP-Z820:/mnt/fastssd$ time python fastread.py --fileName="all_bin.csv" --numProcesses=32 --balanceFactor=2
2367496

real    0m8.920s
user    1m30.056s
sys 2m38.744s

In [1]: 40412077758. / 8.92
Out[1]: 4530501990.807175

那是大约4.5GB/s或45 Gb/s的文件读取速度。这可不是普通的硬盘,我的朋友。实际上,这是一个Samsung Pro 950固态硬盘。

下面是相同文件被gnu wc逐行计数的速度基准测试结果,这是一个纯C编译程序。

有趣的是,你可以看到我的纯Python程序在这种情况下基本上与gnu wc编译的C程序的速度相匹配。Python是解释性语言,而C是编译性语言,所以我认为这是一项相当有趣的速度成果。当然,wc真的需要改成并行程序,那么它就能轻松地击败我的python程序了。但就目前而言,gnu wc只是一个顺序执行程序。你知道自己能做什么,而Python今天可以并行。Cython编译可能会对我有所帮助(时间另议)。此外,内存映射文件还没有被探索过。

HP-Z820:/mnt/fastssd$ time wc -l all_bin.csv
2367496 all_bin.csv

real    0m8.807s
user    0m1.168s
sys 0m7.636s


HP-Z820:/mnt/fastssd/fast_file_reader$ time python fastread.py --fileName="HIGGS.csv" --numProcesses=16 --balanceFactor=2
11000000

real    0m2.257s
user    0m12.088s
sys 0m20.512s

HP-Z820:/mnt/fastssd/fast_file_reader$ time wc -l HIGGS.csv
11000000 HIGGS.csv

real    0m1.820s
user    0m0.364s
sys 0m1.456s

结论:与C程序相比,这个纯Python程序的速度很快。然而,在行计数的目的下,仍然不足以使用纯Python程序代替C程序。通常,该技术可用于其他文件处理,因此这个Python代码仍然很好。

问题:编译正则表达式一次并将其传递给所有工作进程是否会提高速度?答案:在这个应用程序中,正则表达式预编译并不能提高性能。我认为原因是所有工作进程的进程序列化和创建的开销占主导。

还有一件事情。并行CSV文件读取真的有帮助吗?瓶颈是磁盘还是CPU?许多在stackoverflow上所谓的最佳答案包含了一个普遍的开发人员智慧,即你只需要一个线程来读取文件,他们说的最好。但他们确定吗?让我们找出答案:

HP-Z820:/mnt/fastssd/fast_file_reader$ time python fastread.py --fileName="HIGGS.csv" --numProcesses=16 --balanceFactor=2
11000000

real    0m2.256s
user    0m10.696s
sys 0m19.952s

HP-Z820:/mnt/fastssd/fast_file_reader$ time python fastread.py --fileName="HIGGS.csv" --numProcesses=1 --balanceFactor=1
11000000

real    0m17.380s
user    0m11.124s
sys 0m6.272s

没错,它确实可以。并行文件读取效果很好。就是这样!

附:如果您想知道,如果在单个工作进程中使用balanceFactor为2会怎样?好吧,情况非常糟糕:

HP-Z820:/mnt/fastssd/fast_file_reader$ time python fastread.py --fileName="HIGGS.csv" --numProcesses=1 --balanceFactor=2
11000000

real    1m37.077s
user    0m12.432s
sys 1m24.700s

快速阅读.py Python程序的关键部分:

fileBytes = stat(fileName).st_size  # Read quickly from OS how many bytes are in a text file
startByte, endByte = PartitionDataToWorkers(workers=numProcesses, items=fileBytes, balanceFactor=balanceFactor)
p = Pool(numProcesses)
partialSum = p.starmap(ReadFileSegment, zip(startByte, endByte, repeat(fileName))) # startByte is already a list. fileName is made into a same-length list of duplicates values.
globalSum = sum(partialSum)
print(globalSum)


def ReadFileSegment(startByte, endByte, fileName, searchChar='\n'):  # counts number of searchChar appearing in the byte range
    with open(fileName, 'r') as f:
        f.seek(startByte-1)  # seek is initially at byte 0 and then moves forward the specified amount, so seek(5) points at the 6th byte.
        bytes = f.read(endByte - startByte + 1)
        cnt = len(re.findall(searchChar, bytes)) # findall with implicit compiling runs just as fast here as re.compile once + re.finditer many times.
    return cnt
< p >PartitionDataToWorkers的定义只是普通的顺序代码。我刻意省略了它,以便其他人可以练习并了解并行编程的工作方式。更难的部分是我免费提供的:经过测试和工作正常的并行代码,有助于你的学习。< /p > < p >感谢开源H2O项目,由Arno、Cliff和H2O员工开发的优秀软件和教学视频,为上述纯Python高性能并行字节偏移量读取器提供了灵感。H2O使用Java进行并行文件读取,可由Python和R程序调用,并且速度非常快,在读取大型CSV文件方面比任何其他软件都要快。< /p >

这基本上就是并行块。此外,我预计SSD和Flash是唯一与此技术兼容的存储设备。旋转硬盘不太可能兼容。 - Geoffrey Anderson
1
你是如何考虑操作系统缓存磁盘文件的? - JamesThomasMoon

5

Katrielalex提供了打开和读取文件的方法。

然而,您的算法会为文件的每一行读取整个文件。这意味着读取文件 - 并计算Levenshtein距离 - 的总量将是N*N,其中N是文件中的行数。由于您关心文件大小并且不想将其存储在内存中,我担心结果将产生二次运行时间。您的算法属于O(n ^ 2)类算法,通常可以通过专业化来改进。

我怀疑您已经知道内存与运行时间之间的权衡,但也许您想调查是否存在一种有效的方法来并行计算多个Levenshtein距离。如果有的话,分享您的解决方案会很有趣。

您的文件有多少行,在哪种机器(mem&amp; cpu power)上运行您的算法,以及容忍的运行时间是多少?

代码如下:

with f_outer as open(input_file, 'r'):
    for line_outer in f_outer:
        with f_inner as open(input_file, 'r'):
            for line_inner in f_inner:
                compute_distance(line_outer, line_inner)

但问题是如何存储距离(矩阵?),并且是否可以通过准备外线进行处理或缓存一些中间结果以供重用来获得优势。


我的观点是,这篇帖子没有回答问题,只是提出了更多的问题!我认为将其作为评论会更合适。 - Katriel
1
@katriealex:奇怪,您有看到嵌套循环吗?把您自己的回答扩展一下以适应实际问题?我可以从我的答案中删除这里的问题,并且仍有足够的内容来提供这个-尽管是部分-答案。如果您编辑自己的答案以包括嵌套循环示例(该示例由问题明确要求),然后我可以高兴地删除自己的答案并接受您的答案。但是我完全不理解为什么会被投票反对。 - cfi
好的,我并不认为展示嵌套的for循环是回答问题的方法,但我猜这对初学者来说非常有针对性。取消踩赞。 - Katriel

3

需要经常从上次读取的位置读取大文件吗?

我创建了一个脚本,用于每天多次切割一个 Apache access.log 文件。 因此,我需要在上一次执行时解析的最后一行上设置一个位置光标。 为此,我使用了file.seek()file.tell()方法,它们允许将光标存储在文件中。

我的代码:

ENCODING = "utf8"
CURRENT_FILE_DIR = os.path.dirname(os.path.abspath(__file__))

# This file is used to store the last cursor position
cursor_position = os.path.join(CURRENT_FILE_DIR, "access_cursor_position.log")

# Log file with new lines
log_file_to_cut = os.path.join(CURRENT_FILE_DIR, "access.log")
cut_file = os.path.join(CURRENT_FILE_DIR, "cut_access", "cut.log")

# Set in from_line 
from_position = 0
try:
    with open(cursor_position, "r", encoding=ENCODING) as f:
        from_position = int(f.read())
except Exception as e:
    pass

# We read log_file_to_cut to put new lines in cut_file
with open(log_file_to_cut, "r", encoding=ENCODING) as f:
    with open(cut_file, "w", encoding=ENCODING) as fw:
        # We set cursor to the last position used (during last run of script)
        f.seek(from_position)
        for line in f:
            fw.write("%s" % (line))

    # We save the last position of cursor for next usage
    with open(cursor_position, "w", encoding=ENCODING) as fw:
        fw.write(str(f.tell()))

2
#Using a text file for the example
with open("yourFile.txt","r") as f:
    text = f.readlines()
for line in text:
    print line
  • 打开文件以供读取(r)
  • 读取整个文件并将每行保存到列表中(text)
  • 循环遍历列表,打印每一行。

如果您想检查特定行的长度是否大于10,则可以使用已有的内容进行操作。

for line in text:
    if len(line) > 10:
        print line

1
虽然这段代码不是最适合这个问题的,但它主要用于一次性读取整个文件("slurping")。这正是我所需要的,而且谷歌把我带到了这里。点赞!此外,如果您在循环中进行耗时处理,为了保证原子性或提高效率,一次性读取整个文件可能更快。 - ntg
1
另外,稍微改进了一下代码:1. 在使用with关键字后不需要再使用close语句:(https://docs.python.org/2/tutorial/inputoutput.html,请搜索“使用with关键字是一个好习惯...”)2. 文件读取后可以在外部处理文本(在with循环之外...) - ntg

2

来自fileinput的Python文档。input():

这会迭代所有列在sys.argv[1:]中的文件的行,默认情况下,如果列表为空,则使用sys.stdin

此外,该函数的定义如下:

fileinput.FileInput([files[, inplace[, backup[, mode[, openhook]]]]])

阅读行间内容,这告诉我files可以是一个列表,因此您可以拥有以下内容:
for each_line in fileinput.input([input_file, input_file]):
  do_something(each_line)

更多信息请参见此处


2
我强烈建议不要使用默认的文件加载,因为它非常慢。你应该了解numpy函数和IOpro函数(例如:numpy.loadtxt())。

http://docs.scipy.org/doc/numpy/user/basics.io.genfromtxt.html

https://store.continuum.io/cshop/iopro/

然后,您可以将成对操作分成块:
import numpy as np
import math

lines_total = n    
similarity = np.zeros(n,n)
lines_per_chunk = m
n_chunks = math.ceil(float(n)/m)
for i in xrange(n_chunks):
    for j in xrange(n_chunks):
        chunk_i = (function of your choice to read lines i*lines_per_chunk to (i+1)*lines_per_chunk)
        chunk_j = (function of your choice to read lines j*lines_per_chunk to (j+1)*lines_per_chunk)
        similarity[i*lines_per_chunk:(i+1)*lines_per_chunk,
                   j*lines_per_chunk:(j+1)*lines_per_chunk] = fast_operation(chunk_i, chunk_j) 

将数据分块加载,然后进行矩阵运算通常比逐个元素操作更快!!


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接