在Python中从二进制文件中提取特定字节

4
我有非常大的二进制文件,其中包含y个传感器的x个int16数据点,以及一些基本信息的头文件。二进制文件是每个采样时间的y个值一组,最多有x个采样,然后是另一组读数,以此类推。如果我想要所有的数据,我使用numpy.fromfile(),这个方法非常快且很好用。然而,如果我只想要一部分传感器数据或特定的传感器数据,目前我采用了一个可怕的双重for循环,使用file.seek()file.read()struct.unpack(),需要很长时间。在Python中是否有更快的方式来实现这个功能呢?也许可以使用mmap(),但我不太理解它的用法?或者只使用fromfile(),然后进行子采样?
data = numpy.empty(num_pts, sensor_indices)
for i in range(num_pts):
    for j in range(sensor_indices):
        curr_file.seek(bin_offsets[j])
        data_binary = curr_file.read(2)
        data[j][i] = struct.unpack('h', data_binary)[0]

在按照@rrauenza的建议使用后,我对代码进行了编辑,如下:

mm = mmap.mmap(curr_file.fileno(), 0, access=mmap.ACCESS_READ)
data = numpy.empty(num_pts,sensor_indices)
for i in range(num_pts):
    for j in range(len(sensor_indices)):
        offset += bin_offsets[j] * 2
        data[j][i] = struct.unpack('h', mm[offset:offset+2])[0]

虽然这比以前更快了,但仍然比

订购级别慢得多

shape = (x, y)
data = np.fromfile(file=self.curr_file, dtype=np.int16).reshape(shape)
data = data.transpose()
data = data[sensor_indices, :]
data = data[:, range(num_pts)]

我用一个小一点的文件进行了测试,文件大小只有30 Mb,仅包含16个传感器的30秒数据。原始代码需要160秒,使用mmap需要105秒,而使用numpy.fromfile()和子采样仅需0.33秒。

剩下的问题是 - 显然,对于小文件来说使用numpy.fromfile()更好,但是对于更大的文件(可能高达20 Gb,包含数小时或数天的数据和多达500个传感器),是否会出现问题还需要进一步验证。


2
你有研究过pandas吗?它非常适合以几乎任何想要的方式对大型数据集进行排序。 - Chris
你好,欢迎来到StackOverflow!你的标题有点令人困惑——strip通常意味着删除。更准确的词可能是extract。 - rrauenza
你的非常大的文件有多大? - rrauenza
文件大小可达20 GB。在@rrauenza提供的非常有用的建议后,我尝试了三个不同的代码块,并使用新信息编辑了原始帖子。 - launchpadmcquack
1
我认为这取决于你拥有的RAM数量。这是一个可扩展性问题。随着数据的增长,mmap()应该会线性扩展,而numpy.fromfile()则不会,因为在某个点上你将需要使用分页技术。 - rrauenza
1
你可以使用 numpy.memmap,同时兼顾两者的优点。在进行高级索引之前进行切片 (0:num_pts),以最小化复制。 - user2379410
1个回答

6

我一定会尝试使用mmap()

https://docs.python.org/2/library/mmap.html

如果你在每次提取int16时都调用seek()read(),那么你将会读取很多小块并且有很多系统调用开销

我写了一个小测试来说明:

#!/usr/bin/python

import mmap
import os
import struct
import sys

FILE = "/opt/tmp/random"  # dd if=/dev/random of=/tmp/random bs=1024k count=1024
SIZE = os.stat(FILE).st_size
BYTES = 2
SKIP = 10


def byfile():
    sum = 0
    with open(FILE, "r") as fd:
        for offset in range(0, SIZE/BYTES, SKIP*BYTES):
            fd.seek(offset)
            data = fd.read(BYTES)
            sum += struct.unpack('h', data)[0]
    return sum


def bymmap():
    sum = 0
    with open(FILE, "r") as fd:
        mm = mmap.mmap(fd.fileno(), 0, prot=mmap.PROT_READ)
        for offset in range(0, SIZE/BYTES, SKIP*BYTES):
            data = mm[offset:offset+BYTES]
            sum += struct.unpack('h', data)[0]
    return sum


if sys.argv[1] == 'mmap':
    print bymmap()

if sys.argv[1] == 'file':
    print byfile()

我每个方法都运行了两次以补偿缓存。我使用time,因为我想要测量usersys时间。

以下是结果:

[centos7:/tmp]$ time ./test.py file
-211990391

real    0m44.656s
user    0m35.978s
sys     0m8.697s
[centos7:/tmp]$ time ./test.py file
-211990391

real    0m43.091s
user    0m37.571s
sys     0m5.539s
[centos7:/tmp]$ time ./test.py mmap
-211990391

real    0m16.712s
user    0m15.495s
sys     0m1.227s
[centos7:/tmp]$ time ./test.py mmap
-211990391

real    0m16.942s
user    0m15.846s
sys     0m1.104s
[centos7:/tmp]$ 

这个总和为-211990391只是验证两个版本做了同样的事情。

看每个版本的第二个结果,mmap()大约是实际时间的1/3。用户时间大约是实际时间的1/2,系统时间大约是实际时间的1/5。

也许加速此过程的其他选项是:

(1) 正如您所提到的,加载整个文件。 大I/O而不是小I/O 可能会加速事情。 但是,如果超出系统内存,您将退回到分页,这将比mmap()更糟糕(因为您必须分页)。 我在这里并不是非常有希望,因为mmap已经使用较大的I/O。

(2) 并发性。也许通过多个线程并行读取文件可以加快速度,但你需要处理Python的GIL多进程将通过避免GIL而更好地工作,并且您可以轻松地将数据传回顶级处理程序。然而,这将违反下一个项目,即局部性:您可能会使I/O更随机。

(3) 局部性。以某种方式组织您的数据(或按顺序读取),使您的数据更加接近。mmap()根据系统页面大小将文件分页为块:

>>> import mmap
>>> mmap.PAGESIZE
4096
>>> mmap.ALLOCATIONGRANULARITY
4096
>>> 

如果您的数据更接近(在4k块内),它将已经被加载到缓冲区高速缓存中。
(4)更好的硬件。像SSD一样。
我确实在SSD上运行了这个程序,速度快多了。我对Python进行了分析,想知道解包是否昂贵。结果不是:
$ python -m cProfile test.py mmap                                                                                                                        
121679286
         26843553 function calls in 8.369 seconds

   Ordered by: standard name

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
        1    6.204    6.204    8.357    8.357 test.py:24(bymmap)
        1    0.012    0.012    8.369    8.369 test.py:3(<module>)
 26843546    1.700    0.000    1.700    0.000 {_struct.unpack}
        1    0.000    0.000    0.000    0.000 {method 'disable' of '_lsprof.Profiler' objects}
        1    0.000    0.000    0.000    0.000 {method 'fileno' of 'file' objects}
        1    0.000    0.000    0.000    0.000 {open}
        1    0.000    0.000    0.000    0.000 {posix.stat}
        1    0.453    0.453    0.453    0.453 {range}

附加说明:

好奇心驱使我尝试了multiprocessing。我需要仔细查看我的分区,但解包的数量(53687092)在所有试验中都是相同的:

$ time ./test2.py 4
[(4415068.0, 13421773), (-145566705.0, 13421773), (14296671.0, 13421773), (109804332.0, 13421773)]
(-17050634.0, 53687092)

real    0m5.629s
user    0m17.756s
sys     0m0.066s
$ time ./test2.py 1
[(264140374.0, 53687092)]
(264140374.0, 53687092)

real    0m13.246s
user    0m13.175s
sys     0m0.060s

代码:

#!/usr/bin/python

import functools
import multiprocessing
import mmap
import os
import struct
import sys

FILE = "/tmp/random"  # dd if=/dev/random of=/tmp/random bs=1024k count=1024
SIZE = os.stat(FILE).st_size
BYTES = 2
SKIP = 10


def bymmap(poolsize, n):
    partition = SIZE/poolsize
    initial = n * partition
    end = initial + partition
    sum = 0.0
    unpacks = 0
    with open(FILE, "r") as fd:
        mm = mmap.mmap(fd.fileno(), 0, prot=mmap.PROT_READ)
        for offset in xrange(initial, end, SKIP*BYTES):
            data = mm[offset:offset+BYTES]
            sum += struct.unpack('h', data)[0]
            unpacks += 1
    return (sum, unpacks)


poolsize = int(sys.argv[1])
pool = multiprocessing.Pool(poolsize)
results = pool.map(functools.partial(bymmap, poolsize), range(0, poolsize))
print results
print reduce(lambda x, y: (x[0] + y[0], x[1] + y[1]), results)

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接