如何高效获取文本文件的第一行和最后一行?

79

我有一个文本文件,每行都包含一个时间戳。我的目标是找到时间范围。所有时间都按顺序排列,因此第一行将是最早的时间,而最后一行将是最新的时间。我只需要第一行和最后一行。在Python中获取这些行的最有效方法是什么?

注意:这些文件相对较长,每个文件约有1-2百万行,我需要对几百个文件进行此操作。

13个回答

95
from os import SEEK_END, SEEK_CUR

def readlast(f):
    try:
        f.seek(-2, SEEK_END)       # Jump to the second last byte.
        while f.read(1) != b"\n":  #  Until newline is found ...
            f.seek(-2, SEEK_CUR)   #  ... jump back, over the read byte plus one.
    except OSError:                # Reached begginning of File
        f.seek(0)                  #  Set cursor to beginning of file as well.
    return f.read()                # Read all data from this point on.
        
with open(path, "rb") as f:
    first = f.readline()
    last  = readlast(f)

使用seek时,格式为fseek(offset, whence=0)
引用自docs.python.org
将流位置更改为给定的字节偏移量。offset相对于whence指示的位置进行解释。whence的默认值是SEEK_SET。whence的取值有:
- SEEK_SET或0 = 流的起始位置(默认值);offset应为零或正数 - SEEK_CUR或1 = 当前流位置;offset可以是负数 - SEEK_END或2 = 流的末尾;offset通常为负数
疾驰搜索(2.7+)
from collections import deque
from os import SEEK_CUR, SEEK_END

def readlast(f, d = b'\n'):
    """"readlast(f: io.IOBase, d: bytes = b'\n') -> bytes

    Return the last segment of file `f`, containing data segments separated by
    `d`.
    """
    arr = deque(); step = 1; pos = -1
    try:
        # Seek to last byte of file, save it to arr as to not check for newline.
        pos = f.seek(-1, SEEK_END) 
        arr.appendleft(f.read())
        # Seek past the byte read, plus one to use as the first segment.
        pos = f.seek(-2, SEEK_END) 
        seg = f.read(1)
        # Break when 'd' occurs, store index of the rightmost match in 'i'.
        while seg.rfind(d) == -1:
            # Store segments with no b'\n' in a memory-efficient 'deque'.
            arr.appendleft(seg)
            # Step back in file, past the bytes just read plus twice that.
            pos = f.seek(-step*3, SEEK_CUR)
            # Read new segment, twice as big as the one read previous iteration.
            step *= 2
            seg = f.read(step)
        # Ignore the characters up to 'i', and the triggering newline character.
        arr.appendleft(seg[seg.rfind(d)+1:])
    except OSError: 
        # Reached beginning of file. Read remaining data and check for newline.
        f.seek(0)
        seg = f.read(pos)
        arr.appendleft(seg[seg.rfind(d)+1:])
    return b"".join(arr)

我可能会选择一个使用指数增长步长的函数,因此今天添加了这样一个例子,并将其与原始答案放在一起(暂时)。
它很好地处理了边缘情况,除了多字节分隔符和以文本模式打开的文件(有关处理这些情况的示例,请参见“边缘情况”)。
用法:
f.write(b'X\nY\nZ\n'); f.seek(0)
assert readlast(f) == b'Z\n'

f.write(b'\n\n'; f.seek(0)
assert readlast(f) == b'\n'

边缘情况(2.7+)

我避免修改原始答案,因为问题明确要求效率,并且要尊重之前的赞同。

这个版本解决了多年来提出的所有评论和问题,同时保留了逻辑和向后兼容性(以可读性为代价)。

目前已经解决的问题有:

  • 当解析空文件时返回空字符串,由Loïc在评论中指出。
  • 当找不到分隔符时返回所有内容,由LazyLeopard在评论中提出。
  • 避免使用相对偏移量以支持文本模式,由AnotherParker在评论中提出。
  • UTF16/UTF32的技巧,由Pietro Battiston在评论中指出。
还支持多字节分隔符。
from os import SEEK_CUR, SEEK_END

def _readlast__bytes(f, sep, size, step):
    # Point cursor 'size' + 'step' bytes away from the end of the file.
    o = f.seek(0 - size - step, SEEK_END)
    # Step 'step' bytes each iteration, halt when 'sep' occurs.
    while f.read(size) != sep:
        f.seek(0 - size - step, SEEK_CUR)

def _readlast__text(f, sep, size, step):
    # Text mode, same principle but without the use of relative offsets.
    o = f.seek(0, SEEK_END)
    o = f.seek(o - size - step)
    while f.read(size) != sep:
        o = f.seek(o - step)

def readlast(f, sep, fixed = False):
    """readlast(f: io.BaseIO, sep: bytes|str, fixed: bool = False) -> bytes|str

    Return the last segment of file `f`, containing data segments separated by
    `sep`.

    Set `fixed` to True when parsing UTF-32 or UTF-16 encoded data (don't forget
    to pass the correct delimiter) in files opened in byte mode.
    """
    size = len(sep)
    step = len(sep) if (fixed is True) else (fixed or 1)
    step = size if fixed else 1
    if not size:
        raise ValueError("Zero-length separator.")
    try:
        if 'b' in f.mode:
            # Process file opened in byte mode.
            _readlast__bytes(f, sep, size, step)
        else:
            # Process file opened in text mode.
            _readlast__text(f, sep, size, step)
    except (OSError, ValueError): 
        # Beginning of file reached.
        f.seek(0, SEEK_SET)
    return f.read()

使用方法:

f.write("X\nY\nZ\n".encode('utf32'); f.seek(0)
assert readlast(f, "\n".encode('utf32')[4:]) == "Z\n"

f.write(b'X<br>Y</br>'; f.seek(0)
assert readlast(f, b'<br>', fixed=False) == "Y</br>"

效率

用于与此答案进行比较的代码(发布时最受赞同的答案的优化版本):

with open(file, "rb") as f:
    first = f.readline()     # Read and store the first line.
    for last in f: pass      # Read all lines, keep final value.

结果:

10k iterations processing a file of 6k lines totalling 200kB: 1.62s vs  6.92s
100 iterations processing a file of 6k lines totalling 1.3GB: 8.93s vs 86.95s

“每个文件有1-2百万行”,正如问题所述,这当然会增加差异很多。

4
这是最简明的解决方案,我很喜欢它。不猜测块大小的好处在于它能很好地处理小型测试文件。我添加了几行代码并将其封装成了一个我称之为“tail_n”的函数。 - MarkHu
1
我很喜欢这个想法,但是无法让它正常工作。File "mapper1.2.2.py", line 17, in get_last_line f.seek(-2, 2) IOError: [Errno 22] Invalid argument - Loïc
2
算了,文件是空的,呆子。最佳答案还是你的。+1 - Loïc
2
根据此评论的回答,这个while f.read(1) != "\n":应该改为while f.read(1) != b"\n": - Artjom B.
5
记录一下:如果出现io.UnsupportedOperation: can't do nonzero end-relative seeks的异常,需要分两步进行操作:首先获取文件长度,然后加上偏移量,最后将其传递给f.seek(size+offset,os.SEEK_SET) - AnotherParker
显示剩余8条评论

67

io模块文档

with open(fname, 'rb') as fh:
    first = next(fh).decode()

    fh.seek(-1024, 2)
    last = fh.readlines()[-1].decode()

这里的变量值为1024:它代表平均字符串长度。我只是拿1024作为例子。如果您有平均行长度的估计值,可以使用该值乘以2。

由于您完全不知道行长度的可能上限,显而易见的解决方案是循环读取文件:

for line in fh:
    pass
last = line

无需烦恼二进制标志,只需使用open(fname)

预计时间:由于你需要处理许多文件,因此可以使用random.sample创建几十个文件样本,并在这些文件上运行此代码以确定最后一行的长度。使用先验的大值为位置偏移(比如1 MB),这将帮助您估算整个运行的值。


不能保证行不超过1024个字符,可能会有一些时间戳以外的垃圾信息。 - pasbino
@pasbino:你有一些上限吗? - SilentGhost
@pasbino:你仍然可以在循环中使用类似的方法,直到找到完整的一行。 - FogleBird
21
使用fh.seek(-1024, os.SEEK_END)代替fh.seek(-1024, 2)可以增加可读性。 - marsl
3
以下陈述并不正确:您无需担心二进制标志,只需使用open(fname)即可。 使用b标志进行打开是至关重要的。如果您使用open(fname)而不是open(fname,'rb'),则会出现 io.UnsupportedOperation: can't do nonzero end-relative seeks的错误。 - patryk.beza
显示剩余9条评论

25

这是 SilentGhost 回答的修改版,可以实现你想要的效果。

with open(fname, 'rb') as fh:
    first = next(fh)
    offs = -100
    while True:
        fh.seek(offs, 2)
        lines = fh.readlines()
        if len(lines)>1:
            last = lines[-1]
            break
        offs *= 2
    print first
    print last

这里不需要设置行长度的上限。


10

你会使用Unix命令吗?我认为使用head -1tail -n 1是最有效的方法。另外,你可以使用简单的fid.readline()来获取第一行和fid.readlines()[-1]来获取最后一行,但这可能会消耗太多内存。


创建一个子进程来执行这些命令会是最有效的方式吗? - pasbino
10
如果你有Unix系统,那么 os.popen("tail -n 1 %s" % filename).read() 可以很好地获取最后一行。 - Michael Dunn
1
对于大文件来说,fid.readlines()[-1] 不是一个好的解决方案。+1 代表头部,-1 代表尾部。 - Joao Figueiredo
os.popen("tail -n 1 %s" % filename).read() --> 自2.6版本起已被弃用 - LarsVegas

6

这是我的解决方案,同时兼容Python3。它也管理边界情况,但它缺少utf-16支持:

def tail(filepath):
    """
    @author Marco Sulla (marcosullaroma@gmail.com)
    @date May 31, 2016
    """

    try:
        filepath.is_file
        fp = str(filepath)
    except AttributeError:
        fp = filepath

    with open(fp, "rb") as f:
        size = os.stat(fp).st_size
        start_pos = 0 if size - 1 < 0 else size - 1

        if start_pos != 0:
            f.seek(start_pos)
            char = f.read(1)

            if char == b"\n":
                start_pos -= 1
                f.seek(start_pos)

            if start_pos == 0:
                f.seek(start_pos)
            else:
                char = ""

                for pos in range(start_pos, -1, -1):
                    f.seek(pos)

                    char = f.read(1)

                    if char == b"\n":
                        break

        return f.readline()

这篇文章的灵感来源于 Trasp的回答AnotherParker的评论


4

首先以读模式打开文件。然后使用readlines()方法逐行读取所有行并将其存储在一个列表中。现在,您可以使用列表切片来获取文件的第一行和最后一行。

    a=open('file.txt','rb')
    lines = a.readlines()
    if lines:
        first_line = lines[:1]
        last_line = lines[-1]

1
我正好在寻找这个,我不需要第一行和最后一行,所以lines[1,-2]可以得到标题和页脚之间的文本。 - guneysus
4
此选项无法处理空文件。 - Avid Coder
10
非常大的文件会导致崩溃。 - akarapatis

4
w=open(file.txt, 'r')
print ('first line is : ',w.readline())
for line in w:  
    x= line
print ('last line is : ',x)
w.close()
< p > for 循环运行通过这些行,x 在最后一次迭代中获得了最后一行。


这应该是被接受的答案。我不知道为什么其他答案中会有那么多与低级IO相关的混乱? - GreenAsJade
3
我的理解是,“messing around” 的作用是避免从头到尾读取整个文件。对于大型文件来说,这可能效率低下。 - bli

3
with open("myfile.txt") as f:
    lines = f.readlines()
    first_row = lines[0]
    print first_row
    last_row = lines[-1]
    print last_row

你能解释一下为什么你的解决方案会更好吗? - Zulu
嗨,我发现自己有同样的需求,即在文本文件的最后一行处删除最后一个逗号,并以这种方式解决了它的定位问题;然后我想分享一下。这个解决方案很简单、实用和立竿见影,但我不知道它是否是效率最高的。你对此有什么看法? - Riccardo Volpe
嗯,它必须读取并处理整个文件,因此看起来是最不高效的方式。 - rakslice
好的...那么,如果您不知道字符串长度,哪种方法会是最好的?我需要尝试另一种方法(https://dev59.com/tnA75IYBdhLWcg3wVniI#3346492)。谢谢! - Riccardo Volpe
1
使用 f.readlines()[-1] 替代新变量。 0 = 第一行, 1 = 第二行, -1 = 最后一行, -2 = 倒数第二行... - BladeMight

2
没有人提到使用reversed:
f=open(file,"r")
r=reversed(f.readlines())
last_line_of_file = r.next()

5
.readlines()会一次性将文件中的所有行读入内存,但它不是解决这个问题的方案。 - Steve Mayne

2

这是@Trasp答案的一个扩展,它具有处理仅有一行的文件的边缘情况的额外逻辑。如果您要反复读取连续更新的文件的最后一行,则处理此情况可能很有用。如果没有这个逻辑,如果您尝试抓取刚创建且仅有一行的文件的最后一行,将引发IOError: [Errno 22] Invalid argument

def tail(filepath):
    with open(filepath, "rb") as f:
        first = f.readline()      # Read the first line.
        f.seek(-2, 2)             # Jump to the second last byte.
        while f.read(1) != b"\n": # Until EOL is found...
            try:
                f.seek(-2, 1)     # ...jump back the read byte plus one more.
            except IOError:
                f.seek(-1, 1)
                if f.tell() == 0:
                    break
        last = f.readline()       # Read last line.
    return last

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接