理想情况下,我想要像
tail.getNewData()
这样的东西,每次我需要更多数据时都可以调用它。如果您使用的是Linux操作系统(因为Windows不支持在文件上调用select),您可以使用subprocess模块和select模块。
import time
import subprocess
import select
f = subprocess.Popen(['tail','-F',filename],\
stdout=subprocess.PIPE,stderr=subprocess.PIPE)
p = select.poll()
p.register(f.stdout)
while True:
if p.poll(1):
print f.stdout.readline()
time.sleep(1)
当有新数据可用时,此代码会轮询输出管道并将其打印出来。通常情况下,time.sleep(1)
和 print f.stdout.readline()
将被替换为实用的代码。
您可以使用subprocess模块而不需要额外的select模块调用。
import subprocess
f = subprocess.Popen(['tail','-F',filename],\
stdout=subprocess.PIPE,stderr=subprocess.PIPE)
while True:
line = f.stdout.readline()
print line
这也会在新行被添加时打印出来,但它会阻塞直到尾部程序关闭,可能是通过f.kill()
。
print line
,而是使用sys.stdout.write(line)
来处理print
会插入的额外换行符。 - Mayank Jaiswal.strip()
也会删除可能很重要的前导空格。 - Matt.readline()
保留了换行符,而print
又添加了一个新的换行符,那么只需要使用sys.stdout.write()
代替print
就可以轻松解决问题。 - Matt使用sh模块(pip install sh):
from sh import tail
# runs forever
for line in tail("-f", "/var/log/some_log_file.log", _iter=True):
print(line)
[更新]
由于使用 _iter
=True 的 sh.tail 是一个生成器,因此您可以:
import sh
tail = sh.tail("-f", "/var/log/some_log_file.log", _iter=True)
然后你可以使用以下代码获取新的数据:
new_data = tail.next()
注意,如果尾缓冲区为空,则它将阻塞直到有更多的数据(从您的问题中不清楚您在这种情况下想要做什么)。def tail_F(some_file):
while True:
try:
for line in sh.tail("-f", some_file, _iter=True):
yield line
except sh.ErrorReturnCode_1:
yield None
如果文件变得不可访问,生成器将返回None。然而,如果文件仍然可访问,它仍会阻塞直到有新数据。在这种情况下,我仍然不清楚您想要做什么。def tail_F(some_file):
first_call = True
while True:
try:
with open(some_file) as input:
if first_call:
input.seek(0, 2)
first_call = False
latest_data = input.read()
while True:
if '\n' not in latest_data:
latest_data += input.read()
if '\n' not in latest_data:
yield ''
if not os.path.isfile(some_file):
break
continue
latest_lines = latest_data.split('\n')
if latest_data[-1] != '\n':
latest_data = latest_lines[-1]
else:
latest_data = input.read()
for line in latest_lines[:-1]:
yield line + '\n'
except IOError:
yield ''
如果文件变得不可访问或者没有新的数据,这个生成器会返回''。
[更新]
似乎最后一个答案在运行到数据末尾时会回到文件开头。- Eli
我认为第二种方法将在tail处理结束时输出最后的十行数据,而使用-f
时,每当出现I/O错误时就会结束。在类Unix环境中,tail --follow --retry
的行为与此并没有太大差异。
也许如果您更新问题,并解释一下您的真实目标(即您想模仿tail--retry的原因),您将获得更好的答案。
最后一个答案实际上没有跟随tail,只是在运行时读取了可用的内容。- Eli
当然,默认情况下,tail会显示最后10行...您可以使用file.seek将文件指针定位到文件末尾,我会留下一个适当的实现作为读者的练习。
在我看来,使用file.read()的方法比基于子进程的解决方案更加优雅。
file.read()
方法有什么更优雅的地方? tail
可以正确处理显示文件的最后10行(即使这些行很大),可以永远读取新行,可以在平台相关的方式下在新行到达时唤醒,还可以在需要时打开新文件。一言以蔽之,该实用程序非常适合其设计目的 - 重新实现它似乎不太优雅。(但我必须承认,sh
模块非常棒。) - nneonneoimport time
from typing import Iterator
def follow(file, sleep_sec=0.1) -> Iterator[str]:
""" Yield each line from a file as they are written.
`sleep_sec` is the time to sleep after empty reads. """
line = ''
while True:
tmp = file.readline()
if tmp is not None and tmp != "":
line += tmp
if line.endswith("\n"):
yield line
line = ''
elif sleep_sec:
time.sleep(sleep_sec)
if __name__ == '__main__':
with open("test.txt", 'r') as file:
for line in follow(file):
print(line, end='')
tail -f
实例的任何答案都要好得多。已点赞。 - Bklynsh
和tail
更好 - JimmyNJif tmp is not None
更改为 if tmp != ""
很容易解决。 - personal_cloud目前唯一可移植的 tail -f
文件的方法是从文件中读取数据,并且在 read
返回 0 后经过 sleep
后再次尝试。不同平台上的 tail
实用程序使用特定于平台的技巧(例如 BSD 上的 kqueue
)来高效地追踪一个文件,而不需要使用 sleep
。
因此,纯粹使用 Python 实现良好的 tail -f
可能不是一个好主意,因为你必须使用最低公共分母的实现(不能使用特定于平台的技巧)。通过使用简单的 subprocess
打开 tail -f
并在单独的线程中迭代行,您可以轻松在Python中实现非阻塞的 tail
操作。
示例实现:
import threading, Queue, subprocess
tailq = Queue.Queue(maxsize=10) # buffer at most 100 lines
def tail_forever(fn):
p = subprocess.Popen(["tail", "-f", fn], stdout=subprocess.PIPE)
while 1:
line = p.stdout.readline()
tailq.put(line)
if not line:
break
threading.Thread(target=tail_forever, args=(fn,)).start()
print tailq.get() # blocks
print tailq.get_nowait() # throws Queue.Empty if there are no lines to read
tail
在 Linux 上使用 inotify 和 select 的组合;请参阅源代码:https://github.com/coreutils/coreutils/blob/master/src/tail.c#L1453 - nneonneotail -f
高效:它确实很高效!可以看到随着它们到达NAS的行出现。需要低延迟,这个方法起作用了。(写入来自VM,将虚拟com端口写入NAS上的原始文件,记录来自科学仪器的数据)。 - RGD2所有使用tail -f的答案都不符合Pythonic风格。
以下是Pythonic的方式:(不使用任何外部工具或库)
def follow(thefile):
while True:
line = thefile.readline()
if not line or not line.endswith('\n'):
time.sleep(0.1)
continue
yield line
if __name__ == '__main__':
logfile = open("run/foo/access-log","r")
loglines = follow(logfile)
for line in loglines:
print(line, end='')
所以,这可能来得有点晚,但我再次遇到了同样的问题,现在有一个更好的解决方案。只需使用pygtail:
Pygtail读取尚未读取的日志文件行。它甚至可以处理已经被旋转的日志文件。基于logcheck的logtail2 (http://logcheck.org)
理想情况下,我希望有类似于tail.getNewData()的方法,每次调用它都可以获取更多的数据。
我们已经有一个非常不错的方法。 每当需要更多数据时,只需调用f.read()。它将从前一次读取结束的地方开始读取,并一直读取到数据流的末尾:
f = open('somefile.log')
p = 0
while True:
f.seek(p)
latest_data = f.read()
p = f.tell()
if latest_data:
print latest_data
print str(p).center(10).center(80, '=')
要逐行读取,请使用f.readline()。有时,被读取的文件可能以部分读取的行结尾。使用f.tell()查找当前文件位置,并使用f.seek()将文件指针移回不完整行的开头来处理该情况。有关可工作代码,请参见此ActiveState配方。
str(p).center(10).center(80, '=')
中的 print 所做的事情? - openCivilisation# Get the last 3 lines of the file
tailer.tail(open('test.txt'), 3)
# ['Line 9', 'Line 10', 'Line 11']
它还可以跟踪一个文件:
# Follow the file as it grows
for line in tailer.follow(open('test.txt')):
print line
follow()
,所以对我没有起作用 :/ - Jose Albantailhead
库,它提供了tail
和head
实用程序的Python版本以及可用于自己模块的API。最初基于tailer
模块,其主要优点是能够按路径跟踪文件,即可以处理文件重新创建的情况。此外,它还解决了各种边缘情况的一些错误。import sys
from pygtail import Pygtail
for line in Pygtail("some.log"):
sys.stdout.write(line)
subprocess.call(["tail", "-F", filename])
- Whymarrhget_new_data
方法(PEP-8 命名)需要返回自上次调用以来的所有数据,还是只返回当前尾部(可能会丢失一些数据)? - Keithtail -F filename | python script.py
命令,因为我不需要 stdin 用于其他任何目的,并且这样可以获得最佳性能。 - Jasen