将二进制文件读入结构体中

9

我有一个已知格式/结构的二进制文件。

如何将所有二进制数据读入到一个该结构体的数组中?

类似于以下伪代码:

bytes = read_file(filename)
struct = {'int','int','float','byte[255]'}
data = read_as_struct(bytes, struct)
data[1]
>>> 10,11,10.1,Arr[255]

我的解决方案目前是:

data = []

fmt   = '=iiiii256i'
fmt_s = '=iiiii'
fmt_spec = '256i'

struct_size = struct.calcsize(fmt)

for i in range(struct_size, len(bytes)-struct_size, struct_size):
    dat1= list(struct.unpack(fmt_s, bytes[i-struct_size:i-1024]))
    dat2= list(struct.unpack(fmt_spec, bytes[i-1024:i]))
    dat1.append(dat2)
    data.append(dat1)

你更新中的spectrum是什么意思?你是不是指的是dat2 - Martijn Pieters
是的。已经修改以反映这一点。 - kasperhj
1
请注意,您的代码实际上跳过了最后的 struct_size 字节。 - Martijn Pieters
我喜欢使用Construct来处理这类事情。它可以让你在Python中轻松定义和读取复杂的结构。 - kichik
read() 方法从文件中返回指定数量的字节。默认值为 -1,表示整个文件。 - pippo1980
4个回答

29
实际上,看起来您正在尝试从文件中读取结构的列表(或数组)。在Python中惯用的方法是使用struct模块,并在循环中调用struct.unpack() - 如果您事先知道它们的数量,则可以固定次数调用,否则直到达到文件末尾为止 - 并将结果存储在list中。以下是后者的示例:
import struct

struct_fmt = '=5if255s' # int[5], float, byte[255]
struct_len = struct.calcsize(struct_fmt)
struct_unpack = struct.Struct(struct_fmt).unpack_from

results = []
with open(filename, "rb") as f:
    while True:
        data = f.read(struct_len)
        if not data: break
        s = struct_unpack(data)
        results.append(s)

同样的结果也可以通过结合一个短小的生成器函数辅助工具(即下面的read_chunks())和列表推导式稍微更加简洁地获得:

def read_chunks(f, length):
    while True:
        data = f.read(length)
        if not data: break
        yield data

with open(filename, "rb") as f:
    results = [struct_unpack(chunk) for chunk in read_chunks(f, struct_len)]

更新

实际上,您不需要像上面所示那样显式定义辅助函数,因为您可以使用Python内置的iter()函数在列表推导式中动态创建所需的iterator对象,如下所示:

from functools import partial

with open(filename, "rb") as f:
    results = [struct_unpack(chunk) for chunk in iter(partial(f.read, struct_len), b'')]

一些读者可能也会对阅读问题 Fastest way to read a binary file with a defined format? 的答案感兴趣。 - martineau
当我尝试使用您的建议时,出现了以下错误: struct.error: unpack_from需要至少209字节的缓冲区 我做错了什么?抱歉,我是Python新手。 - xMutzelx
1
@xMutzelx:当二进制文件的长度不是结构体大小的精确倍数时,就会发生这种情况,因为在f.read()调用之后没有进行检查以确保返回了请求的字节数。这可能是因为文件中还有一些标题或尾随信息,以及组成结构体数组的数据(或者文件已损坏)。 - martineau
1
@xMutzelx:我刚刚测试了答案中的代码,当我向用于测试的二进制输入文件添加一个小的(10字节)头时,确实出现了你描述的错误。 - martineau
我已经解决了这个问题,感谢你的帮助。我的结构体包含了“Q50IB”。这个1字节(“B”)被填充到4字节。我只需要将“B”改为“I”即可。 - xMutzelx
1
@xMutzelx:嗯...是的,错误的结构布局也可能会引起问题。因此,手动验证计算出的“struct_len”值是否正确可能是一个好主意。另外,请注意,将结构格式字符串前缀从“'='”更改为“'@'”(并将末尾的“'B'”保留)也可能有效,因为它启用了“本地”对齐而不是禁止它。 - martineau

15

使用struct模块;您需要在字符串格式中定义在该库中记录的类型:

struct.unpack('=HHf255s', bytes)

上面的例子期望本机字节顺序、两个无符号short整数、一个浮点数和一个255个字符的字符串。

要循环遍历已经完全读取的bytes字符串,我会使用itertools;这里有一个方便的grouper recipe,我在这里进行了修改:

from itertools import izip_longest, imap
from struct import unpack, calcsize

fmt_s = '=5i'
fmt_spec = '=256i'
size_s = calcsize(fmt_s)
size = size_s + calcsize(fmt_spec)

def chunked(iterable, n, fillvalue=''):
    args = [iter(iterable)] * n
    return imap(''.join, izip_longest(*args, fillvalue=fillvalue))

data = [unpack(fmt_s, section[:size_s]) + (unpack(fmt_spec, section[size_s:]),)
    for section in chunked(bytes, size)]
    

如果必须这样做,那么它将产生元组而不是列表,但很容易进行调整:

data = [list(unpack(fmt_s, section[:size_s])) + [list(unpack(fmt_spec, section[size_s:]))]
    for section in chunked(bytes, size)]

似乎这在大于结构体大小的数据上不起作用。我的二进制数据重复出现。 - kasperhj
1
@lejon:它不会神奇地从“bytes”中删除读取的数据。要么切片“buffer”,要么使用“unpack_from()”和偏移量。 - Martijn Pieters
那么我需要遍历bytes并将每个unpack分配给一个元组吗?另外,有没有一种方法可以将最后的255放入实际数组中,以便输出的形式与我在原始帖子中输入的形式相同? - kasperhj
@lejon:Arr 是什么类型?你必须手动将结果字符串传递给该类型,struct 无法为你完成这个操作。 - Martijn Pieters
这是一个字节数组。我已经编辑了答案以展示我的解决方案。不过,是否有列表推导式可以完成相同的任务? - kasperhj
显示剩余2条评论

2

添加评论

import struct 

首先将二进制读入数组中

mbr = file('mbrcontent', 'rb').read() 

所以你可以只获取数组的某个部分

partition_table = mbr[446:510] 

然后将其解压为整数

signature = struct.unpack('<H', mbr[510:512])[0] 

一个更复杂的例子
little_endian = (signature == 0xaa55) # should be True 
print "Little endian:", little_endian 
PART_FMT = (little_endian and '<' or '>') + ( 
"B" # status (0x80 = bootable (active), 0x00 = non-bootable) 
# CHS of first block 
"B" # Head 
"B" # Sector is in bits 5; bits 9 of cylinder are in bits 7-6 
"B" # bits 7-0 of cylinder 
"B" # partition type 
# CHS of last block 
"B" # Head 
"B" # Sector is in bits 5; bits 9 of cylinder are in bits 7-6 
"B" # bits 7-0 of cylinder 
"L" # LBA of first sector in the partition 
"L" # number of blocks in partition, in little-endian format 
) 

PART_SIZE = 16 
fmt_size = struct.calcsize(PART_FMT) 
# sanity check expectations 
assert fmt_size == PART_SIZE, "Partition format string is %i bytes, not %i" % (fmt_size, PART_SIZE) 

def cyl_sector(sector_cyl, cylinder7_0): 
    sector = sector_cyl & 0x1F # bits 5-0 

    # bits 7-6 of sector_cyl contain bits 9-8 of the cylinder 
    cyl_high = (sector_cyl >> 5) & 0x03 
    cyl = (cyl_high << 8) | cylinder7_0 
    return sector, cyl 

#I have corrected the indentation, but the change is refused because less than 6 characters, so I am adding this useful comment.
for partition in range(4): 
    print "Partition #%i" % partition, 
    offset = PART_SIZE * partition 
    (status, start_head, start_sector_cyl, start_cyl7_0, part_type, end_head, end_sector_cyl, end_cyl7_0, 
    lba, blocks ) = struct.unpack( PART_FMT,partition_table[offset:offset + PART_SIZE]) 
    if status == 0x80: 
        print "Bootable", 
    elif status: 
        print "Unknown status [%s]" % hex(status), 
        print "Type=0x%x" % part_type 
        start = (start_head,) + cyl_sector(start_sector_cyl, start_cyl7_0) 
        end = (end_head,) + cyl_sector(end_sector_cyl, end_cyl7_0) 
        print " (Start: Heads:%i\tCyl:%i\tSect:%i)" % start 
        print " (End: Heads:%i\tCyl:%i\tSect:%i)" % end 
        print " LBA:", lba 
        print " Blocks:", blocks 

1
-1:请修复缩进并解释这段代码演示了什么。 - Blair

2
import os, re
import functools
import ctypes
from ctypes import string_at, byref, sizeof, cast, POINTER, pointer, create_string_buffer, memmove
import numpy as np
import pandas as pd

class _StructBase(ctypes.Structure):
    __type__ = 0
    _fields_ = []

    @classmethod
    def Offsetof(cls, field):
        pattern = '(?P<field>\w+)\[(?P<idx>\d+)\]'

        mat = re.match(pattern, field)
        if mat:
            fields = dict(cls.Fields())
            f = mat.groupdict()['field']
            idx = mat.groupdict()['idx']
            return cls.Offsetof(f) + int(idx) * ctypes.sizeof(fields[field])
        else:
            return getattr(cls, field).offset

    @classmethod
    def DType(cls):
        map = {
            ctypes.c_byte: np.byte,
            ctypes.c_ubyte: np.ubyte,
            ctypes.c_char: np.ubyte,

            ctypes.c_int8: np.int8,
            ctypes.c_int16: np.int16,
            ctypes.c_int32: np.int32,
            ctypes.c_int64: np.int64,

            ctypes.c_uint8: np.uint8,
            ctypes.c_uint16: np.uint16,
            ctypes.c_uint32: np.uint32,
            ctypes.c_uint64: np.uint64,

            ctypes.c_float: np.float32,
            ctypes.c_double: np.float64,
        }
        res = []

        for k, v in cls.Fields():
            if hasattr(v, '_length_'):
                if v._type_ != ctypes.c_char:
                    for i in range(v._length):
                        res.append((k, map[v], cls.Offsetof(k)))
                else:
                    res.append((k, 'S%d' % v._length_, cls.Offsetof(k)))
            else:
                res.append((k, map[v], cls.Offsetof(k)))
        res = pd.DataFrame(res, columns=['name', 'format', 'offset'])
        return np.dtype({
            'names': res['name'],
            'formats': res['format'],
            'offsets': res['offset'],
        })

    @classmethod
    def Attr(cls):
        fields = cls._fields_
        res = []
        for attr, tp in fields:
            if str(tp).find('_Array_') > 0 and str(tp).find('char_Array_') < 0:
                for i in range(tp._length_):
                    res.append((attr + '[%s]' % str(i), tp._type_))
            else:
                res.append((attr, tp))
        return res

    @classmethod
    def Fields(cls, notype=False):
        res = [cls.Attr()]
        cur_cls = cls
        while True:
            cur_cls = cur_cls.__bases__[0]
            if cur_cls == ctypes.Structure:
                break
            res.append(cur_cls.Attr())
        if notype:
            return [k for k, v in functools.reduce(list.__add__, reversed(res), [])]
        else:
            return functools.reduce(list.__add__, reversed(res), [])

    @classmethod
    def size(cls):
        return sizeof(cls)

    @classmethod
    def from_struct_binary(cls, path, max_count=2 ** 32, decode=True):
        print(os.path.getsize(path), cls.size())
        assert os.path.getsize(path) % cls.size() == 0
        size = os.path.getsize(path) // cls.size()
        size = min(size, max_count)

        index = range(size)
        array = np.fromfile(path, dtype=cls.DType(), count=size)

        df = pd.DataFrame(array, index=index)
        for attr, tp in eval(str(cls.DType())):
            if re.match('S\d+', tp) is not None and decode:
                try:
                    df[attr] = df[attr].map(lambda x: x.decode("utf-8"))
                except:
                    df[attr] = df[attr].map(lambda x: x.decode("gbk"))
        return df

class StructBase(_StructBase):
    _fields_ = [
        ('Type', ctypes.c_uint32),
    ]

class IndexStruct(StructBase):
    _fields_ = [
        ('Seq', ctypes.c_uint32),
        ('ExID', ctypes.c_char * 8),
        ('SecID', ctypes.c_char * 8),
        ('SecName', ctypes.c_char * 16),
        ('SourceID', ctypes.c_int32),
        ('Time', ctypes.c_uint32),
        ('PreClose', ctypes.c_uint32),
        ('Open', ctypes.c_uint32),
        ('High', ctypes.c_uint32),
        ('Low', ctypes.c_uint32),
        ('Match', ctypes.c_uint32),
    ]

df = IndexStruct.from_struct_binary('your path')
print(df)


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接