Python:读取12位二进制文件

11

我正在尝试使用Python 3读取包含图像(视频)的12位二进制文件。

为了读取一个类似的但是编码为16位的文件,可以采用以下方法:

import numpy as np
images = np.memmap(filename_video, dtype=np.uint16, mode='r', shape=(nb_frames, height, width))

其中filename_video是视频文件,nb_frames、height和width是可以从另一个文件中读取的视频特征。"工作得非常好"的意思是指快速:在我的计算机上,读取一个包含140帧的640x256视频只需要大约1毫秒。

据我所知,当文件编码为12位时,我无法使用此方法,因为没有uint12类型。所以我正在尝试读取一个12位文件并将其存储在16位uint数组中。以下代码来自于(Python:读取12位打包二进制图像),可以实现这个目的:

with open(filename_video, 'rb') as f:
    data=f.read()
images=np.zeros(int(2*len(data)/3),dtype=np.uint16)
ii=0
for jj in range(0,int(len(data))-2,3):
    a=bitstring.Bits(bytes=data[jj:jj+3],length=24)
    images[ii],images[ii+1] = a.unpack('uint:12,uint:12')
    ii=ii+2
images = np.reshape(images,(nb_frames,height,width))

然而,这非常缓慢:读取一个只有5帧的640x256视频文件需要大约11.5秒才能完成。理想情况下,我希望能够像使用memmap读取8位或16位文件一样有效地读取12位文件。或者至少不要慢到10^5倍。我该如何加快速度?

这里是一个文件示例: http://s000.tinyupload.com/index.php?file_id=26973488795334213426 (nb_frames=5,height=256,width=640)。


太酷了 (+1)。我只知道类似于OpenCV的cv.cvtColor(bayer,rgb,cv.COLOR_BayerBG2BGR) - George Profenza
4个回答

16

我有一个与@max9111提出的略有不同的实现,它不需要调用unpackbits

它通过将中间字节分成两半并使用numpy的二进制操作直接从三个连续的uint8创建两个uint12值。在下面的代码中,假定data_chunks是一个包含任意数量的12位整数信息的二进制字符串(因此其长度必须是3的倍数)。

def read_uint12(data_chunk):
    data = np.frombuffer(data_chunk, dtype=np.uint8)
    fst_uint8, mid_uint8, lst_uint8 = np.reshape(data, (data.shape[0] // 3, 3)).astype(np.uint16).T
    fst_uint12 = (fst_uint8 << 4) + (mid_uint8 >> 4)
    snd_uint12 = ((mid_uint8 % 16) << 8) + lst_uint8
    return np.reshape(np.concatenate((fst_uint12[:, None], snd_uint12[:, None]), axis=1), 2 * fst_uint12.shape[0])

我与其他实现进行了基准测试,这种方法在大约5 Mb的输入上证明比其他方法快了约4倍:
read_uint12_unpackbits每次循环需要65.5毫秒±1.11毫秒(7次运行,每次10个循环)
read_uint12每次循环需要14毫秒±513微秒(7次运行,每次100个循环)


8

加速numpy向量化方法的一种方式是避免为临时数据进行昂贵的内存分配,更有效地使用缓存并利用并行化。这可以通过使用 NumbaCythonC 轻松实现。请注意,并不总是有利于并行化。如果您要转换的数组太小,请使用单线程版本(parallel=False)。

使用临时内存分配的Cyril Gaudefroy Numba版本

import numba as nb
import numpy as np
@nb.njit(nb.uint16[::1](nb.uint8[::1]),fastmath=True,parallel=True)
def nb_read_uint12(data_chunk):
  """data_chunk is a contigous 1D array of uint8 data)
  eg.data_chunk = np.frombuffer(data_chunk, dtype=np.uint8)"""
  
  #ensure that the data_chunk has the right length
  assert np.mod(data_chunk.shape[0],3)==0
  
  out=np.empty(data_chunk.shape[0]//3*2,dtype=np.uint16)
  
  for i in nb.prange(data_chunk.shape[0]//3):
    fst_uint8=np.uint16(data_chunk[i*3])
    mid_uint8=np.uint16(data_chunk[i*3+1])
    lst_uint8=np.uint16(data_chunk[i*3+2])
    
    out[i*2] =   (fst_uint8 << 4) + (mid_uint8 >> 4)
    out[i*2+1] = ((mid_uint8 % 16) << 8) + lst_uint8
    
  return out

使用Numba版本的Cyril Gaudefroy答案进行内存预分配

如果您将此函数多次应用于相似大小的数据块,则只需预先分配一次输出数组。

@nb.njit(nb.uint16[::1](nb.uint8[::1],nb.uint16[::1]),fastmath=True,parallel=True,cache=True)
def nb_read_uint12_prealloc(data_chunk,out):
    """data_chunk is a contigous 1D array of uint8 data)
    eg.data_chunk = np.frombuffer(data_chunk, dtype=np.uint8)"""

    #ensure that the data_chunk has the right length
    assert np.mod(data_chunk.shape[0],3)==0
    assert out.shape[0]==data_chunk.shape[0]//3*2

    for i in nb.prange(data_chunk.shape[0]//3):
        fst_uint8=np.uint16(data_chunk[i*3])
        mid_uint8=np.uint16(data_chunk[i*3+1])
        lst_uint8=np.uint16(data_chunk[i*3+2])

        out[i*2] =   (fst_uint8 << 4) + (mid_uint8 >> 4)
        out[i*2+1] = ((mid_uint8 % 16) << 8) + lst_uint8

    return out

Numba版本的DGrifffith答案,使用临时内存分配

@nb.njit(nb.uint16[::1](nb.uint8[::1]),fastmath=True,parallel=True,cache=True)
def read_uint12_var_2(data_chunk):
    """data_chunk is a contigous 1D array of uint8 data)
    eg.data_chunk = np.frombuffer(data_chunk, dtype=np.uint8)"""

    #ensure that the data_chunk has the right length
    assert np.mod(data_chunk.shape[0],3)==0

    out=np.empty(data_chunk.shape[0]//3*2,dtype=np.uint16)

    for i in nb.prange(data_chunk.shape[0]//3):
        fst_uint8=np.uint16(data_chunk[i*3])
        mid_uint8=np.uint16(data_chunk[i*3+1])
        lst_uint8=np.uint16(data_chunk[i*3+2])

        out[i*2] =   (fst_uint8 << 4) + (mid_uint8 >> 4)
        out[i*2+1] = (lst_uint8 << 4) + (15 & mid_uint8)

    return out

Numba版本的DGrifffith答案,使用内存预分配

@nb.njit(nb.uint16[::1](nb.uint8[::1],nb.uint16[::1]),fastmath=True,parallel=True,cache=True)
def read_uint12_var_2_prealloc(data_chunk,out):
    """data_chunk is a contigous 1D array of uint8 data)
    eg.data_chunk = np.frombuffer(data_chunk, dtype=np.uint8)"""

    #ensure that the data_chunk has the right length
    assert np.mod(data_chunk.shape[0],3)==0
    assert out.shape[0]==data_chunk.shape[0]//3*2

    for i in nb.prange(data_chunk.shape[0]//3):
        fst_uint8=np.uint16(data_chunk[i*3])
        mid_uint8=np.uint16(data_chunk[i*3+1])
        lst_uint8=np.uint16(data_chunk[i*3+2])

        out[i*2] =   (fst_uint8 << 4) + (mid_uint8 >> 4)
        out[i*2+1] = (lst_uint8 << 4) + (15 & mid_uint8)

    return out

时间

num_Frames=10
data_chunk=np.random.randint(low=0,high=255,size=np.int(640*256*1.5*num_Frames),dtype=np.uint8)

%timeit read_uint12_gaud(data_chunk)
#11.3 ms ± 53.4 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
#435 MB/s

%timeit nb_read_uint12(data_chunk)
#939 µs ± 24.3 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
#5235 MB/s

out=np.empty(data_chunk.shape[0]//3*2,dtype=np.uint16)
%timeit nb_read_uint12_prealloc(data_chunk,out)
#407 µs ± 5.4 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
#11759 MB/s

%timeit read_uint12_griff(data_chunk)
#10.2 ms ± 55.9 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
#491 MB/s

%timeit read_uint12_var_2(data_chunk)
#928 µs ± 16.5 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
#5297 MB/s
%timeit read_uint12_var_2_prealloc(data_chunk,out)
#403 µs ± 13.4 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
#12227 MB/s

非常感谢您提供关于Numba的提示。我也遇到了转换所需计算时间类似的问题。我尝试使用bitArray和bitString包,但是循环速度太慢了。按照您最近建议的方式实现后,问题得到了解决!再次感谢,非常好的答案。 - BeCurious
嗨,@max9111,这是一个很棒的答案,你能否建议对下面的答案进行修改(它使用了不同的打包/位顺序)?这将非常有用! - Gittb
@Gittb 你指的是哪一个(用户名)?答案的顺序可能会有所不同。(我看到我的答案在最后一个)是的,但不是今天。在周末。 - max9111
由用户名@DGrifffith提供的解决方案。时间不紧,感谢您的帮助。如果您在处理完后能够通知我,那就太棒了。 - Gittb
谢谢你,Max。我非常感激! - Gittb

3

我发现@cyrilgaudefroy的答案很有用。但是,一开始它在我的12位打包二进制图像数据上并不能正常工作。后来发现在这种特殊情况下打包方式有些不同。"中间"字节包含了最不重要的半字节。三元组的第1和第3个字节是这12个字节中最重要的8位。因此我修改了@cyrilgaudefroy的回答:

def read_uint12(data_chunk):
    data = np.frombuffer(data_chunk, dtype=np.uint8)
    fst_uint8, mid_uint8, lst_uint8 = np.reshape(data, (data.shape[0] // 3, 3)).astype(np.uint16).T
    fst_uint12 = (fst_uint8 << 4) + (mid_uint8 >> 4)
    snd_uint12 = (lst_uint8 << 4) + (np.bitwise_and(15, mid_uint8))
    return np.reshape(np.concatenate((fst_uint12[:, None], snd_uint12[:, None]), axis=1), 2 * fst_uint12.shape[0])

2
这是另一种变化。我的数据格式如下:
第一个uint12:来自第二个uint8的最高有效4位和第一个uint8的最低有效8位中的最高有效4位
第二个uint12:来自第三个uint8的最高有效8位和第二个uint8的最高有效4位中的最低有效4位
相应的代码如下:
"最初的回答"
def read_uint12(data_chunk):
    data = np.frombuffer(data_chunk, dtype=np.uint8)
    fst_uint8, mid_uint8, lst_uint8 = np.reshape(data, (data.shape[0] // 3, 3)).astype(np.uint16).T
    fst_uint12 = ((mid_uint8 & 0x0F) << 8) | fst_uint8
    snd_uint12 = (lst_uint8 << 4) | ((mid_uint8 & 0xF0) >> 4)
    return np.reshape(np.concatenate((fst_uint12[:, None], snd_uint12[:, None]), axis=1), 2 * fst_uint12.shape[0])

这个变量对我也起作用了。它与Matlab的ubit12表示相匹配。 - erikreed

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接