读取二进制文件并循环遍历每个字节

453
在Python中,我如何读取二进制文件并循环遍历该文件的每一个字节?

逐字节读取是一次读取一个块的特殊情况,当块大小等于1时。 - Karl Knechtel
13个回答

475

Python >= 3.8

由于海象运算符 (:=)的存在,解决方案非常简短。我们从文件中读取bytes对象并将它们赋值给变量byte

with open("myfile", "rb") as f:
    while (byte := f.read(1)):
        # Do stuff with byte.

Python >= 3

在旧版的Python 3中,我们需要使用稍微冗长一些的方法:

with open("myfile", "rb") as f:
    byte = f.read(1)
    while byte != b"":
        # Do stuff with byte.
        byte = f.read(1)

正如benhoyt所说,跳过不等于并利用b""评估为false的事实。这使得代码在2.6和3.x之间兼容而无需更改。如果您从字节模式切换到文本或反之亦然,这也会避免更改条件。

with open("myfile", "rb") as f:
    byte = f.read(1)
    while byte:
        # Do stuff with byte.
        byte = f.read(1)

Python >= 2.5

在 Python 2 中,情况有些不同。这里我们得到的不是字节对象,而是原始字符:

with open("myfile", "rb") as f:
    byte = f.read(1)
    while byte != "":
        # Do stuff with byte.
        byte = f.read(1)

请注意,在Python 2.5版本以下,with语句是不可用的。要在2.5版本中使用它,您需要导入它:

from __future__ import with_statement

在2.6中不需要这个。

Python 2.4及更早版本

f = open("myfile", "rb")
try:
    byte = f.read(1)
    while byte != "":
        # Do stuff with byte.
        byte = f.read(1)
finally:
    f.close()

71
按字节逐个读取文件是性能的噩梦,在Python中一定有更好的解决方案。使用此代码需要谨慎。 - usr
15
好的,文件对象在内部进行了缓冲,即使如此,这也是所要求的。并不是每个脚本都需要最佳性能。 - Skurmedel
9
你把它从read(1)改成了read(bufsize),并且在while循环中你做了一个for-in的操作...例子仍然适用。 - Skurmedel
3
根据我尝试的代码,性能差异可能高达200倍。 - jfs
8
取决于你要处理多少字节。如果字节数不太多,性能较差但易于理解的代码可能更受欢迎。浪费的CPU周期可通过节省维护代码时所需的"读取器CPU周期"来弥补。 - IllvilJa
显示剩余8条评论

193

这个生成器从文件中读取数据块并生成字节:

def bytes_from_file(filename, chunksize=8192):
    with open(filename, "rb") as f:
        while True:
            chunk = f.read(chunksize)
            if chunk:
                for b in chunk:
                    yield b
            else:
                break

# example:
for b in bytes_from_file('filename'):
    do_stuff_with(b)

参见Python文档以获取关于迭代器生成器的信息。


3
@codeape 这正是我在寻找的。但是,如何确定chunksize?它可以是任意值吗? - swdev
3
这个例子使用了一个块大小为8192字节。文件.read()函数的参数仅指定要读取的字节数量。codeape选择了“8192字节=8 KB”(实际上是“KiB”,但这不像常见的那样)。该值是“完全”随机的,但8 KB似乎是一个合适的值:不会浪费太多内存,而且读取操作也不会像Skurmedel所接受的答案中那样“太多”。 - mozzbozz
4
文件系统已经缓存了数据块,所以这段代码是多余的。最好一次读取一个字节。 - stark
24
尽管已经比标准答案快,但通过用 yield from chunk 替换整个最内层的 for b in chunk: 循环,可以再提高20-25%的速度。Python 3.3中添加了这种形式的 yield(请参见生成表达式)。 - martineau
3
对我来说,这比被接受的答案慢。我不知道为什么。 - étale-cohomology
显示剩余7条评论

62

如果文件不太大,将其保存在内存中不是问题:

with open("filename", "rb") as f:
    bytes_read = f.read()
for b in bytes_read:
    process_byte(b)

这里的 process_byte 表示你想在传入的字节上执行的某些操作。

如果你想一次处理一块:

with open("filename", "rb") as f:
    bytes_read = f.read(CHUNKSIZE)
    while bytes_read:
        for b in bytes_read:
            process_byte(b)
        bytes_read = f.read(CHUNKSIZE)

with语句在Python 2.5及更高版本中可用。


2
你可能会对我刚刚发布的基准测试感兴趣。 - martineau

44

在Python中读取二进制文件并循环每个字节

Python 3.5中新增了pathlib模块,它具有一种方便的方法可以将文件作为字节读取,从而使我们能够迭代遍历这些字节。我认为这是一个不错的(虽然有点草率)答案:

import pathlib

for byte in pathlib.Path(path).read_bytes():
    print(byte)

有趣的是,这是唯一提到 pathlib 的答案。

在 Python 2 中,您可能会像 Vinay Sajip 建议的那样执行此操作:

with open(path, 'b') as file:
    for byte in file.read():
        print(byte)

如果文件太大,无法在内存中迭代处理,则可按惯例使用Python 2版本的iter函数和callable,sentinel签名对其进行分块处理:

with open(path, 'b') as file:
    callable = lambda: file.read(1024)
    sentinel = bytes() # or b''
    for chunk in iter(callable, sentinel): 
        for byte in chunk:
            print(byte)

(有几个回答提到了这一点,但很少提供合理的读取大小。)

大文件或缓冲/交互式读取的最佳实践

让我们创建一个函数来完成此操作,并包括Python 3.5+标准库的惯用用法:

from pathlib import Path
from functools import partial
from io import DEFAULT_BUFFER_SIZE

def file_byte_iterator(path):
    """given a path, return an iterator over the file
    that lazily loads the file
    """
    path = Path(path)
    with path.open('rb') as file:
        reader = partial(file.read1, DEFAULT_BUFFER_SIZE)
        file_iterator = iter(reader, bytes())
        for chunk in file_iterator:
            yield from chunk

请注意,我们使用file.read1file.read在获取请求的所有字节或EOF之前会阻塞。通过使用file.read1,我们可以避免阻塞,并且它可以更快地返回结果。其他答案也没有提到这一点。

最佳实践用法演示:

让我们创建一个具有一兆(实际上是兆字节)伪随机数据的文件:

import random
import pathlib
path = 'pseudorandom_bytes'
pathobj = pathlib.Path(path)

pathobj.write_bytes(
  bytes(random.randint(0, 255) for _ in range(2**20)))

现在让我们对其进行迭代并将其实例化到内存中:

>>> l = list(file_byte_iterator(path))
>>> len(l)
1048576

我们可以检查数据的任何部分,例如最后100个字节和前100个字节:

>>> l[-100:]
[208, 5, 156, 186, 58, 107, 24, 12, 75, 15, 1, 252, 216, 183, 235, 6, 136, 50, 222, 218, 7, 65, 234, 129, 240, 195, 165, 215, 245, 201, 222, 95, 87, 71, 232, 235, 36, 224, 190, 185, 12, 40, 131, 54, 79, 93, 210, 6, 154, 184, 82, 222, 80, 141, 117, 110, 254, 82, 29, 166, 91, 42, 232, 72, 231, 235, 33, 180, 238, 29, 61, 250, 38, 86, 120, 38, 49, 141, 17, 190, 191, 107, 95, 223, 222, 162, 116, 153, 232, 85, 100, 97, 41, 61, 219, 233, 237, 55, 246, 181]
>>> l[:100]
[28, 172, 79, 126, 36, 99, 103, 191, 146, 225, 24, 48, 113, 187, 48, 185, 31, 142, 216, 187, 27, 146, 215, 61, 111, 218, 171, 4, 160, 250, 110, 51, 128, 106, 3, 10, 116, 123, 128, 31, 73, 152, 58, 49, 184, 223, 17, 176, 166, 195, 6, 35, 206, 206, 39, 231, 89, 249, 21, 112, 168, 4, 88, 169, 215, 132, 255, 168, 129, 127, 60, 252, 244, 160, 80, 155, 246, 147, 234, 227, 157, 137, 101, 84, 115, 103, 77, 44, 84, 134, 140, 77, 224, 176, 242, 254, 171, 115, 193, 29]

不要对二进制文件逐行迭代

不要这样做-它会读取任意大小的数据块,直到遇到换行符-当数据块太小或太大时速度都会变慢:

    with open(path, 'rb') as file:
        for chunk in file: # text newline iteration - not for bytes
            yield from chunk

上述内容仅适用于语义上可读的文本文件(如纯文本、代码、标记、Markdown等... 基本上是任何ASCII、UTF、Latin等编码),您应该在不使用'b'标志的情况下打开它们。


4
这样好多了...谢谢你的做到这一点。我知道回到两年前的答案可能并不总是有趣,但我很感激你这么做了。我特别喜欢“不要逐行迭代”这个小标题 :-) - Floris
1
嗨,亚伦,你选择使用 path = Path(path), with path.open('rb') as file: 而不是使用内置的 open 函数,有什么原因吗?它们都可以做同样的事情,对吧? - Joshua Yonathan
2
@JoshuaYonathan 我使用 Path 对象,因为它是一个非常方便的新路径处理方式。我们不再需要将字符串传递到仔细选择的“正确”函数中,而是可以直接在路径对象上调用方法,这些方法基本上包含了您希望在语义上的路径字符串中拥有的大部分重要功能。通过能够检查的IDE,我们也可以更轻松地获取自动完成。虽然使用内置的 open 函数也可以实现同样的效果,但当编写程序供程序员使用时,使用 Path 对象具有许多优势。 - Russia Must Remove Putin
1
你提到的最后一种方法使用函数file_byte_iterator比我在这个页面上尝试过的所有方法都要快得多。向你致敬! - Rick M.
@RickM:你可能会对我刚刚发布的基准测试感兴趣。 - martineau

42

如果你想以每次读取一个字节的方式读取文件(忽略缓冲),可以使用带有两个参数的iter(callable, sentinel)内置函数

with open(filename, 'rb') as file:
    for byte in iter(lambda: file.read(1), b''):
        # Do stuff with byte

它调用file.read(1)直到返回空的字节串b''(即空的bytestring)。对于大文件,内存不会无限增长。您可以将buffering=0传递给open()以禁用缓冲——它保证每次迭代只读取一个字节(速度较慢)。

with语句会自动关闭文件——包括代码引发异常的情况。

尽管默认情况下存在内部缓冲,但一次处理一个字节仍然效率低下。例如,这是一个名为blackhole.py的实用程序,它吞噬掉它所接收到的所有内容:

#!/usr/bin/env python3
"""Discard all input. `cat > /dev/null` analog."""
import sys
from functools import partial
from collections import deque

chunksize = int(sys.argv[1]) if len(sys.argv) > 1 else (1 << 15)
deque(iter(partial(sys.stdin.detach().read, chunksize), b''), maxlen=0)

例子:

$ dd if=/dev/zero bs=1M count=1000 | python3 blackhole.py

当我的机器上 chunksize == 32768 时,处理速度为每秒约 1.5 GB;而当 chunksize == 1 时,处理速度只有每秒约 7.5 MB。也就是说,逐字节读取比一次读取多个字节慢了200倍。如果您需要提高性能,请考虑重写处理方式,使用一次读取多个字节。

mmap 允许您将文件同时作为 bytearray 和文件对象处理。如果您需要同时访问这两种接口,则可以将其用作不将整个文件加载到内存中的替代方法。特别地,您可以使用普通的 for 循环,一次读取一个字节地迭代遍历映射到内存中的文件:

from mmap import ACCESS_READ, mmap

with open(filename, 'rb', 0) as f, mmap(f.fileno(), 0, access=ACCESS_READ) as s:
    for byte in s: # length is equal to the current file size
        # Do stuff with byte

mmap 支持切片操作。例如,mm[i:i+len] 从文件的位置 i 开始返回 len 字节。在 Python 3.2 之前不支持上下文管理器协议;在这种情况下,您需要显式调用 mm.close()。使用 mmap 按字节迭代消耗的内存比使用 file.read(1) 更多,但是 mmap 的速度比后者快一个数量级。


我觉得最后一个例子非常有趣。可惜没有与 numpy 内存映射(字节)数组相当的东西。 - martineau
2
@martineau,有numpy.memmap()函数,您可以使用(ctypes.data)每次获取一个字节的数据。您可以将numpy数组视为内存中的blob +元数据。 - jfs
jfs:谢谢,好消息!我不知道有这样的东西存在。顺便说一下,很棒的回答。 - martineau

22

总结Chrispy、Skurmedel、Ben Hoyt和Peter Hansen提出的所有优点,这将是每次以一个字节为单位处理二进制文件的最佳解决方案:

with open("myfile", "rb") as f:
    while True:
        byte = f.read(1)
        if not byte:
            break
        do_stuff_with(ord(byte))

对于Python 2.6及以上版本,原因是:

  • Python内部进行缓冲 - 无需读取分块
  • DRY原则 - 不要重复读取行
  • 使用with语句确保文件关闭干净
  • 'byte'在没有更多字节时评估为false(而不是当一个字节为零时)

或者使用J.F.Sebastian的解决方案以提高速度。

from functools import partial

with open(filename, 'rb') as file:
    for byte in iter(partial(file.read, 1), b''):
        # Do stuff with byte

或者,如果你希望将其作为一个生成器函数,就像codeape所演示的那样:

def bytes_from_file(filename):
    with open(filename, "rb") as f:
        while True:
            byte = f.read(1)
            if not byte:
                break
            yield(ord(byte))

# example:
for b in bytes_from_file('filename'):
    do_stuff_with(b)

2
正如链接的回答所说,即使读取是缓冲的,Python 逐字节读取/处理仍然很慢。如果能像链接答案中的示例一样同时处理几个字节,则性能可以大大提高:1.5GB/s vs. 7.5MB/s。 - jfs

19

这篇文章本身并不是直接回答这个问题的。相反,它是一个数据驱动的可扩展基准测试,可以用来比较许多已经发布在这个问题中的答案(以及后来添加的更现代化的Python版本中使用新特性的变化),因此应该有助于确定哪个性能最好。

在一些情况下,我修改了引用答案中的代码以使其与基准测试框架兼容。

首先,这里是目前的Python 2和3的最新版本的结果:

Fastest to slowest execution speeds with 32-bit Python 2.7.16
  numpy version 1.16.5
  Test file size: 1,024 KiB
  100 executions, best of 3 repetitions

1                  Tcll (array.array) :   3.8943 secs, rel speed   1.00x,   0.00% slower (262.95 KiB/sec)
2  Vinay Sajip (read all into memory) :   4.1164 secs, rel speed   1.06x,   5.71% slower (248.76 KiB/sec)
3            codeape + iter + partial :   4.1616 secs, rel speed   1.07x,   6.87% slower (246.06 KiB/sec)
4                             codeape :   4.1889 secs, rel speed   1.08x,   7.57% slower (244.46 KiB/sec)
5               Vinay Sajip (chunked) :   4.1977 secs, rel speed   1.08x,   7.79% slower (243.94 KiB/sec)
6           Aaron Hall (Py 2 version) :   4.2417 secs, rel speed   1.09x,   8.92% slower (241.41 KiB/sec)
7                     gerrit (struct) :   4.2561 secs, rel speed   1.09x,   9.29% slower (240.59 KiB/sec)
8                     Rick M. (numpy) :   8.1398 secs, rel speed   2.09x, 109.02% slower (125.80 KiB/sec)
9                           Skurmedel :  31.3264 secs, rel speed   8.04x, 704.42% slower ( 32.69 KiB/sec)

Benchmark runtime (min:sec) - 03:26

Fastest to slowest execution speeds with 32-bit Python 3.8.0
  numpy version 1.17.4
  Test file size: 1,024 KiB
  100 executions, best of 3 repetitions

1  Vinay Sajip + "yield from" + "walrus operator" :   3.5235 secs, rel speed   1.00x,   0.00% slower (290.62 KiB/sec)
2                       Aaron Hall + "yield from" :   3.5284 secs, rel speed   1.00x,   0.14% slower (290.22 KiB/sec)
3         codeape + iter + partial + "yield from" :   3.5303 secs, rel speed   1.00x,   0.19% slower (290.06 KiB/sec)
4                      Vinay Sajip + "yield from" :   3.5312 secs, rel speed   1.00x,   0.22% slower (289.99 KiB/sec)
5      codeape + "yield from" + "walrus operator" :   3.5370 secs, rel speed   1.00x,   0.38% slower (289.51 KiB/sec)
6                          codeape + "yield from" :   3.5390 secs, rel speed   1.00x,   0.44% slower (289.35 KiB/sec)
7                                      jfs (mmap) :   4.0612 secs, rel speed   1.15x,  15.26% slower (252.14 KiB/sec)
8              Vinay Sajip (read all into memory) :   4.5948 secs, rel speed   1.30x,  30.40% slower (222.86 KiB/sec)
9                        codeape + iter + partial :   4.5994 secs, rel speed   1.31x,  30.54% slower (222.64 KiB/sec)
10                                        codeape :   4.5995 secs, rel speed   1.31x,  30.54% slower (222.63 KiB/sec)
11                          Vinay Sajip (chunked) :   4.6110 secs, rel speed   1.31x,  30.87% slower (222.08 KiB/sec)
12                      Aaron Hall (Py 2 version) :   4.6292 secs, rel speed   1.31x,  31.38% slower (221.20 KiB/sec)
13                             Tcll (array.array) :   4.8627 secs, rel speed   1.38x,  38.01% slower (210.58 KiB/sec)
14                                gerrit (struct) :   5.0816 secs, rel speed   1.44x,  44.22% slower (201.51 KiB/sec)
15                 Rick M. (numpy) + "yield from" :  11.8084 secs, rel speed   3.35x, 235.13% slower ( 86.72 KiB/sec)
16                                      Skurmedel :  11.8806 secs, rel speed   3.37x, 237.18% slower ( 86.19 KiB/sec)
17                                Rick M. (numpy) :  13.3860 secs, rel speed   3.80x, 279.91% slower ( 76.50 KiB/sec)

Benchmark runtime (min:sec) - 04:47

我还使用了一个更大的10 MiB测试文件进行了运行(运行时间接近一小时),得到的性能结果与上面显示的结果相当。

以下是用于进行基准测试的代码:

from __future__ import print_function
import array
import atexit
from collections import deque, namedtuple
import io
from mmap import ACCESS_READ, mmap
import numpy as np
from operator import attrgetter
import os
import random
import struct
import sys
import tempfile
from textwrap import dedent
import time
import timeit
import traceback

try:
    xrange
except NameError:  # Python 3
    xrange = range


class KiB(int):
    """ KibiBytes - multiples of the byte units for quantities of information. """
    def __new__(self, value=0):
        return 1024*value


BIG_TEST_FILE = 1  # MiBs or 0 for a small file.
SML_TEST_FILE = KiB(64)
EXECUTIONS = 100  # Number of times each "algorithm" is executed per timing run.
TIMINGS = 3  # Number of timing runs.
CHUNK_SIZE = KiB(8)
if BIG_TEST_FILE:
    FILE_SIZE = KiB(1024) * BIG_TEST_FILE
else:
    FILE_SIZE = SML_TEST_FILE  # For quicker testing.

# Common setup for all algorithms -- prefixed to each algorithm's setup.
COMMON_SETUP = dedent("""
    # Make accessible in algorithms.
    from __main__ import array, deque, get_buffer_size, mmap, np, struct
    from __main__ import ACCESS_READ, CHUNK_SIZE, FILE_SIZE, TEMP_FILENAME
    from functools import partial
    try:
        xrange
    except NameError:  # Python 3
        xrange = range
""")


def get_buffer_size(path):
    """ Determine optimal buffer size for reading files. """
    st = os.stat(path)
    try:
        bufsize = st.st_blksize # Available on some Unix systems (like Linux)
    except AttributeError:
        bufsize = io.DEFAULT_BUFFER_SIZE
    return bufsize

# Utility primarily for use when embedding additional algorithms into benchmark.
VERIFY_NUM_READ = """
    # Verify generator reads correct number of bytes (assumes values are correct).
    bytes_read = sum(1 for _ in file_byte_iterator(TEMP_FILENAME))
    assert bytes_read == FILE_SIZE, \
           'Wrong number of bytes generated: got {:,} instead of {:,}'.format(
                bytes_read, FILE_SIZE)
"""

TIMING = namedtuple('TIMING', 'label, exec_time')

class Algorithm(namedtuple('CodeFragments', 'setup, test')):

    # Default timeit "stmt" code fragment.
    _TEST = """
        #for b in file_byte_iterator(TEMP_FILENAME):  # Loop over every byte.
        #    pass  # Do stuff with byte...
        deque(file_byte_iterator(TEMP_FILENAME), maxlen=0)  # Data sink.
    """

    # Must overload __new__ because (named)tuples are immutable.
    def __new__(cls, setup, test=None):
        """ Dedent (unindent) code fragment string arguments.
        Args:
          `setup` -- Code fragment that defines things used by `test` code.
                     In this case it should define a generator function named
                     `file_byte_iterator()` that will be passed that name of a test file
                     of binary data. This code is not timed.
          `test` -- Code fragment that uses things defined in `setup` code.
                    Defaults to _TEST. This is the code that's timed.
        """
        test =  cls._TEST if test is None else test  # Use default unless one is provided.

        # Uncomment to replace all performance tests with one that verifies the correct
        # number of bytes values are being generated by the file_byte_iterator function.
        #test = VERIFY_NUM_READ

        return tuple.__new__(cls, (dedent(setup), dedent(test)))


algorithms = {

    'Aaron Hall (Py 2 version)': Algorithm("""
        def file_byte_iterator(path):
            with open(path, "rb") as file:
                callable = partial(file.read, 1024)
                sentinel = bytes() # or b''
                for chunk in iter(callable, sentinel):
                    for byte in chunk:
                        yield byte
    """),

    "codeape": Algorithm("""
        def file_byte_iterator(filename, chunksize=CHUNK_SIZE):
            with open(filename, "rb") as f:
                while True:
                    chunk = f.read(chunksize)
                    if chunk:
                        for b in chunk:
                            yield b
                    else:
                        break
    """),

    "codeape + iter + partial": Algorithm("""
        def file_byte_iterator(filename, chunksize=CHUNK_SIZE):
            with open(filename, "rb") as f:
                for chunk in iter(partial(f.read, chunksize), b''):
                    for b in chunk:
                        yield b
    """),

    "gerrit (struct)": Algorithm("""
        def file_byte_iterator(filename):
            with open(filename, "rb") as f:
                fmt = '{}B'.format(FILE_SIZE)  # Reads entire file at once.
                for b in struct.unpack(fmt, f.read()):
                    yield b
    """),

    'Rick M. (numpy)': Algorithm("""
        def file_byte_iterator(filename):
            for byte in np.fromfile(filename, 'u1'):
                yield byte
    """),

    "Skurmedel": Algorithm("""
        def file_byte_iterator(filename):
            with open(filename, "rb") as f:
                byte = f.read(1)
                while byte:
                    yield byte
                    byte = f.read(1)
    """),

    "Tcll (array.array)": Algorithm("""
        def file_byte_iterator(filename):
            with open(filename, "rb") as f:
                arr = array.array('B')
                arr.fromfile(f, FILE_SIZE)  # Reads entire file at once.
                for b in arr:
                    yield b
    """),

    "Vinay Sajip (read all into memory)": Algorithm("""
        def file_byte_iterator(filename):
            with open(filename, "rb") as f:
                bytes_read = f.read()  # Reads entire file at once.
            for b in bytes_read:
                yield b
    """),

    "Vinay Sajip (chunked)": Algorithm("""
        def file_byte_iterator(filename, chunksize=CHUNK_SIZE):
            with open(filename, "rb") as f:
                chunk = f.read(chunksize)
                while chunk:
                    for b in chunk:
                        yield b
                    chunk = f.read(chunksize)
    """),

}  # End algorithms

#
# Versions of algorithms that will only work in certain releases (or better) of Python.
#
if sys.version_info >= (3, 3):
    algorithms.update({

        'codeape + iter + partial + "yield from"': Algorithm("""
            def file_byte_iterator(filename, chunksize=CHUNK_SIZE):
                with open(filename, "rb") as f:
                    for chunk in iter(partial(f.read, chunksize), b''):
                        yield from chunk
        """),

        'codeape + "yield from"': Algorithm("""
            def file_byte_iterator(filename, chunksize=CHUNK_SIZE):
                with open(filename, "rb") as f:
                    while True:
                        chunk = f.read(chunksize)
                        if chunk:
                            yield from chunk
                        else:
                            break
        """),

        "jfs (mmap)": Algorithm("""
            def file_byte_iterator(filename):
                with open(filename, "rb") as f, \
                     mmap(f.fileno(), 0, access=ACCESS_READ) as s:
                    yield from s
        """),

        'Rick M. (numpy) + "yield from"': Algorithm("""
            def file_byte_iterator(filename):
            #    data = np.fromfile(filename, 'u1')
                yield from np.fromfile(filename, 'u1')
        """),

        'Vinay Sajip + "yield from"': Algorithm("""
            def file_byte_iterator(filename, chunksize=CHUNK_SIZE):
                with open(filename, "rb") as f:
                    chunk = f.read(chunksize)
                    while chunk:
                        yield from chunk  # Added in Py 3.3
                        chunk = f.read(chunksize)
        """),

    })  # End Python 3.3 update.

if sys.version_info >= (3, 5):
    algorithms.update({

        'Aaron Hall + "yield from"': Algorithm("""
            from pathlib import Path

            def file_byte_iterator(path):
                ''' Given a path, return an iterator over the file
                    that lazily loads the file.
                '''
                path = Path(path)
                bufsize = get_buffer_size(path)

                with path.open('rb') as file:
                    reader = partial(file.read1, bufsize)
                    for chunk in iter(reader, bytes()):
                        yield from chunk
        """),

    })  # End Python 3.5 update.

if sys.version_info >= (3, 8, 0):
    algorithms.update({

        'Vinay Sajip + "yield from" + "walrus operator"': Algorithm("""
            def file_byte_iterator(filename, chunksize=CHUNK_SIZE):
                with open(filename, "rb") as f:
                    while chunk := f.read(chunksize):
                        yield from chunk  # Added in Py 3.3
        """),

        'codeape + "yield from" + "walrus operator"': Algorithm("""
            def file_byte_iterator(filename, chunksize=CHUNK_SIZE):
                with open(filename, "rb") as f:
                    while chunk := f.read(chunksize):
                        yield from chunk
        """),

    })  # End Python 3.8.0 update.update.


#### Main ####

def main():
    global TEMP_FILENAME

    def cleanup():
        """ Clean up after testing is completed. """
        try:
            os.remove(TEMP_FILENAME)  # Delete the temporary file.
        except Exception:
            pass

    atexit.register(cleanup)

    # Create a named temporary binary file of pseudo-random bytes for testing.
    fd, TEMP_FILENAME = tempfile.mkstemp('.bin')
    with os.fdopen(fd, 'wb') as file:
         os.write(fd, bytearray(random.randrange(256) for _ in range(FILE_SIZE)))

    # Execute and time each algorithm, gather results.
    start_time = time.time()  # To determine how long testing itself takes.

    timings = []
    for label in algorithms:
        try:
            timing = TIMING(label,
                            min(timeit.repeat(algorithms[label].test,
                                              setup=COMMON_SETUP + algorithms[label].setup,
                                              repeat=TIMINGS, number=EXECUTIONS)))
        except Exception as exc:
            print('{} occurred timing the algorithm: "{}"\n  {}'.format(
                    type(exc).__name__, label, exc))
            traceback.print_exc(file=sys.stdout)  # Redirect to stdout.
            sys.exit(1)
        timings.append(timing)

    # Report results.
    print('Fastest to slowest execution speeds with {}-bit Python {}.{}.{}'.format(
            64 if sys.maxsize > 2**32 else 32, *sys.version_info[:3]))
    print('  numpy version {}'.format(np.version.full_version))
    print('  Test file size: {:,} KiB'.format(FILE_SIZE // KiB(1)))
    print('  {:,d} executions, best of {:d} repetitions'.format(EXECUTIONS, TIMINGS))
    print()

    longest = max(len(timing.label) for timing in timings)  # Len of longest identifier.
    ranked = sorted(timings, key=attrgetter('exec_time')) # Sort so fastest is first.
    fastest = ranked[0].exec_time
    for rank, timing in enumerate(ranked, 1):
        print('{:<2d} {:>{width}} : {:8.4f} secs, rel speed {:6.2f}x, {:6.2f}% slower '
              '({:6.2f} KiB/sec)'.format(
                    rank,
                    timing.label, timing.exec_time, round(timing.exec_time/fastest, 2),
                    round((timing.exec_time/fastest - 1) * 100, 2),
                    (FILE_SIZE/timing.exec_time) / KiB(1),  # per sec.
                    width=longest))
    print()
    mins, secs = divmod(time.time()-start_time, 60)
    print('Benchmark runtime (min:sec) - {:02d}:{:02d}'.format(int(mins),
                                                               int(round(secs))))

main()

你是在假设我使用 yield from chunk 而不是 for byte in chunk: yield byte 吗?我想我应该在回答中更加明确。 - Russia Must Remove Putin
@Aaron:在Python 3的结果中,你的答案有两个版本,其中一个使用了yield from - martineau
好的,我已经更新了我的答案。另外,我建议你放弃使用enumerate,因为迭代应该被理解为完成 - 如果不是的话,据我所知,使用enumerate会带来一些额外的开销,而使用+=1来进行索引的记录可能更加高效,所以你可以在自己的代码中进行索引的记录。或者甚至可以传递到一个maxlen=0的deque中。 - Russia Must Remove Putin
1
更多的代码审查:我认为现在写Python 2的答案没有任何意义 - 我会考虑删除Python 2,因为我希望您使用64位的Python 3.7或3.8。您可以使用atexit和部分应用程序将清理设置为在最后进行。错别字:“veriify”。我看不出测试字符串重复的意义 - 它们是否完全不同?我想象如果您在__new__中使用super().而不是tuple.,则可以使用namedtuple属性名称而不是索引。 - Russia Must Remove Putin
@Aaron:会考虑你的建议。Python 2的问题主要是出于历史原因,这个问题已经十年多了(而且我认为它并不那么分散注意力)。你提出的其他大部分想法听起来都很有意思,我会自己尝试一下,充实自己——但坦白说,优化基准测试并不是我的首要任务。就我而言,它已经发挥了它的存在价值,尽管我总是乐于学习如何改进代码并使其更具可重用性。无论如何,感谢你的建设性批评。 - martineau
显示剩余2条评论

7

Python 3,一次性读取整个文件:

with open("filename", "rb") as binary_file:
    # Read the whole file at once
    data = binary_file.read()
    print(data)

您可以使用“data”变量迭代任何您想要的内容。

6

尝试了以上所有方法并使用@Aaron Hall的答案后,我在运行Windows 10,8 Gb RAM和Python 3.5 32位的计算机上处理一个约90 Mb文件时出现内存错误。我的同事建议我改用numpy,结果非常好。

到目前为止,我测试过的读取整个二进制文件最快的方法是:

import numpy as np

file = "binary_file.bin"
data = np.fromfile(file, 'u1')

参考资料

比目前任何其他方法都快得多。希望能对某些人有所帮助!


@Nirmal:这个问题是关于循环遍历每个字节的,所以你关于不同数据类型的评论是否有任何影响并不清楚。 - martineau
1
Rick:你的代码和其他人的不太一样——即没有循环每个字节。如果加上这个,至少根据我的基准测试结果,它并没有比大多数其他方法更快。实际上,它似乎是较慢的方法之一。如果对每个字节进行的处理(无论是什么)可以通过numpy完成,那么可能是值得的。 - martineau
@martineau 感谢您的评论,是的,我确实理解这个问题是关于循环遍历每个字节而不仅仅是一次性加载所有内容,但是在这个问题中还有其他答案也指向了读取所有内容,因此我的回答。 - Rick M.
@Nirmal 你也错了。numpy从文件中读取不同类型的数据可以使用dtypes:===================================dtheader= np.dtype([('Start Name','b', (4,)), ('Message Type', np.int32, (1,)), ('Instance', np.int32, (1,)), ('NumItems', np.int32, (1,)), ('Length', np.int32, (1,)), ('ComplexArray', np.int32, (1,))]) dtheader=dtheader.newbyteorder('>')headerinfo = np.fromfile(iqfile, dtype=dtheader, count=1) - Kurt Peters
@KurtPeters哦,我不知道你可以传递自定义的dtype。谢谢! - Nirmal

4
如果你需要读取大量二进制数据,你可能需要考虑使用struct模块。它被记录为“在C和Python类型之间进行转换”,但当然,字节就是字节,无论它们是以C类型创建的还是其他任何类型都没有关系。例如,如果你的二进制数据包含两个2字节整数和一个4字节整数,你可以按照以下方式读取它们(例子来自struct文档):
>>> struct.unpack('hhl', b'\x00\x01\x00\x02\x00\x00\x00\x03')
(1, 2, 3)

您可能会发现这比显式循环文件内容更加方便、快速,或者两者兼备。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接