高效的循环缓冲区?

153

我希望在Python中创建一个高效的循环缓冲区(目标是对缓冲区中的整数值取平均值)。

使用列表收集值是否是高效的方法?

def add_to_buffer( self, num ):
    self.mylist.pop( 0 )
    self.mylist.append( num )

什么方法更有效(为什么)?


4
这不是实现循环缓冲区的有效方法,因为在列表中pop(0)操作的时间复杂度为O(n)。pop(0)会移除列表中的第一个元素,并且所有元素都必须向左移动。相反,应该使用带有maxlen属性的collections.deque。deque具有O(1)的append和pop操作。 - Vlad Bezden
15个回答

289

我会使用具有 maxlen 参数的 collections.deque

>>> import collections
>>> d = collections.deque(maxlen=10)
>>> d
deque([], maxlen=10)
>>> for i in xrange(20):
...     d.append(i)
... 
>>> d
deque([10, 11, 12, 13, 14, 15, 16, 17, 18, 19], maxlen=10)

在文档中有一个关于deque示例,与您的要求类似。我认为它是最有效的主要原因是它是由一支非常熟练的团队用C语言实现的,他们总是能够写出一流的代码。


8
+1 是一种方便易用的方式,它包含了好的电池。循环缓冲区的操作是 O(1),正如你所说的,额外开销在 C 语言中,所以速度应该仍然相当快。 - John La Rooy
7
我不喜欢这个解决方案,因为文档没有保证在定义了 maxlen 后可以 O(1) 随机访问。如果 deque 可以无限增长,则 O(n) 是可以理解的,但是如果给定了 maxlen,索引元素应该是常量时间。 - lvella
1
我的猜测是它被实现为链表而不是数组。 - Bite code
1
如果我下面的回答中的时间正确,那么似乎是正确的。 - djvg
4
实际的Python实现(请参见此处:https://github.com/python/cpython/blob/main/Modules/_collectionsmodule.c)使用了两种思想的混合:固定大小块的链表。也就是说,它将deque分割成内部等大小的内存块。这将减轻纯链表实现的一些成本,但长deque内部项的访问时间仍然为`O(maxlen)`(又称`O(n)`)。 - blubberdiblub
显示剩余2条评论

34
尽管这里已经有很多很棒的答案,但我没有找到任何关于所提到的选项的时间比较。因此,请看下面我谦虚的比较尝试, 仅供测试目的,该类可以在基于listcollections.dequeNumpy.roll的缓冲区之间切换。请注意,update 方法每次只添加一个值,以保持简单。
import numpy
import timeit
import collections


class CircularBuffer(object):
    buffer_methods = ('list', 'deque', 'roll')

    def __init__(self, buffer_size, buffer_method):
        self.content = None
        self.size = buffer_size
        self.method = buffer_method

    def update(self, scalar):
        if self.method == self.buffer_methods[0]:
            # Use list
            try:
                self.content.append(scalar)
                self.content.pop(0)
            except AttributeError:
                self.content = [0.] * self.size
        elif self.method == self.buffer_methods[1]:
            # Use collections.deque
            try:
                self.content.append(scalar)
            except AttributeError:
                self.content = collections.deque([0.] * self.size,
                                                 maxlen=self.size)
        elif self.method == self.buffer_methods[2]:
            # Use Numpy.roll
            try:
                self.content = numpy.roll(self.content, -1)
                self.content[-1] = scalar
            except IndexError:
                self.content = numpy.zeros(self.size, dtype=float)

# Testing and Timing
circular_buffer_size = 100
circular_buffers = [CircularBuffer(buffer_size=circular_buffer_size,
                                   buffer_method=method)
                    for method in CircularBuffer.buffer_methods]
timeit_iterations = 1e4
timeit_setup = 'from __main__ import circular_buffers'
timeit_results = []
for i, cb in enumerate(circular_buffers):
    # We add a convenient number of convenient values (see equality test below)
    code = '[circular_buffers[{}].update(float(j)) for j in range({})]'.format(
        i, circular_buffer_size)
    # Testing
    eval(code)
    buffer_content = [item for item in cb.content]
    assert buffer_content == range(circular_buffer_size)
    # Timing
    timeit_results.append(
        timeit.timeit(code, setup=timeit_setup, number=int(timeit_iterations)))
    print '{}: total {:.2f}s ({:.2f}ms per iteration)'.format(
        cb.method, timeit_results[-1],
        timeit_results[-1] / timeit_iterations * 1e3)
在我的系统上,这会产生以下结果:
list:  total 1.06s (0.11ms per iteration)
deque: total 0.87s (0.09ms per iteration)
roll:  total 6.27s (0.63ms per iteration)

15

从列表的头部进行弹出操作会导致整个列表被复制,因此效率较低。

相反,您应该使用固定大小的列表/数组和一个索引,在添加/删除项目时移动缓冲区。


4
同意。无论看起来多么优雅或不优雅,也不管使用何种语言。事实上,你越少干扰垃圾收集器(或堆管理器或页面/映射机制或任何实际执行内存操作的东西),就会越好。 - user215054
@RocketSurgeon 这不是魔法,只是一个删除了第一个元素的数组。因此,对于大小为n的数组,这意味着n-1个复制操作。这里没有垃圾收集器或类似设备参与。 - Christian
3
我同意。这样做比一些人想象的要容易得多。只需要使用递增计数器,并在访问项目时使用模运算符(% arraylen)。 - Andre Blum
同上,您可以查看我的帖子,那就是我是如何做到的。 - MoonCactus

14

根据MoonCactus的回答,这里有一个circularlist类。与他的版本不同的是,在这里c[0]将始终给出最早添加的元素,c[-1]将给出最新添加的元素,c[-2]将给出倒数第二个...这对应用程序更自然。

c = circularlist(4)
c.append(1); print(c, c[0], c[-1])    #[1] (1/4 items)              1  1
c.append(2); print(c, c[0], c[-1])    #[1, 2] (2/4 items)           1  2
c.append(3); print(c, c[0], c[-1])    #[1, 2, 3] (3/4 items)        1  3
c.append(8); print(c, c[0], c[-1])    #[1, 2, 3, 8] (4/4 items)     1  8
c.append(10); print(c, c[0], c[-1])   #[2, 3, 8, 10] (4/4 items)    2  10
c.append(11); print(c, c[0], c[-1])   #[3, 8, 10, 11] (4/4 items)   3  11
d = circularlist(4, [1, 2, 3, 4, 5])  #[2, 3, 4, 5]

类:

class circularlist(object):
    def __init__(self, size, data = []):
        """Initialization"""
        self.index = 0
        self.size = size
        self._data = list(data)[-size:]

    def append(self, value):
        """Append an element"""
        if len(self._data) == self.size:
            self._data[self.index] = value
        else:
            self._data.append(value)
        self.index = (self.index + 1) % self.size

    def __getitem__(self, key):
        """Get element by index, relative to the current index"""
        if len(self._data) == self.size:
            return(self._data[(key + self.index) % self.size])
        else:
            return(self._data[key])

    def __repr__(self):
        """Return string representation"""
        return (self._data[self.index:] + self._data[:self.index]).__repr__() + ' (' + str(len(self._data))+'/{} items)'.format(self.size)

不错的补充。Python列表已经允许负索引,但是当循环缓冲区已满时,(-1)并不会返回预期的值,因为列表的“最后”添加实际上在列表内部。 - MoonCactus
1
它确实有效@MoonCactus,请参见我在答案顶部给出的6个示例;在最后几个示例中,您可以看到c[-1]始终是正确的元素。 __getitem__做得很好。 - Basj
1
哦,是的,我的意思是我的失败了,不是你的,抱歉:D我会让我的评论更清晰!--哦,我不能,这个评论太旧了。 - MoonCactus
不错的简单解决方案。我添加了一个可选参数,以允许从现有数据初始化列表,这样更符合Python的风格。 - Orwellophile
除了计算处理的项目数量之外,是否有一种方法可以仅迭代一次此循环列表的成员?“for i in c:”会无限迭代...谢谢- Mike T. - Mike T.
这段代码似乎有两个问题。首先,当列表还没有满时,索引不会循环。如果我执行 c=circularlist(5, [1,2]),我不能执行 c[2],但是一旦列表已经满了,就可以执行 c[5]。其次,c=circularlist(5); c.append(1) 应该产生与 c=circularlist(5, [1]) 相同的列表,但实际上并不是这样。在前一种情况下,c.index1,而在后一种情况下,它是 0 - Joost Kremers

8

虽然deque类的使用没问题,但考虑到问题的要求(平均值),这是我的解决方案:

>>> from collections import deque
>>> class CircularBuffer(deque):
...     def __init__(self, size=0):
...             super(CircularBuffer, self).__init__(maxlen=size)
...     @property
...     def average(self):  # TODO: Make type check for integer or floats
...             return sum(self)/len(self)
...
>>>
>>> cb = CircularBuffer(size=10)
>>> for i in range(20):
...     cb.append(i)
...     print "@%s, Average: %s" % (cb, cb.average)
...
@deque([0], maxlen=10), Average: 0
@deque([0, 1], maxlen=10), Average: 0
@deque([0, 1, 2], maxlen=10), Average: 1
@deque([0, 1, 2, 3], maxlen=10), Average: 1
@deque([0, 1, 2, 3, 4], maxlen=10), Average: 2
@deque([0, 1, 2, 3, 4, 5], maxlen=10), Average: 2
@deque([0, 1, 2, 3, 4, 5, 6], maxlen=10), Average: 3
@deque([0, 1, 2, 3, 4, 5, 6, 7], maxlen=10), Average: 3
@deque([0, 1, 2, 3, 4, 5, 6, 7, 8], maxlen=10), Average: 4
@deque([0, 1, 2, 3, 4, 5, 6, 7, 8, 9], maxlen=10), Average: 4
@deque([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], maxlen=10), Average: 5
@deque([2, 3, 4, 5, 6, 7, 8, 9, 10, 11], maxlen=10), Average: 6
@deque([3, 4, 5, 6, 7, 8, 9, 10, 11, 12], maxlen=10), Average: 7
@deque([4, 5, 6, 7, 8, 9, 10, 11, 12, 13], maxlen=10), Average: 8
@deque([5, 6, 7, 8, 9, 10, 11, 12, 13, 14], maxlen=10), Average: 9
@deque([6, 7, 8, 9, 10, 11, 12, 13, 14, 15], maxlen=10), Average: 10
@deque([7, 8, 9, 10, 11, 12, 13, 14, 15, 16], maxlen=10), Average: 11
@deque([8, 9, 10, 11, 12, 13, 14, 15, 16, 17], maxlen=10), Average: 12
@deque([9, 10, 11, 12, 13, 14, 15, 16, 17, 18], maxlen=10), Average: 13
@deque([10, 11, 12, 13, 14, 15, 16, 17, 18, 19], maxlen=10), Average: 14

当我尝试调用“average”方法时,出现“TypeError:'numpy.float64'对象不可调用”的错误。 - scls
是的...实际上我猜测deque在内部使用了numpy数组(在移除@property后,它工作得很好)。 - scls
21
我保证deque在内部不使用numpy数组。collections是标准库的一部分,而numpy则不是。依赖第三方库将使标准库变得糟糕。 - user719958

7

2
但是 numpy.roll 返回的是数组的副本,对吗? - djvg
12
这个回答很误导人,Python的deque看起来非常快,但在你链接的基准测试中,从numpy数组转换到deque以及从deque转换到numpy数组会显著降低它的速度。 - xitrium
可以看到,在 numpy.roll 的源代码中,它计算了旧数组的前半部分和后半部分的切片,创建了一个新数组(使用 empty_like),然后将交换后的两个部分复制到新数组中。由于每次都要复制整个数组,这是 O(n) 的时间复杂度。 - Bas Swinckels

4

那么Python Cookbook中的解决方案如何?当环形缓冲实例变满时,可以重新分类。

class RingBuffer:
    """ class that implements a not-yet-full buffer """
    def __init__(self,size_max):
        self.max = size_max
        self.data = []

    class __Full:
        """ class that implements a full buffer """
        def append(self, x):
            """ Append an element overwriting the oldest one. """
            self.data[self.cur] = x
            self.cur = (self.cur+1) % self.max
        def get(self):
            """ return list of elements in correct order """
            return self.data[self.cur:]+self.data[:self.cur]

    def append(self,x):
        """append an element at the end of the buffer"""
        self.data.append(x)
        if len(self.data) == self.max:
            self.cur = 0
            # Permanently change self's class from non-full to full
            self.__class__ = self.__Full

    def get(self):
        """ Return a list of elements from the oldest to the newest. """
        return self.data

# sample usage
if __name__=='__main__':
    x=RingBuffer(5)
    x.append(1); x.append(2); x.append(3); x.append(4)
    print(x.__class__, x.get())
    x.append(5)
    print(x.__class__, x.get())
    x.append(6)
    print(x.data, x.get())
    x.append(7); x.append(8); x.append(9); x.append(10)
    print(x.data, x.get())

在这个实现中,值得注意的设计选择是,由于这些对象在它们的生命周期中某个时刻会经历一个不可逆转的状态转换——从非满缓冲区到满缓冲区(并且此时行为发生变化)——我通过改变self.__class__来模拟它。只要两个类具有相同的slots(例如,在这个示例中,对于两个经典类RingBuffer和__Full而言,它可以正常工作),即使在Python 2.2中也可以运行。
在许多语言中,改变实例的类可能很奇怪,但它是一种Pythonic的替代方式,用于表示偶尔、大规模、不可逆和离散的状态变化,这些变化会极大地影响行为,就像在这个示例中一样。幸好,Python支持所有类型的类进行此操作。

出处:Sébastien Keim


2
我对这个和deque进行了一些速度测试。这比deque慢大约7倍。 - PolyMesh
@PolyMesh 很棒,你应该让作者知道! - d8aninja
3
那样做有什么意义呢?这是一份已经发表的老文件。我的评论目的是让其他人知道这个答案已经过时,应该使用deque代替。 - PolyMesh
@PolyMesh 在他发表文章时,它可能仍然比较慢;与作者联系的说明在书的介绍中。我只是提供一种可能的替代方案。此外,“如果速度只是最佳指标就好了;遗憾的是它可能只是一个好指标。” - d8aninja
@d8aninja deque也有一个.clear()函数。但这个没有(从我所知道的来看,也不能)。 - Connor
@Connor 这是一本烹饪书中的教学示例,旨在简单易懂,适合初学者入门(因此称为“烹饪书”)。Deque非常棒,我很高兴你喜欢它。 - d8aninja

3
你也可以查看这个相当古老的Python示例
这是我自己使用NumPy数组的版本:
#!/usr/bin/env python

import numpy as np

class RingBuffer(object):
    def __init__(self, size_max, default_value=0.0, dtype=float):
        """initialization"""
        self.size_max = size_max

        self._data = np.empty(size_max, dtype=dtype)
        self._data.fill(default_value)

        self.size = 0

    def append(self, value):
        """append an element"""
        self._data = np.roll(self._data, 1)
        self._data[0] = value 

        self.size += 1

        if self.size == self.size_max:
            self.__class__  = RingBufferFull

    def get_all(self):
        """return a list of elements from the oldest to the newest"""
        return(self._data)

    def get_partial(self):
        return(self.get_all()[0:self.size])

    def __getitem__(self, key):
        """get element"""
        return(self._data[key])

    def __repr__(self):
        """return string representation"""
        s = self._data.__repr__()
        s = s + '\t' + str(self.size)
        s = s + '\t' + self.get_all()[::-1].__repr__()
        s = s + '\t' + self.get_partial()[::-1].__repr__()
        return(s)

class RingBufferFull(RingBuffer):
    def append(self, value):
        """append an element when buffer is full"""
        self._data = np.roll(self._data, 1)
        self._data[0] = value

4
使用NumPy很好,但是没有实现循环缓冲区需要扣1分。你当前的实现方式每次添加一个元素时都要移动所有数据,这样的时间复杂度是O(n)。为实现一个正确的circular buffer,你需要有一个索引和一个大小变量,并且要正确处理数据“绕过”缓冲区末尾的情况。在检索数据时,您可能需要连接缓冲区开头和结尾的两个部分。 - Bas Swinckels

3

我以前在串行编程时遇到过这个问题。当时大约一年前,我也找不到任何高效的实现方法,所以我最终写了一个C扩展pyringbuf,并且它也可以在MIT许可下在pypi上使用。它非常基础,只处理8位有符号字符的缓冲区,但长度是灵活的,因此如果您需要除字符外的其他内容,可以在其上使用Struct或其他内容。然而,现在通过谷歌搜索发现有几种选择,因此您可能还要查看那些选项。


3

来自Github:

class CircularBuffer:

    def __init__(self, size):
        """Store buffer in given storage."""
        self.buffer = [None]*size
        self.low = 0
        self.high = 0
        self.size = size
        self.count = 0

    def isEmpty(self):
        """Determines if buffer is empty."""
        return self.count == 0

    def isFull(self):
        """Determines if buffer is full."""
        return self.count == self.size

    def __len__(self):
        """Returns number of elements in buffer."""
        return self.count

    def add(self, value):
        """Adds value to buffer, overwrite as needed."""
        if self.isFull():
            self.low = (self.low+1) % self.size
        else:
            self.count += 1
        self.buffer[self.high] = value
        self.high = (self.high + 1) % self.size

    def remove(self):
        """Removes oldest value from non-empty buffer."""
        if self.count == 0:
            raise Exception ("Circular Buffer is empty");
        value = self.buffer[self.low]
        self.low = (self.low + 1) % self.size
        self.count -= 1
        return value

    def __iter__(self):
        """Return elements in the circular buffer in order using iterator."""
        idx = self.low
        num = self.count
        while num > 0:
            yield self.buffer[idx]
            idx = (idx + 1) % self.size
            num -= 1

    def __repr__(self):
        """String representation of circular buffer."""
        if self.isEmpty():
            return 'cb:[]'

        return 'cb:[' + ','.join(map(str,self)) + ']'

https://github.com/heineman/python-data-structures/blob/master/2.%20Ubiquitous%20Lists/circBuffer.py


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接