我有一个文本文件,每行都包含一个时间戳。我的目标是找到时间范围。所有时间都按顺序排列,因此第一行将是最早的时间,而最后一行将是最新的时间。我只需要第一行和最后一行。在Python中获取这些行的最有效方法是什么?
注意:这些文件相对较长,每个文件约有1-2百万行,我需要对几百个文件进行此操作。
from os import SEEK_END, SEEK_CUR
def readlast(f):
try:
f.seek(-2, SEEK_END) # Jump to the second last byte.
while f.read(1) != b"\n": # Until newline is found ...
f.seek(-2, SEEK_CUR) # ... jump back, over the read byte plus one.
except OSError: # Reached begginning of File
f.seek(0) # Set cursor to beginning of file as well.
return f.read() # Read all data from this point on.
with open(path, "rb") as f:
first = f.readline()
last = readlast(f)
seek
时,格式为fseek(offset, whence=0)
。from collections import deque
from os import SEEK_CUR, SEEK_END
def readlast(f, d = b'\n'):
""""readlast(f: io.IOBase, d: bytes = b'\n') -> bytes
Return the last segment of file `f`, containing data segments separated by
`d`.
"""
arr = deque(); step = 1; pos = -1
try:
# Seek to last byte of file, save it to arr as to not check for newline.
pos = f.seek(-1, SEEK_END)
arr.appendleft(f.read())
# Seek past the byte read, plus one to use as the first segment.
pos = f.seek(-2, SEEK_END)
seg = f.read(1)
# Break when 'd' occurs, store index of the rightmost match in 'i'.
while seg.rfind(d) == -1:
# Store segments with no b'\n' in a memory-efficient 'deque'.
arr.appendleft(seg)
# Step back in file, past the bytes just read plus twice that.
pos = f.seek(-step*3, SEEK_CUR)
# Read new segment, twice as big as the one read previous iteration.
step *= 2
seg = f.read(step)
# Ignore the characters up to 'i', and the triggering newline character.
arr.appendleft(seg[seg.rfind(d)+1:])
except OSError:
# Reached beginning of file. Read remaining data and check for newline.
f.seek(0)
seg = f.read(pos)
arr.appendleft(seg[seg.rfind(d)+1:])
return b"".join(arr)
f.write(b'X\nY\nZ\n'); f.seek(0)
assert readlast(f) == b'Z\n'
f.write(b'\n\n'; f.seek(0)
assert readlast(f) == b'\n'
我避免修改原始答案,因为问题明确要求效率,并且要尊重之前的赞同。
这个版本解决了多年来提出的所有评论和问题,同时保留了逻辑和向后兼容性(以可读性为代价)。
目前已经解决的问题有:
from os import SEEK_CUR, SEEK_END
def _readlast__bytes(f, sep, size, step):
# Point cursor 'size' + 'step' bytes away from the end of the file.
o = f.seek(0 - size - step, SEEK_END)
# Step 'step' bytes each iteration, halt when 'sep' occurs.
while f.read(size) != sep:
f.seek(0 - size - step, SEEK_CUR)
def _readlast__text(f, sep, size, step):
# Text mode, same principle but without the use of relative offsets.
o = f.seek(0, SEEK_END)
o = f.seek(o - size - step)
while f.read(size) != sep:
o = f.seek(o - step)
def readlast(f, sep, fixed = False):
"""readlast(f: io.BaseIO, sep: bytes|str, fixed: bool = False) -> bytes|str
Return the last segment of file `f`, containing data segments separated by
`sep`.
Set `fixed` to True when parsing UTF-32 or UTF-16 encoded data (don't forget
to pass the correct delimiter) in files opened in byte mode.
"""
size = len(sep)
step = len(sep) if (fixed is True) else (fixed or 1)
step = size if fixed else 1
if not size:
raise ValueError("Zero-length separator.")
try:
if 'b' in f.mode:
# Process file opened in byte mode.
_readlast__bytes(f, sep, size, step)
else:
# Process file opened in text mode.
_readlast__text(f, sep, size, step)
except (OSError, ValueError):
# Beginning of file reached.
f.seek(0, SEEK_SET)
return f.read()
使用方法:
f.write("X\nY\nZ\n".encode('utf32'); f.seek(0)
assert readlast(f, "\n".encode('utf32')[4:]) == "Z\n"
f.write(b'X<br>Y</br>'; f.seek(0)
assert readlast(f, b'<br>', fixed=False) == "Y</br>"
用于与此答案进行比较的代码(发布时最受赞同的答案的优化版本):
with open(file, "rb") as f:
first = f.readline() # Read and store the first line.
for last in f: pass # Read all lines, keep final value.
结果:
10k iterations processing a file of 6k lines totalling 200kB: 1.62s vs 6.92s
100 iterations processing a file of 6k lines totalling 1.3GB: 8.93s vs 86.95s
with open(fname, 'rb') as fh:
first = next(fh).decode()
fh.seek(-1024, 2)
last = fh.readlines()[-1].decode()
这里的变量值为1024:它代表平均字符串长度。我只是拿1024作为例子。如果您有平均行长度的估计值,可以使用该值乘以2。
由于您完全不知道行长度的可能上限,显而易见的解决方案是循环读取文件:
for line in fh:
pass
last = line
无需烦恼二进制标志,只需使用open(fname)
。
预计时间:由于你需要处理许多文件,因此可以使用random.sample
创建几十个文件样本,并在这些文件上运行此代码以确定最后一行的长度。使用先验的大值为位置偏移(比如1 MB),这将帮助您估算整个运行的值。
fh.seek(-1024, os.SEEK_END)
代替fh.seek(-1024, 2)
可以增加可读性。 - marslopen(fname)
即可。 使用b
标志进行打开是至关重要的。如果您使用open(fname)
而不是open(fname,'rb')
,则会出现 io.UnsupportedOperation: can't do nonzero end-relative seeks的错误。 - patryk.beza这是 SilentGhost 回答的修改版,可以实现你想要的效果。
with open(fname, 'rb') as fh:
first = next(fh)
offs = -100
while True:
fh.seek(offs, 2)
lines = fh.readlines()
if len(lines)>1:
last = lines[-1]
break
offs *= 2
print first
print last
这里不需要设置行长度的上限。
你会使用Unix命令吗?我认为使用head -1
和tail -n 1
是最有效的方法。另外,你可以使用简单的fid.readline()
来获取第一行和fid.readlines()[-1]
来获取最后一行,但这可能会消耗太多内存。
os.popen("tail -n 1 %s" % filename).read()
可以很好地获取最后一行。 - Michael Dunnos.popen("tail -n 1 %s" % filename).read()
--> 自2.6版本起已被弃用 - LarsVegas这是我的解决方案,同时兼容Python3。它也管理边界情况,但它缺少utf-16支持:
def tail(filepath):
"""
@author Marco Sulla (marcosullaroma@gmail.com)
@date May 31, 2016
"""
try:
filepath.is_file
fp = str(filepath)
except AttributeError:
fp = filepath
with open(fp, "rb") as f:
size = os.stat(fp).st_size
start_pos = 0 if size - 1 < 0 else size - 1
if start_pos != 0:
f.seek(start_pos)
char = f.read(1)
if char == b"\n":
start_pos -= 1
f.seek(start_pos)
if start_pos == 0:
f.seek(start_pos)
else:
char = ""
for pos in range(start_pos, -1, -1):
f.seek(pos)
char = f.read(1)
if char == b"\n":
break
return f.readline()
这篇文章的灵感来源于 Trasp的回答 和 AnotherParker的评论。
首先以读模式打开文件。然后使用readlines()方法逐行读取所有行并将其存储在一个列表中。现在,您可以使用列表切片来获取文件的第一行和最后一行。
a=open('file.txt','rb')
lines = a.readlines()
if lines:
first_line = lines[:1]
last_line = lines[-1]
w=open(file.txt, 'r')
print ('first line is : ',w.readline())
for line in w:
x= line
print ('last line is : ',x)
w.close()
< p > for
循环运行通过这些行,x
在最后一次迭代中获得了最后一行。
with open("myfile.txt") as f:
lines = f.readlines()
first_row = lines[0]
print first_row
last_row = lines[-1]
print last_row
f.readlines()[-1]
替代新变量。 0 = 第一行, 1 = 第二行, -1 = 最后一行, -2 = 倒数第二行... - BladeMightf=open(file,"r")
r=reversed(f.readlines())
last_line_of_file = r.next()
这是@Trasp答案的一个扩展,它具有处理仅有一行的文件的边缘情况的额外逻辑。如果您要反复读取连续更新的文件的最后一行,则处理此情况可能很有用。如果没有这个逻辑,如果您尝试抓取刚创建且仅有一行的文件的最后一行,将引发IOError: [Errno 22] Invalid argument
。
def tail(filepath):
with open(filepath, "rb") as f:
first = f.readline() # Read the first line.
f.seek(-2, 2) # Jump to the second last byte.
while f.read(1) != b"\n": # Until EOL is found...
try:
f.seek(-2, 1) # ...jump back the read byte plus one more.
except IOError:
f.seek(-1, 1)
if f.tell() == 0:
break
last = f.readline() # Read last line.
return last
File "mapper1.2.2.py", line 17, in get_last_line f.seek(-2, 2) IOError: [Errno 22] Invalid argument
- Loïcwhile f.read(1) != "\n":
应该改为while f.read(1) != b"\n":
。 - Artjom B.io.UnsupportedOperation: can't do nonzero end-relative seeks
的异常,需要分两步进行操作:首先获取文件长度,然后加上偏移量,最后将其传递给f.seek(size+offset,os.SEEK_SET)
。 - AnotherParker