使用numpy/ctypes实现环形缓冲区

7
我正在开发一个客户端,它将通过tcp接收[EEG]数据并将其写入环形缓冲区。我认为将缓冲区作为ctypes或numpy数组可能非常方便,因为可以创建numpy“视图”到这种缓冲区的任何位置,并读取/写入/处理数据而无需进行任何复制操作。但是,总体上这是否是一个不好的想法?
然而,我不知道如何以这种方式实现固定大小的循环缓冲区。假设我已经创建了一个在内存中连续的缓冲区对象。当写指针到达缓冲区数组的末尾时,最佳的写入数据方式是什么?
一种可能的方法是当写指针到达缓冲区数组的末尾时,从开始处开始覆盖(已经旧的)字节。然而,在边界附近,由于某些块的numpy视图(用于处理)仍然可以位于缓冲区数组的末尾,而另一个块已经位于其开始处,因此在这种情况下可能无法创建。我已经阅读过无法创建这样的循环切片。如何解决这个问题?
更新:感谢大家的答案。如果有人也面临同样的问题,则这里是我得到的最终代码。

1
我编写了 numpy_ringbuffer 来解决这个问题,它提供了对底层缓冲区的 deque 类似接口。 - Eric
4个回答

7

如果您需要一个大小为N字节的窗口,请将缓冲区设置为2*N字节,并将所有输入写入两个位置:i % Ni % N + N,其中i是字节计数器。这样您就可以始终在缓冲区中拥有N个连续字节。

data = 'Data to buffer'
N = 4
buf = 2*N*['\00']

for i,c in enumerate(data):
    j = i % N
    buf[j] = c
    buf[j+N] = c
    if i >= N-1:
        print ''.join(buf[j+1:j+N+1]) 

打印
Data
ata 
ta t
a to
 to 
to b
o bu
 buf
buff
uffe
ffer

是的。这是我现在正在尝试编写的内容。我不再使用2*N缓冲区,而是使用一个任意长度为N的缓冲区,并遵循相同的思路。无论如何,谢谢! - dmytro
如果性能完全不是问题,那么这样做很好;但考虑到您的应用程序,我对此表示怀疑。您最好使用矢量化解决方案来解决问题。 - Eelco Hoogendoorn

3

我认为在这里你需要远离C式思维。每次插入都更新环形缓冲区永远不会是高效的。环形缓冲区与numpy数组要求的连续内存块接口根本不同,包括你提到的fft。

一个自然的解决方案是为了性能而牺牲一点内存。例如,如果您需要保存的元素数量为N,则分配一个大小为N + 1024(或某些合理数字)的数组。然后,您只需要在每1024个插入中移动N个元素,并且始终有一个包含N个元素的连续视图可供直接使用。

编辑:以下是实现上述内容并应该具有良好性能的代码片段。请注意,最好以块为单位添加,而不是逐个元素添加。否则,无论如何实现您的环形缓冲区,使用numpy的性能优势很快就会被抵消。

import numpy as np

class RingBuffer(object):
    def __init__(self, size, padding=None):
        self.size = size
        self.padding = size if padding is None else padding
        self.buffer = np.zeros(self.size+self.padding)
        self.counter = 0

    def append(self, data):
        """this is an O(n) operation"""
        data = data[-self.padding:]
        n = len(data)
        if self.remaining < n: self.compact()
        self.buffer[self.counter+self.size:][:n] = data
        self.counter += n

    @property
    def remaining(self):
        return self.padding-self.counter
    @property
    def view(self):
        """this is always an O(1) operation"""
        return self.buffer[self.counter:][:self.size]
    def compact(self):
        """
        note: only when this function is called, is an O(size) performance hit incurred,
        and this cost is amortized over the whole padding space
        """
        print 'compacting'
        self.buffer[:self.size] = self.view
        self.counter = 0

rb = RingBuffer(10)
for i in range(4):
    rb.append([1,2,3])
    print rb.view

rb.append(np.arange(15))
print rb.view  #test overflow

感谢Eelco。所以,一个视图数组根本不起作用吗?但是这如何检测,并且整个Aptr被一个平面副本替换了? Aptr [0] .flags 具有OWNDATA False,令人困惑。 - denis
我不确定您所说的“视图数组”是什么意思;视图可以很容易地即时构建,因此您实际上不需要将它们保存在数组中。问题的关键是,如果您希望在任何时候将数据用作numpy数组,则需要在内存中连续地存储它。要么每次在追加后需要连续访问环形缓冲区时都会产生O(N)性能损失,要么您需要分配一些额外的内存,以便您可以延迟和分摊此类操作,以保持内存连续。 - Eelco Hoogendoorn

2
一种可能的方法是,当写指针到达缓冲区数组的末尾时,从开头开始覆盖(已经过时)的字节。
这是固定大小环形缓冲区中的唯一选择。
我读过不可能创建这样的循环切片。
这就是为什么我不会使用Numpy视图来执行此操作。相反,您可以创建一个围绕ndarray的类包装器,其中包含缓冲区/数组、容量和指向插入点的指针(索引)。如果您想将内容作为Numpy数组获取,则必须像这样进行复制:
buf = np.array([1,2,3,4])
indices = [3,0,1,2]
contents = buf[indices]    # copy

如果您实现了__setitem____setslice__,仍然可以在原地设置元素的值。


感谢您。但是,如果这样的块必须被复制,那么使用collections.deque作为缓冲区,然后执行numpy.array(list(itertools.islice(buf,chstart,chend)))会更好吗?或者它是慢得多? - dmytro
我想避免复制,因为在那个数据上进行滑动窗口FFT将意味着每次出现新数据点时复制几乎相同的数据块。 - dmytro
@dmytro:你需要测量一下deque是否更快。如果你想把环形缓冲区存储的数据放入数组中,恐怕很难避免复制。 - Fred Foo

-2
@Janne Karila的答案的变体,适用于C语言但不适用于numpy:
如果环形缓冲区非常宽,例如N x 1G,则不要将整个缓冲区加倍, 而是将指向其行的2*N个指针的数组加倍。 例如,对于N=3,进行初始化
bufp = { buf[0], buf[1], buf[2], buf[0], buf[1], buf[2] };

然后你只需写入数据一次,anyfunc( bufp[j:j+3] ) 就会按时间顺序查看 buf 中的行。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接