如何逐行读取大型文本文件,而不将其加载到内存中?

331

我想逐行读取一个大文件(>5GB),而不需要将其整个内容加载到内存中。由于readlines()会在内存中创建一个非常大的列表,因此我无法使用它。


如果您需要处理二进制文件,请参阅如何迭代遍历二进制文件的惯用方式? - Karl Knechtel
14个回答

439

使用 for 循环读取文件对象的每一行。使用 with open(...) 来让一个上下文管理器确保在读取后关闭文件:

with open("log.txt") as infile:
    for line in infile:
        print(line)

49
问题仍然是,“for line in infile”会将我的5GB行加载到内存中吗?另外,如何从尾部读取? - Bruno Rocha - rochacbruno
106
@rochacbruno,它一次只能读取一行内容。当读取下一行时,上一行将被自动垃圾回收,除非您已经在其他地方存储了对其的引用。 - John La Rooy
1
@rochacbruno,倒序读取行并不容易高效地实现。通常情况下,您会希望以合理大小的块(例如千字节到兆字节)从文件末尾开始读取,并在换行符(或者您平台上的任何行结束字符)处进行分割。 - John La Rooy
4
谢谢!我找到了一个解决方案,链接在这里:https://dev59.com/OG025IYBdhLWcg3wnncw#5896210。 - Bruno Rocha - rochacbruno
4
@bawejakunal,您的意思是如果一行太长而无法一次加载到内存中吗?这对于文本文件来说是不寻常的。您可以使用chunk = infile.read(chunksize)读取有限大小的块,而不管它们的内容,而不是使用迭代行的for循环。您将不得不自己在块内搜索换行符。 - John La Rooy
显示剩余4条评论

80

您所需要做的就是将文件对象用作迭代器。

for line in open("log.txt"):
    do_something_with(line)

在最近的Python版本中,使用上下文管理器会更好。

with open("log.txt") as fileobject:
    for line in fileobject:
        do_something_with(line)

这也会自动关闭文件。


4
这不是把整个文件加载到内存中吗? - Bruno Rocha - rochacbruno
在第一个示例中,循环后难道不应该关闭文件吗? - maciejwww
1
@maciejwww 是的,但我不想让它看起来更像 OP 的例子。第二个例子使用 with 语句是一个“上下文管理器”,它会自动关闭文件对象。 - Keith

21
请尝试这个:
with open('filename','r',buffering=100000) as f:
    for line in f:
        print line

8
来自Python官方文档:链接可选参数buffering指定文件所需的缓冲区大小:0表示无缓冲,1表示行缓冲,任何其他正值表示使用大约该大小(以字节为单位)的缓冲区。负数缓冲意味着使用系统默认值,对于TTY设备通常是行缓冲,对于其他文件则是完全缓冲。如果省略,则使用系统默认值。 - jyoti das
@jyotidas 虽然我喜欢这种方法,但你的文本可能会被分成块,存在一定风险。我亲身经历过这种情况,如果你像我一样在文件中搜索字符串,可能会错过一些内容,因为它们所在的行被分成了块。有没有什么方法可以避免这种情况?使用readlines也不太好,因为我数错了。 - edo101

20

最好使用迭代器。


相关文档:fileinput — 从多个输入流中迭代读取行

来自文档:

import fileinput
for line in fileinput.input("filename", encoding="utf-8"):
    process(line)

这样做将避免一次性将整个文件复制到内存中。


虽然文档显示代码片段为“典型用法”,但使用它时并不会在循环结束时调用返回的FileInput类对象的close()方法,因此我建议避免以这种方式使用它。在Python 3.2中,他们终于使fileinput与上下文管理器协议兼容,解决了这个问题(但代码仍不能完全按照所示方式编写)。 - martineau

19

一种老派的方法:

fh = open(file_name, 'rt')
line = fh.readline()
while line:
    # do stuff with line
    line = fh.readline()
fh.close()

2
小注: 为了异常安全,建议使用 'with' 语句,在您的情况下为 "with open(filename, 'rt') as fh:" - prokher
23
是的,但我称之为“老派”。 - PTBNL

15

如果文件中没有换行符,以下是您需要执行的操作:

with open('large_text.txt') as f:
  while True:
    c = f.read(1024)
    if not c:
      break
    print(c,end='')

虽然我喜欢这种方法,但你运行的风险是文本中的行会被分成块。我个人见过这种情况,这意味着如果你像我一样在文件中搜索字符串,我可能会错过其中一些,因为它们所在的行已经被分成了块。有没有什么办法可以解决这个问题?尝试使用readlines也没有成功,因为我统计错误了。@Ariel Cabib - edo101
你也可以使用sys.stdout.write(c)代替print()。 - sivann

7

我简直不敢相信这件事情可以像@john-la-rooy的回答所说的那样容易。因此,我使用逐行读写的方式重新创建了cp命令。它非常快速。

#!/usr/bin/env python3.6

import sys

with open(sys.argv[2], 'w') as outfile:
    with open(sys.argv[1]) as infile:
        for line in infile:
            outfile.write(line)

4
注意:由于Python的readline标准化了行结束符,这会将具有DOS行结束符\r\n的文档转换为Unix行结束符\n。我搜索这个主题的原因是我需要转换一个日志文件,该文件接收了各种.NET库产生的混乱行结束符。令我惊讶的是,在我的初始速度测试之后,我不需要返回并使用rstrip来去除行末空白(\r\n)。它已经完美了! - Bruno Bronosky

5

blaze项目在过去的6年中已经取得了很大的进展。它有一个简单的API,涵盖了pandas功能的一个有用子集。

dask.dataframe在内部处理分块,支持许多可并行化的操作,并允许您轻松地将切片导出回pandas以进行内存操作。

import dask.dataframe as dd

df = dd.read_csv('filename.csv')
df.head(10)  # return first 10 rows
df.tail(10)  # return last 10 rows

# iterate rows
for idx, row in df.iterrows():
    ...

# group by my_field and return mean
df.groupby(df.my_field).value.mean().compute()

# slice by column
df[df.my_field=='XYZ'].compute()

2
这是加载任意大小文本文件的代码,可以避免内存问题。 它支持千兆字节级别的文件。

https://gist.github.com/iyvinjose/e6c1cb2821abd5f01fd1b9065cbc759d

下载文件data_loading_utils.py,并将其导入您的代码中。
用法
import data_loading_utils.py.py
file_name = 'file_name.ext'
CHUNK_SIZE = 1000000


def process_lines(data, eof, file_name):

    # check if end of file reached
    if not eof:
         # process data, data is one single line of the file

    else:
         # end of file reached

data_loading_utils.read_lines_from_file_as_data_chunks(file_name, chunk_size=CHUNK_SIZE, callback=self.process_lines)

"process_lines" 方法是回调函数。它将被用于所有行,参数 data 代表每次文件中的单个行。
您可以根据计算机硬件配置设置变量 "CHUNK_SIZE"。

1
虽然我喜欢这种方法,但你有可能会冒着文本行被分成块的风险。我亲身经历过这种情况,这意味着如果你像我一样在文件中搜索字符串,那么你可能会错过一些字符串,因为它们所在的行被分成了块。有没有什么办法可以解决这个问题?使用readlines方法并不好用,因为我得到了错误的计数。 - edo101

1
我知道这个问题早就得到了解答,但是以下方法可以并行处理,而且不会增加内存开销(如果你尝试将每行数据放入线程池中,则会增加内存开销)。当然,将readJSON_line2函数替换为有意义的内容 - 这只是为了说明问题!
加速取决于文件大小和每行的操作 - 但对于小文件并且只使用JSON阅读器读取文件的最坏情况,我看到的性能与下面的设置类似。
希望对某些人有用:
def readJSON_line2(linesIn):
  #Function for reading a chunk of json lines
   '''
   Note, this function is nonsensical. A user would never use the approach suggested 
   for reading in a JSON file, 
   its role is to evaluate the MT approach for full line by line processing to both 
   increase speed and reduce memory overhead
   '''
   import json

   linesRtn = []
   for lineIn in linesIn:

       if lineIn.strip() != 0:
           lineRtn = json.loads(lineIn)
       else:
           lineRtn = ""
        
       linesRtn.append(lineRtn)

   return linesRtn




# -------------------------------------------------------------------
if __name__ == "__main__":
   import multiprocessing as mp

   path1 = "C:\\user\\Documents\\"
   file1 = "someBigJson.json"

   nBuffer = 20*nCPUs  # How many chunks are queued up (so cpus aren't waiting on processes spawning)
   nChunk = 1000 # How many lines are in each chunk
   #Both of the above will require balancing speed against memory overhead

   iJob = 0  #Tracker for SMP jobs submitted into pool
   iiJob = 0  #Tracker for SMP jobs extracted back out of pool

   jobs = []  #SMP job holder
   MTres3 = []  #Final result holder
   chunk = []  
   iBuffer = 0 # Buffer line count
   with open(path1+file1) as f:
      for line in f:
            
          #Send to the chunk
          if len(chunk) < nChunk:
              chunk.append(line)
          else:
              #Chunk full
              #Don't forget to add the current line to chunk
              chunk.append(line)
                
              #Then add the chunk to the buffer (submit to SMP pool)                  
              jobs.append(pool.apply_async(readJSON_line2, args=(chunk,)))
              iJob +=1
              iBuffer +=1
              #Clear the chunk for the next batch of entries
              chunk = []
                            
          #Buffer is full, any more chunks submitted would cause undue memory overhead
          #(Partially) empty the buffer
          if iBuffer >= nBuffer:
              temp1 = jobs[iiJob].get()
              for rtnLine1 in temp1:
                  MTres3.append(rtnLine1)
              iBuffer -=1
              iiJob+=1
            
      #Submit the last chunk if it exists (as it would not have been submitted to SMP buffer)
      if chunk:
          jobs.append(pool.apply_async(readJSON_line2, args=(chunk,)))
          iJob +=1
          iBuffer +=1

      #And gather up the last of the buffer, including the final chunk
      while iiJob < iJob:
          temp1 = jobs[iiJob].get()
          for rtnLine1 in temp1:
              MTres3.append(rtnLine1)
          iiJob+=1

   #Cleanup
   del chunk, jobs, temp1
   pool.close()

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接