Python中针对任意大小字节块的高效FIFO队列

22

我该如何实现一个FIFO缓冲区,以便我可以高效地将任意大小的字节块添加到头部,并从中高效地弹出任意大小的字节块?

背景:

我有一个类,它按任意大小的块从文件对象读取字节,并且本身就是一个文件对象,客户端可以按任意大小的块从中读取字节。

我的实现方式是,每当客户端想要读取一块字节时,该类将重复从底层文件对象中读取(使用适合这些对象的块大小)并将字节添加到FIFO队列的头部,直到队列中有足够的字节为客户端提供所请求大小的块。然后,它从队列的尾部弹出这些字节并将它们返回给客户端。

当底层文件对象的块大小远大于客户端从该类中读取时使用的块大小时,我遇到了性能问题。

假设底层文件对象的块大小为1 MiB,客户端使用的块大小为1 KiB。第一次客户端请求1 KiB时,该类必须读取1 MiB并将其添加到FIFO队列中。然后,对于该请求和随后的1023个请求,该类必须从FIFO队列的尾部弹出1 KiB,队列逐渐从1 MiB减小到0字节,然后再次开始循环。

我目前使用StringIO对象实现了这一点。向StringIO对象的末尾写入新字节非常快,但是从开头删除字节非常慢,因为必须创建一个新的StringIO对象,该对象保存整个先前缓冲区的副本,减去第一块字节。

涉及类似问题的 SO 问题通常会指向 deque 容器。但是,deque 实现为双向链表。将块写入 deque 将需要将块分成包含单个字节的对象。然后,deque 将向每个对象添加两个指针进行存储,这可能会将内存需求增加至少一个数量级而不是字节。此外,遍历链表并处理每个对象都需要很长时间,既要将块分割成对象,也要将对象组合成块。

4个回答

16
我目前使用StringIO对象实现了这一点。向StringIO对象的末尾写入新字节很快,但从开头删除字节非常慢,因为必须创建一个新的StringIO对象,该对象保存除第一个块字节之外的整个先前缓冲区的副本。

实际上,实现FIFO的最典型方法是使用具有两个指针的环形缓冲区:

enter image description here image source

现在,您可以使用StringIO()并使用.seek()从适当位置读取/写入来实现它。


1
哦,环绕加1分。我没有想到过这一点。不过你需要提前知道最大尺寸;实际上,我想它可以根据需要增长... - Cameron
1
谢谢!这看起来很完美。我用StringIO做了一个实验,表明它会自动扩展以适应这个问题。例如,如果StringIO对象的当前大小为10字节,并且PUTPT(寻址位置)在索引5处,写入20字节块会自动将StringIO对象扩展到25字节,保留前5字节并覆盖其余部分。但是,如果GETPT当前在PUTPT之后,则需要更多的逻辑。 - Roger Dahl
1
我已经在下面的答案中实现了这个想法。干杯! - Cameron

12

更新:以下是来自vartec回答中的循环缓冲区技术实现(建立在我的原始答案基础上,对于那些好奇的人,下面仍然保留):

from cStringIO import StringIO

class FifoFileBuffer(object):
    def __init__(self):
        self.buf = StringIO()
        self.available = 0    # Bytes available for reading
        self.size = 0
        self.write_fp = 0

    def read(self, size = None):
        """Reads size bytes from buffer"""
        if size is None or size > self.available:
            size = self.available
        size = max(size, 0)

        result = self.buf.read(size)
        self.available -= size

        if len(result) < size:
            self.buf.seek(0)
            result += self.buf.read(size - len(result))

        return result


    def write(self, data):
        """Appends data to buffer"""
        if self.size < self.available + len(data):
            # Expand buffer
            new_buf = StringIO()
            new_buf.write(self.read())
            self.write_fp = self.available = new_buf.tell()
            read_fp = 0
            while self.size <= self.available + len(data):
                self.size = max(self.size, 1024) * 2
            new_buf.write('0' * (self.size - self.write_fp))
            self.buf = new_buf
        else:
            read_fp = self.buf.tell()

        self.buf.seek(self.write_fp)
        written = self.size - self.write_fp
        self.buf.write(data[:written])
        self.write_fp += len(data)
        self.available += len(data)
        if written < len(data):
            self.write_fp -= self.size
            self.buf.seek(0)
            self.buf.write(data[written:])
        self.buf.seek(read_fp)

原始答案(已被上面的答案取代):

你可以使用缓冲区并跟踪起始索引(读取文件指针),在它变得太大时偶尔压缩它(这应该会产生相当好的平均性能)。

例如,可以像这样包装StringIO对象:

from cStringIO import StringIO
class FifoBuffer(object):
    def __init__(self):
        self.buf = StringIO()

    def read(self, *args, **kwargs):
        """Reads data from buffer"""
        self.buf.read(*args, **kwargs)

    def write(self, *args, **kwargs):
        """Appends data to buffer"""
        current_read_fp = self.buf.tell()
        if current_read_fp > 10 * 1024 * 1024:
            # Buffer is holding 10MB of used data, time to compact
            new_buf = StringIO()
            new_buf.write(self.buf.read())
            self.buf = new_buf
            current_read_fp = 0

        self.buf.seek(0, 2)    # Seek to end
        self.buf.write(*args, **kwargs)

        self.buf.seek(current_read_fp)

3
太棒了!感谢您的完整实现。 - Roger Dahl
1
@Roger:没问题。我想这可能会在某一天派上用场;-) - Cameron
只是出于好奇,它会更快吗? - Zamfir Kerlukson
@Zamfir:不知道 :-) 试一下看看? - Cameron

12
...但是从开头删除字节非常慢,因为必须创建一个新的StringIO对象,该对象保存了除第一块字节之外的整个先前缓冲区的副本。
这种缓慢可以通过在Python> = v3.4中使用bytearray来克服。请参见此问题中的讨论,补丁在此处
关键是:通过以下方式从bytearray中删除头字节。
a[:1] = b''   # O(1) (amortized)

比...要快得多
a = a[1:]     # O(len(a))

len(a) 非常大时(比如 10**6),bytearray 提供了一种方便的方式将整个数据集预览为数组(即它本身),与需要将对象连接成块的 deque 容器形成对比。
现在可以实现一个高效的 FIFO,具体方法如下:
class byteFIFO:
    """ byte FIFO buffer """
    def __init__(self):
        self._buf = bytearray()

    def put(self, data):
        self._buf.extend(data)

    def get(self, size):
        data = self._buf[:size]
        # The fast delete syntax
        self._buf[:size] = b''
        return data

    def peek(self, size):
        return self._buf[:size]

    def getvalue(self):
        # peek with no copy
        return self._buf

    def __len__(self):
        return len(self._buf)

基准测试

import time
bfifo = byteFIFO()
bfifo.put(b'a'*1000000)        # a very long array
t0 = time.time()
for k in range(1000000):
    d = bfifo.get(4)           # "pop" from head
    bfifo.put(d)               # "push" in tail
print('t = ', time.time()-t0)  # t = 0.897 on my machine

卡梅隆的答案中循环/环形缓冲区实现需要2.378秒,而他/她的原始实现只需要1.108秒。


3
可以使用del语句,例如del self._buf[:size]来使用快速删除语法。 - Hubert Kario

4

你能假设关于预期的读/写量吗?

将数据分块,例如分成1024字节的片段,并使用deque[1]可能会更好;您可以只读取N个完整的块,然后读取最后一个块以拆分并将其余部分放回队列的开头。

1) collections.deque

class collections.deque([iterable[, maxlen]])

返回使用iterable中的数据从左到右初始化(使用append())的新deque对象。如果未指定iterable,则新deque为空。

deque是堆栈和队列的一般化(名称发音为“deck”,缩写为“double-ended queue”)。 deque支持线程安全,内存高效的从deque的任一侧附加和弹出,两个方向的O(1)性能大致相同。…


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接