Python如何一次读取N行数据?

61

我正在编写一段代码,以每次N行的方式读取一个庞大的文本文件(几GB),处理该批次,并继续进行下一批N行,直到完成整个文件。(最后一批未必是完美大小无所谓)。

我已经了解了使用itertools islice进行此操作的相关内容。我认为我已经完成了其中一半:

from itertools import islice
N = 16
infile = open("my_very_large_text_file", "r")
lines_gen = islice(infile, N)

for lines in lines_gen:
     ...process my lines...

问题在于我想处理接下来的16行,但我缺少某些东西。


可能是Python中读取大文件的惰性方法?的重复问题。 - Ken White
2
@ken - OP正在询问如何使用islice完成这个任务,在该帖子中,OP询问如何使用yield完成此任务。 - Kev
可能是 如何在Python中一次读取N行文件? 的重复问题。 - Jonathan H
@JonathanH 我认为这是更好的问题版本,主要是基于顶部答案的力量。那里的顶部/被接受的答案只获取前N行,并包括一种将整个文件先读入内存的变体(显然不可取)。 - Karl Knechtel
7个回答

81

islice()可以用来获取迭代器的下一个n个元素。因此,list(islice(f, n))将返回文件f的下一个n行的列表。在循环中使用这个函数,可以将文件按照n行为一块进行处理。当到达文件结尾时,列表可能会变得更短,最终该调用会返回一个空列表。

from itertools import islice
with open(...) as f:
    while True:
        next_n_lines = list(islice(f, n))
        if not next_n_lines:
            break
        # process next_n_lines

另一种选择是使用分组模式

from itertools import zip_longest
with open(...) as f:
    for next_n_lines in zip_longest(*[f] * n):
        # process next_n_lines

1
我最近在学习Python,有一个问题。理想情况下,如果您正在读取数据库或记录文件,您需要将记录标记为已读(需要另一列),并且在下一批中,您将开始处理下一个未标记的记录。这里是如何实现的?特别是这里 next_n_lines = list(islice(infile, n)) - zengr
2
@zengr:你必须把计数器存储在某个地方。那是一个完全无关的问题——请使用右上角的“提问”按钮。 - Sven Marnach
我想按n行读取一个文件。izip_longest()正是我要找的!但是,我真的不理解*[f] * n这些参数……有什么想法吗? - Stéphane
@Stéphane:这个习语在itertools.izip()的文档中提到。看一下那里等效的Python代码,并记住例如izip(*[f] * 5)等同于izip(f, f, f, f, f) - Sven Marnach
1
@dhfromkorea:我建议使用自定义生成器函数,参见 https://gist.github.com/smarnach/75146be0088e7b5c503f。 - Sven Marnach
显示剩余5条评论

11
该问题似乎假定每次读取一个"N行"的块可以提高读取"巨大文本文件"的效率。这会在已经高度优化的库上添加应用程序层缓冲,增加了复杂性,并且可能完全没有任何好处。
with open('my_very_large_text_file') as f:
    for line in f:
        process(line)

相较于任何其他方案,该方案在时间、空间、复杂度和可读性方面都可能更加优越。

另请参见罗布·派克的前两条规则杰克逊的两条规则Python之禅PEP-20。如果您只是想用islice玩一下,那就不需要涉及大文件的内容。


2
你好!我需要将我的巨大文本文件分块处理,每个块有N行,是因为我要从每组N行中选择一行随机行。这是用于生物信息学分析的,我想创建一个具有整个数据集等量代表性的较小文件。在生物学中,并非所有数据都是平等创建的!也许有一种不同(或更好?)的方法可以从庞大的数据集中选择X个随机行,使其等间距分布,但这是我想到的第一件事。感谢提供的链接! - brokentypewriter
@brokentypewriter,这是一个完全不同的问题,有更多统计学上有用的样本。我会寻找现成的东西,并在这里提出一个新问题。当我做到时,我会在这里放一个链接。自相关是一个悲伤的产物。 - msw
我在这个问题的回答中回答了它:https://dev59.com/92w15IYBdhLWcg3w4_zk#6347142 - msw
@msw 如果我需要读取一个非常大的文件的10行并将它们发送到multiprocessing.Pool,那该怎么办?逐行读取显然是不可行的,对吧? - pippo1980

3

这里是另一种使用groupby的方法:

from itertools import count, groupby

N = 16
with open('test') as f:
    for g, group in groupby(f, key=lambda _, c=count(): c.next()/N):
        print list(group)

工作原理:

基本上,groupby() 函数将根据 key 参数的返回值对行进行分组,而 key 参数是 lambda 函数 lambda _, c=count(): c.next()/N,并利用当定义 function 时,c 参数将绑定到 count(),因此每次 groupby() 将调用 lambda 函数并计算返回值以确定分组器来分组行,所以:

# 1 iteration.
c.next() => 0
0 / 16 => 0
# 2 iteration.
c.next() => 1
1 / 16 => 0
...
# Start of the second grouper.
c.next() => 16
16/16 => 1   
...

2

因为要求从文件中选择的行具有统计上的均匀分布,所以我提供了这种简单的方法。

"""randsamp - extract a random subset of n lines from a large file"""

import random

def scan_linepos(path):
    """return a list of seek offsets of the beginning of each line"""
    linepos = []
    offset = 0
    with open(path) as inf:     
        # WARNING: CPython 2.7 file.tell() is not accurate on file.next()
        for line in inf:
            linepos.append(offset)
            offset += len(line)
    return linepos

def sample_lines(path, linepos, nsamp):
    """return nsamp lines from path where line offsets are in linepos"""
    offsets = random.sample(linepos, nsamp)
    offsets.sort()  # this may make file reads more efficient

    lines = []
    with open(path) as inf:
        for offset in offsets:
            inf.seek(offset)
            lines.append(inf.readline())
    return lines

dataset = 'big_data.txt'
nsamp = 5
linepos = scan_linepos(dataset) # the scan only need be done once

lines = sample_lines(dataset, linepos, nsamp)
print 'selecting %d lines from a file of %d' % (nsamp, len(linepos))
print ''.join(lines)

我在一个包含300万行、1.7GB的模拟数据文件上进行了测试。在我的不太好的桌面上,scan_linepos 占用了大约20秒的运行时间。

为了检查 sample_lines 的性能,我使用了 timeit 模块,如下所示:

import timeit
t = timeit.Timer('sample_lines(dataset, linepos, nsamp)', 
        'from __main__ import sample_lines, dataset, linepos, nsamp')
trials = 10 ** 4
elapsed = t.timeit(number=trials)
print u'%dk trials in %.2f seconds, %.2fµs per trial' % (trials/1000,
        elapsed, (elapsed/trials) * (10 ** 6))

对于不同的nsamp值;当nsamp为100时,单个sample_lines完成时间为460微秒,并且随着样本量的线性增加,每次调用需要47毫秒。
接下来自然而然的问题是“Python的随机数生成器真的随机吗?”,答案是“亚加密级别的,但对于生物信息学来说肯定足够了”。

@brokentypewriter - 感谢你让我从真正的工作中得到愉快的转移 o.O - msw
@msw 很棒的解决方案。它运行非常快,我喜欢random.sample无需替换地进行抽样。唯一的问题是在写输出文件时出现了内存错误...但我可能可以自己修复它。(我将尝试的第一件事是逐行写入输出文件,而不是将所有行连接在一起)。感谢您提供如此好的解决方案!我有900万行,在循环中对它们进行11次采样,因此节省时间的措施非常重要!操作列表并将所有行加载到列表中需要运行太长时间。 - brokentypewriter
@msw 我已经修复了它,每次将每行写入输出文件以避免内存问题。一切都运行得很好!它只需要4分25秒就可以运行,这比之前的版本(迭代列表)要快得多,那需要2个小时以上才能运行。我真的很喜欢这个解决方案,因为它只会将从其偏移量采样的行加载到内存中。这是一个巧妙而高效的技巧。我可以说今天我学到了新东西! - brokentypewriter
@brokentypewriter - 很高兴能够提供帮助,但是这种方法的功劳归功于Kernighan和Plaugher在《Pascal软件工具》(1981)中使用了这种索引方法来实现ed(1),而该语言没有本地字符类型!有些技巧永远不会过时。 - msw
@brokentypewriter,msw:scan_linepos()在列表中不包括偏移量为0的位置,但是会包括超过最后一行的偏移量。这意味着示例永远不会包含第一行,但如果触及超过最后一行的偏移量时可能包含一个空行。最简单的修复方法是交换for循环中的两行代码位置。 - Sven Marnach

1

使用了来自如何以最“Pythonic”的方式迭代列表中的块?的分块函数:

from itertools import izip_longest

def grouper(iterable, n, fillvalue=None):
    "grouper(3, 'ABCDEFG', 'x') --> ABC DEF Gxx"
    args = [iter(iterable)] * n
    return izip_longest(*args, fillvalue=fillvalue)


with open(filename) as f:
    for lines in grouper(f, chunk_size, ""): #for every chunk_sized chunk
        """process lines like 
        lines[0], lines[1] , ... , lines[chunk_size-1]"""

@Sven Marnach; 抱歉,“grouper” 应该是 “chunker”。但我认为(我不太理解你的意思),它与你的 grouper 函数做的事情相同。编辑:不,它并不相同。 - utdemir
仍然令人困惑。1. chunker() 定义了两个参数,但被调用时传入了三个参数。2. 将 f 作为 seq 传递将尝试对文件对象进行切片,这根本行不通。你只能对序列进行切片。 - Sven Marnach
@Sven Marnach;实际上,我首先采用了那个问题的第一个答案作为我的答案,为此编写了代码,并认为第二个答案更好,于是改变了函数,但我忘记了更改函数调用。你关于切片的观点是正确的,这是我的错误,我正在努力纠正它。谢谢。 - utdemir
@utdemir izip_longest ---> zip_longest - pippo1980

0

Assuming "batch" means to want to process all 16 recs at one time instead of individually, read the file one record at a time and update a counter; when the counter hits 16, process that group.

interim_list = []
infile = open("my_very_large_text_file", "r")
ctr = 0
for rec in infile:
    interim_list.append(rec)
    ctr += 1
    if ctr > 15:
        process_list(interim_list)
        interim_list = []
        ctr = 0

最终组

process_list(interim_list)


0

另一个解决方案可能是创建一个迭代器,该迭代器产生n个元素的列表:

def n_elements(n, it):
    try:
        while True:
            yield [next(it) for j in range(0, n)]
    except StopIteration:
        return

with open(filename, 'rt') as f:
    for n_lines in n_elements(n, f):
        do_stuff(n_lines)


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接