获取文件的最后n行,类似于tail命令

211

我正在为一个Web应用程序编写日志文件查看器,因此我想要通过日志文件的行进行分页。该文件中的项目是基于行的,最新的项目位于底部。

所以我需要一个tail()方法,可以从底部读取n行并支持偏移量。这是我想出来的:

def tail(f, n, offset=0):
    """Reads a n lines from f with an offset of offset lines."""
    avg_line_length = 74
    to_read = n + offset
    while 1:
        try:
            f.seek(-(avg_line_length * to_read), 2)
        except IOError:
            # woops.  apparently file is smaller than what we want
            # to step back, go to the beginning instead
            f.seek(0)
        pos = f.tell()
        lines = f.read().splitlines()
        if len(lines) >= to_read or pos == 0:
            return lines[-to_read:offset and -offset or None]
        avg_line_length *= 1.3

这是一个合理的方法吗?推荐使用什么方法来带有偏移量地跟踪日志文件?


在我的系统(Linux SLES 10)上,相对于结尾的查找会引发 IOError“无法进行非零结尾相对查找”。我喜欢这个解决方案,但已经修改了它以获取文件长度(seek(0,2)然后 tell()),并使用该值相对于开头进行查找。 - Anne
3
恭喜!这个问题已经被收录到 Kippo 代码中了。 - user1019517
应该指定open命令的参数以生成f文件对象,因为根据f=open(..., 'rb')f=open(..., 'rt'),必须以不同的方式处理f - Dr Fabio Gori
我决定编写一个100%通用的解决方案,因此现在您可以像列表一样访问具有任意正面或负面切片的巨大文本文件,例如:[-2000:-1900]等等。https://github.com/SurpriseDog/readlines/blob/main/readlines.py - SurpriseDog
36个回答

132

这可能比你的方法更快。它不会假设行长度。它会反向每次一块地遍历文件,直到找到正确数量的“\n”字符。

def tail( f, lines=20 ):
    total_lines_wanted = lines

    BLOCK_SIZE = 1024
    f.seek(0, 2)
    block_end_byte = f.tell()
    lines_to_go = total_lines_wanted
    block_number = -1
    blocks = [] # blocks of size BLOCK_SIZE, in reverse order starting
                # from the end of the file
    while lines_to_go > 0 and block_end_byte > 0:
        if (block_end_byte - BLOCK_SIZE > 0):
            # read the last block we haven't yet read
            f.seek(block_number*BLOCK_SIZE, 2)
            blocks.append(f.read(BLOCK_SIZE))
        else:
            # file too small, start from begining
            f.seek(0,0)
            # only read what was not read
            blocks.append(f.read(block_end_byte))
        lines_found = blocks[-1].count('\n')
        lines_to_go -= lines_found
        block_end_byte -= BLOCK_SIZE
        block_number -= 1
    all_read_text = ''.join(reversed(blocks))
    return '\n'.join(all_read_text.splitlines()[-total_lines_wanted:])

我不喜欢对行长做出猜测的假设,因为实际上你永远无法知道那样的事情。

通常情况下,在第一次或第二次循环中,这将定位到最后20行。如果您的74个字符的内容确实准确,那么将块大小设置为2048,您几乎可以立即查找到最后20行。

此外,我不会花费太多精力来调整与物理操作系统块对齐。使用这些高级I/O包,我怀疑您是否会看到任何性能影响,尝试在操作系统块边界上对齐。如果您使用低级别的I/O,则可能会看到加速效果。


更新

对于Python 3.2及以上版本,请按字节处理文件,如文本文件(以"b"模式字符串打开的文件),只允许相对于文件开头进行搜索(例外情况是使用seek(0, 2)将指针移动到文件末尾)。:

例如:f = open('C:/.../../apache_logs.txt', 'rb')

 def tail(f, lines=20):
    total_lines_wanted = lines

    BLOCK_SIZE = 1024
    f.seek(0, 2)
    block_end_byte = f.tell()
    lines_to_go = total_lines_wanted
    block_number = -1
    blocks = []
    while lines_to_go > 0 and block_end_byte > 0:
        if (block_end_byte - BLOCK_SIZE > 0):
            f.seek(block_number*BLOCK_SIZE, 2)
            blocks.append(f.read(BLOCK_SIZE))
        else:
            f.seek(0,0)
            blocks.append(f.read(block_end_byte))
        lines_found = blocks[-1].count(b'\n')
        lines_to_go -= lines_found
        block_end_byte -= BLOCK_SIZE
        block_number -= 1
    all_read_text = b''.join(reversed(blocks))
    return b'\n'.join(all_read_text.splitlines()[-total_lines_wanted:])

15
在小日志文件上会失败-- IOError: 无效的参数-- f.seek(block*1024,2) - ohnoes
6
无法在Python 3.2中使用。我收到了io.UnsupportedOperation:can't do nonzero end-relative seeks的错误提示。我可以将偏移量更改为0,但这样就失去了函数的作用。 - Logical Fallacy
4
@DavidEnglund 这里是原因(http://www.velocityreviews.com/forums/t748976-python-3-2-bug-reading-the-last-line-of-a-file.html)。简单来说:在文本模式下,相对于文件末尾的寻址是不被允许的,据推测这是因为文件内容必须被解码,而且通常情况下,从编码字节序列的任意位置开始解码为Unicode时,寻址到一个未定义的结果。该链接中提供的建议是尝试以二进制模式打开文件并自己进行解码,同时捕获DecodeError异常。 - max
7
请勿使用这段代码,它在 Python 2.7 的一些边缘情况下会破坏行。以下@papercrane的答案可以修复这个问题。 - xApple
1
注意:您不需要“费尽心思地调整与物理操作系统块的对齐方式”。在许多操作系统上获取理想的块大小并不困难,而且Python为那些没有这个功能(例如Windows)的系统提供了合理的默认值。只需将BLOCK_SIZE的定义更改为BLOCK_SIZE = getattr(os.fstat(f.fileno()), 'st_blksize', io.DEFAULT_BUFFER_SIZE),它将使用文件的自我报告的首选I/O块大小或Python的备用默认值(目前为8 KB,如果行很短,则仍然足够小,您不会期望出现严重的减速)。 - ShadowRanger
显示剩余8条评论

101
假设您在 Python 2 上使用类 Unix 系统,可以执行以下操作:
import os
def tail(f, n, offset=0):
  stdin,stdout = os.popen2("tail -n "+n+offset+" "+f)
  stdin.close()
  lines = stdout.readlines(); stdout.close()
  return lines[:,-offset]

对于Python 3,您可以执行以下操作:

import subprocess
def tail(f, n, offset=0):
    proc = subprocess.Popen(['tail', '-n', n + offset, f], stdout=subprocess.PIPE)
    lines = proc.stdout.readlines()
    return lines[:, -offset]

9
应该是平台无关的。另外,如果您阅读问题,就会发现f是一个类似于文件的对象。 - Armin Ronacher
53
问题并未说明平台依赖性不可接受。我无法理解为什么这个回答会收到两个反对票,因为它提供了一种非常类Unix的(可能是你正在寻找的...对我来说确实如此)方法来准确地完成问题所要求的操作。 - Shabbyrobe
3
谢谢,我一开始认为必须要用纯Python解决这个问题,但实际上有Unix工具在手也无需刻意排斥使用它们,因此我选择了这种方式。值得一提的是,在现代Python中,subprocess.check_output可能比os.popen2更可取;它可以将输出作为字符串返回,并在出现非零退出码时引发异常,使事情简单了一些。 - mrooney
3
尽管这取决于平台,但这是实现所需功能的一种非常高效的方式,而且速度非常快(您不必将整个文件加载到内存中)。@Shabbyrobe - earthmeLon
7
你可以预先计算偏移量,例如:offset_total = str(n+offset),并将此行代码stdin, stdout = os.popen2("tail -n "+offset_total+" "+f)替换掉,以避免出现TypeError (cannot concatenate int+str)的错误。请注意不要改变原文意思,并让翻译内容更加通俗易懂。 - AddingColor
显示剩余9条评论

47

这是我的答案。纯Python编写。使用timeit测试,看起来速度相当快。获取一个有100,000行的日志文件的最后100行:

>>> timeit.timeit('tail.tail(f, 100, 4098)', 'import tail; f = open("log.txt", "r");', number=10)
0.0014600753784179688
>>> timeit.timeit('tail.tail(f, 100, 4098)', 'import tail; f = open("log.txt", "r");', number=100)
0.00899195671081543
>>> timeit.timeit('tail.tail(f, 100, 4098)', 'import tail; f = open("log.txt", "r");', number=1000)
0.05842900276184082
>>> timeit.timeit('tail.tail(f, 100, 4098)', 'import tail; f = open("log.txt", "r");', number=10000)
0.5394978523254395
>>> timeit.timeit('tail.tail(f, 100, 4098)', 'import tail; f = open("log.txt", "r");', number=100000)
5.377126932144165

这是代码:

import os


def tail(f, lines=1, _buffer=4098):
    """Tail a file and get X lines from the end"""
    # place holder for the lines found
    lines_found = []

    # block counter will be multiplied by buffer
    # to get the block size from the end
    block_counter = -1

    # loop until we find X lines
    while len(lines_found) < lines:
        try:
            f.seek(block_counter * _buffer, os.SEEK_END)
        except IOError:  # either file is too small, or too many lines requested
            f.seek(0)
            lines_found = f.readlines()
            break

        lines_found = f.readlines()

        # we found enough lines, get out
        # Removed this line because it was redundant the while will catch
        # it, I left it for history
        # if len(lines_found) > lines:
        #    break

        # decrement the block counter to get the
        # next X bytes
        block_counter -= 1

    return lines_found[-lines:]

3
优雅的解决方案!if len(lines_found) > lines: 这句话真的有必要吗? loop 条件不会也能捕捉到它吗? - Maximilian Peters
我想了解一个问题:os.SEEK_END是否只是为了清晰而使用?据我所知,它的值是固定的(= 2)。我想知道是否可以将其省略以便能够省略import os。感谢您提供的绝妙解决方案! - n1k31t4
2
@MaximilianPeters 是的,这不是必要的。我已经将其注释掉了。 - glenbot
@DexterMorgan 你可以用整数等价物替换 os.SEEK_END。它主要是为了可读性而存在的。 - glenbot
1
我点了赞,但有一个小问题。在查找后,第一行读取可能不完整,所以为了获取N个完整的行,我在我的副本中将while len(lines_found) < lines更改为while len(lines_found) <= lines。谢谢! - Graham Klyne
1
总是从结尾开始查找是一个错误,因为它假设每个循环迭代的结尾都是相同的。想象一下在代码运行时写入的日志文件。 - BlackJack

36

如果可以读取整个文件,则使用deque。

from collections import deque
deque(f, maxlen=n)

在2.6版本之前,双向队列没有maxlen选项,但很容易实现。

import itertools
def maxque(items, size):
    items = iter(items)
    q = deque(itertools.islice(items, size))
    for item in items:
        del q[0]
        q.append(item)
    return q

如果需要从文件末尾读取内容,则可以使用加速(即指数级)搜索算法。

def tail(f, n):
    assert n >= 0
    pos, lines = n+1, []
    while len(lines) <= n:
        try:
            f.seek(-pos, 2)
        except IOError:
            f.seek(0)
            break
        finally:
            lines = list(f)
        pos *= 2
    return lines[-n:]

为什么最后那个函数能够工作? pos *= 2 看起来完全是随意的操作。它有什么重要性吗? - 2mac
3
@2mac 指数搜索。它从文件末尾开始迭代读取,每次加倍读取的行数,直到找到足够的行数为止。 - A. Coady
我认为从结尾读取的解决方案将不支持使用UTF-8编码的文件,因为字符长度是可变的,你可能会(很可能会)落在一些无法正确解释的奇怪偏移量上。 - Mike
1
不幸的是,您的“galloping”搜索解决方案不适用于Python 3,因为f.seek()不接受负偏移量。我已经更新了您的代码,使其适用于Python 3 链接 - itsjwala
Python 3 中获取文件最后一行的一行代码:with open('example', 'r') as f:d = deque(f, maxlen=1) - Timo
显示剩余2条评论

28

以上S.Lott的答案几乎可以解决我的问题,但最终导致我只得到了部分行。原来是因为读取的数据以相反的顺序保存在data中,因此在块边界上会破坏数据。当调用''.join(data)时,块的顺序是错误的。下面的代码修复了这个问题。

def tail(f, window=20):
    """
    Returns the last `window` lines of file `f` as a list.
    f - a byte file-like object
    """
    if window == 0:
        return []
    BUFSIZ = 1024
    f.seek(0, 2)
    bytes = f.tell()
    size = window + 1
    block = -1
    data = []
    while size > 0 and bytes > 0:
        if bytes - BUFSIZ > 0:
            # Seek back one whole BUFSIZ
            f.seek(block * BUFSIZ, 2)
            # read BUFFER
            data.insert(0, f.read(BUFSIZ))
        else:
            # file too small, start from begining
            f.seek(0,0)
            # only read what was not read
            data.insert(0, f.read(bytes))
        linesFound = data[0].count('\n')
        size -= linesFound
        bytes -= BUFSIZ
        block -= 1
    return ''.join(data).splitlines()[-window:]

1
在列表的开头插入元素是一个不好的想法。为什么不使用双端队列结构呢? - Sergey11g
1
很遗憾不兼容Python 3...正在努力找出原因。 - Sherlock70

24

我最终使用的代码。我认为这是目前为止最好的:

def tail(f, n, offset=None):
    """Reads a n lines from f with an offset of offset lines.  The return
    value is a tuple in the form ``(lines, has_more)`` where `has_more` is
    an indicator that is `True` if there are more lines in the file.
    """
    avg_line_length = 74
    to_read = n + (offset or 0)

    while 1:
        try:
            f.seek(-(avg_line_length * to_read), 2)
        except IOError:
            # woops.  apparently file is smaller than what we want
            # to step back, go to the beginning instead
            f.seek(0)
        pos = f.tell()
        lines = f.read().splitlines()
        if len(lines) >= to_read or pos == 0:
            return lines[-to_read:offset and -offset or None], \
                   len(lines) > to_read or pos > 0
        avg_line_length *= 1.3

7
答案并没有完全回答这个问题。 - sheki
此解决方案会引发异常,您不能寻求浮点数,而在此处通过乘以1.3来执行。同时,您一遍又一遍地重新读取整个文件。 - Miro Krsjak

14

使用mmap的简单快速解决方案:

import mmap
import os

def tail(filename, n):
    """Returns last n lines from the filename. No exception handling"""
    size = os.path.getsize(filename)
    with open(filename, "rb") as f:
        # for Windows the mmap parameters are different
        fm = mmap.mmap(f.fileno(), 0, mmap.MAP_SHARED, mmap.PROT_READ)
        try:
            for i in xrange(size - 1, -1, -1):
                if fm[i] == '\n':
                    n -= 1
                    if n == -1:
                        break
            return fm[i + 1 if i else 0:].splitlines()
        finally:
            fm.close()

1
这可能是当输入可能很大时最快的答案(如果它使用.rfind方法向后扫描换行符,而不是在Python级别执行逐字节检查,则会更快;在CPython中,用C内置调用替换Python级别代码通常会获得很多优势)。对于较小的输入,具有maxlendeque更简单且可能同样快。 - ShadowRanger

6

将@papercrane的解决方案更新至python3。 使用open(filename, 'rb')打开文件并执行以下操作:

def tail(f, window=20):
    """Returns the last `window` lines of file `f` as a list.
    """
    if window == 0:
        return []

    BUFSIZ = 1024
    f.seek(0, 2)
    remaining_bytes = f.tell()
    size = window + 1
    block = -1
    data = []

    while size > 0 and remaining_bytes > 0:
        if remaining_bytes - BUFSIZ > 0:
            # Seek back one whole BUFSIZ
            f.seek(block * BUFSIZ, 2)
            # read BUFFER
            bunch = f.read(BUFSIZ)
        else:
            # file too small, start from beginning
            f.seek(0, 0)
            # only read what was not read
            bunch = f.read(remaining_bytes)

        bunch = bunch.decode('utf-8')
        data.insert(0, bunch)
        size -= bunch.count('\n')
        remaining_bytes -= BUFSIZ
        block -= 1

    return ''.join(data).splitlines()[-window:]

你可能想要添加:assert "b" in file.mode, "文件模式必须是字节!" 来检查文件模式是否实际上是字节。 - JulianWgs

6

最简单的方法是使用deque

from collections import deque

def tail(filename, n=10):
    with open(filename) as f:
        return deque(f, n)

4
如果你处理的是大文件,请记住这会遍历整个文件。 - Austin

5

应评论者的要求,在类似问题的答案中,使用了相同的技巧来改变文件的最后一行而不是获取它。对于大型文件,mmap 是最佳的方式。为了改进现有的 mmap 答案,这个版本在 Windows 和 Linux 之间具有可移植性,并且应该运行得更快(尽管在GB范围内的文件中需要一些修改才能在32位 Python 上工作,请参阅其他答案以获取处理此问题的提示,以及修改以在Python 2上工作)。

import io  # Gets consistent version of open for both Py2.7 and Py3.x
import itertools
import mmap

def skip_back_lines(mm, numlines, startidx):
    '''Factored out to simplify handling of n and offset'''
    for _ in itertools.repeat(None, numlines):
        startidx = mm.rfind(b'\n', 0, startidx)
        if startidx < 0:
            break
    return startidx

def tail(f, n, offset=0):
    # Reopen file in binary mode
    with io.open(f.name, 'rb') as binf, mmap.mmap(binf.fileno(), 0, access=mmap.ACCESS_READ) as mm:
        # len(mm) - 1 handles files ending w/newline by getting the prior line
        startofline = skip_back_lines(mm, offset, len(mm) - 1)
        if startofline < 0:
            return []  # Offset lines consumed whole file, nothing to return
            # If using a generator function (yield-ing, see below),
            # this should be a plain return, no empty list

        endoflines = startofline + 1  # Slice end to omit offset lines

        # Find start of lines to capture (add 1 to move from newline to beginning of following line)
        startofline = skip_back_lines(mm, n, startofline) + 1

        # Passing True to splitlines makes it return the list of lines without
        # removing the trailing newline (if any), so list mimics f.readlines()
        return mm[startofline:endoflines].splitlines(True)
        # If Windows style \r\n newlines need to be normalized to \n, and input
        # is ASCII compatible, can normalize newlines with:
        # return mm[startofline:endoflines].replace(os.linesep.encode('ascii'), b'\n').splitlines(True)

假设你需要跟踪的行数很少,足以一次性将它们全部安全地读入内存;你也可以将其作为生成器函数,并通过替换最后一行来手动逐行读取:

        mm.seek(startofline)
        # Call mm.readline n times, or until EOF, whichever comes first
        # Python 3.2 and earlier:
        for line in itertools.islice(iter(mm.readline, b''), n):
            yield line

        # 3.3+:
        yield from itertools.islice(iter(mm.readline, b''), n)

最后,以二进制模式读取(必须使用mmap),因此它会产生str行(Py2)和bytes行(Py3); 如果您想要unicode(Py2)或str(Py3),则可以调整迭代方法来解码并/或修复换行符:
        lines = itertools.islice(iter(mm.readline, b''), n)
        if f.encoding:  # Decode if the passed file was opened with a specific encoding
            lines = (line.decode(f.encoding) for line in lines)
        if 'b' not in f.mode:  # Fix line breaks if passed file opened in text mode
            lines = (line.replace(os.linesep, '\n') for line in lines)
        # Python 3.2 and earlier:
        for line in lines:
            yield line
        # 3.3+:
        yield from lines

注意:我在没有Python测试的机器上输入了所有内容。如果我打错了什么,请告诉我;这与我的另一个答案足够相似,我认为它应该可以工作,但微调(例如处理offset)可能会导致细微的错误。如果有任何错误,请在评论中告诉我。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接