使用Golang实现具有并发读取功能的缓冲区

4

我希望在Go语言中构建一个缓冲区,支持多个并发读取器和一个写入器。任何写入到缓冲区的内容都应该被所有读取器读取。新的读取器可以随时加入,这意味着已经写入的数据必须能够为迟到的读取器回放。

缓冲区应满足以下接口:

type MyBuffer interface {
    Write(p []byte) (n int, err error)
    NextReader() io.Reader
}

你有没有建议,最好使用内置类型来实现这样的功能?


1
https://engineering.linkedin.com/distributed-systems/log-what-every-software-engineer-should-know-about-real-time-datas-unifying https://kafka.apache.org/intro - dm03514
2
标准库中没有现成的解决方案。不过,你可以使用围绕通道构建的自定义结构,其中每个读者将其读取内容回显到通道,以便其他读者可以读取它。问题在于如何确定限制。您希望能够回放旧数据以供晚到的读者查看,但这意味着要永久保留所有该数据(即整个程序生命周期内),因为您永远不知道什么时候会有新的读者加入。这会导致大量内存泄漏的风险。 - Kaedys
如果保留和重放旧数据不是很重要,那么这个链接提供了单播放器多接收器系统的可靠实现:https://rogpeppe.wordpress.com/2009/12/01/concurrent-idioms-1-broadcasting-values-in-go-with-linked-channels/ - Kaedys
https://github.com/djherbis/bufit - wim
3个回答

7
根据该写入程序的性质以及使用方式,将所有内容保存在内存中(用于为稍后加入的读者重新播放)非常危险,可能会消耗大量内存或导致应用程序由于内存不足而崩溃。仅将所有内容保存在内存中以用作“低流量”记录器可能是可以的,但例如流式传输某些音频或视频则极不可取。
如果下面的读取实现读取了写入到缓冲区的所有数据,则它们的`Read()`方法将正确报告`io.EOF`。必须注意一些结构(例如bufio.Scanner)在遇到`io.EOF`后可能不会再读取更多数据(但这不是我们实现的缺陷)。
如果您希望我们缓冲区的读者在缓冲区中没有更多数据时等待,直到写入新数据而不是返回`io.EOF`,您可以在此处提供的“tail reader”的返回读者中包装: Go: “tail -f”-like generator
"内存安全"文件实现
这里有一个极其简单而优雅的解决方案。它使用文件进行写入,并使用文件进行读取。同步基本由操作系统提供。由于数据仅存储在磁盘上,因此不会有内存不足错误的风险。根据您的写入程序的性质,这可能足够或不足够。
我更喜欢使用以下接口,因为在文件的情况下`Close()`很重要。
type MyBuf interface {
    io.WriteCloser
    NewReader() (io.ReadCloser, error)
}

实现非常简单:
type mybuf struct {
    *os.File
}

func (mb *mybuf) NewReader() (io.ReadCloser, error) {
    f, err := os.Open(mb.Name())
    if err != nil {
        return nil, err
    }
    return f, nil
}

func NewMyBuf(name string) (MyBuf, error) {
    f, err := os.Create(name)
    if err != nil {
        return nil, err
    }
    return &mybuf{File: f}, nil
}

我们的mybuf类型嵌入了*os.File,因此我们免费获取了Write()Close()方法。 NewReader()仅仅是以只读模式打开现有的后备文件进行读取,并返回它,再次利用它实现io.ReadCloser的特性。
创建一个新的MyBuf值是通过NewMyBuf()函数实现的,如果创建文件失败,也可以返回error
注意,由于mybuf嵌入了*os.File,所以可以使用类型断言来"访问"其他os.File的公开方法,即使它们不是MyBuf接口的一部分。我认为这不是一个缺陷,但如果您想禁止这样做,您必须更改mybuf的实现方式,不再嵌入os.File,而是将其作为命名字段(但然后您必须自己添加Write()Close()方法,正确地转发到os.File字段)。

内存实现

如果文件实现不够用,这里提供了一种内存实现。
由于我们现在只在内存中操作,所以我们将使用以下接口:
type MyBuf interface {
    io.Writer
    NewReader() io.Reader
}

这个想法是存储所有传递给缓冲区的字节片。当调用Read()时,读取器将提供已存储的片段,每个读取器将跟踪其Read()方法提供了多少已存储的片段。必须处理同步,我们将使用一个简单的sync.RWMutex
没有进一步的话,这里是实现:
type mybuf struct {
    data [][]byte
    sync.RWMutex
}

func (mb *mybuf) Write(p []byte) (n int, err error) {
    if len(p) == 0 {
        return 0, nil
    }
    // Cannot retain p, so we must copy it:
    p2 := make([]byte, len(p))
    copy(p2, p)
    mb.Lock()
    mb.data = append(mb.data, p2)
    mb.Unlock()
    return len(p), nil
}

type mybufReader struct {
    mb   *mybuf // buffer we read from
    i    int    // next slice index
    data []byte // current data slice to serve
}

func (mbr *mybufReader) Read(p []byte) (n int, err error) {
    if len(p) == 0 {
        return 0, nil
    }
    // Do we have data to send?
    if len(mbr.data) == 0 {
        mb := mbr.mb
        mb.RLock()
        if mbr.i < len(mb.data) {
            mbr.data = mb.data[mbr.i]
            mbr.i++
        }
        mb.RUnlock()
    }
    if len(mbr.data) == 0 {
        return 0, io.EOF
    }

    n = copy(p, mbr.data)
    mbr.data = mbr.data[n:]
    return n, nil
}

func (mb *mybuf) NewReader() io.Reader {
    return &mybufReader{mb: mb}
}

func NewMyBuf() MyBuf {
    return &mybuf{}
}

请注意,Writer.Write() 的一般契约包括实现不能保留传递的切片,因此在“存储”之前,我们必须对其进行复制。
另请注意,读取器的Read()尝试最小化锁定时间。也就是说,如果我们需要从缓冲区获取新的数据切片,它才会锁定,并且仅执行读取锁定。这意味着,如果读取器有一个部分数据切片,则会在不锁定和触摸缓冲区的情况下将其发送到Read()中。

1
非常感谢。优雅的解决方案。您提出的 io.WriteCloser 建议是个好主意。理想情况下,读取器的 Read() 方法会等待更多数据,直到写入器被关闭,此时它们将收到 io.EOF。关于内存消耗,可以在内存中启动缓冲区,并在超过一定大小时将数据转储到磁盘上。我会采用您的建议来逐步解决这个问题。再次感谢您。 - Tympanix

1

作为实验的一部分,我必须做类似的事情,所以分享:

type MultiReaderBuffer struct {
    mu  sync.RWMutex
    buf []byte
}

func (b *MultiReaderBuffer) Write(p []byte) (n int, err error) {
    if len(p) == 0 {
        return 0, nil
    }
    b.mu.Lock()
    b.buf = append(b.buf, p...)
    b.mu.Unlock()
    return len(p), nil
}

func (b *MultiReaderBuffer) NewReader() io.Reader {
    return &mrbReader{mrb: b}
}

type mrbReader struct {
    mrb *MultiReaderBuffer
    off int
}

func (r *mrbReader) Read(p []byte) (n int, err error) {
    if len(p) == 0 {
        return 0, nil
    }
    r.mrb.mu.RLock()
    n = copy(p, r.mrb.buf[r.off:])
    r.mrb.mu.RUnlock()
    if n == 0 {
        return 0, io.EOF
    }
    r.off += n
    return n, nil
}

1
我链接到了只追加提交日志,因为它看起来与您的要求非常相似。我对分布式系统和提交日志还很陌生,可能会割裂一些概念,但是kafka的介绍清楚地解释了所有内容,并配有漂亮的图表。
Go对我来说也很新,所以我确定有更好的方法:
但也许你可以将缓冲区建模为一个切片,我认为有几种情况:
- 缓冲区没有读取器,新数据被写入缓冲区,缓冲区长度增加 - 缓冲区有一个/多个读取器: - 读取器订阅缓冲区 - 缓冲区创建并返回一个通道给该客户端 - 缓冲区维护客户端通道列表 - 发生写操作->循环遍历所有客户端通道并发布到它(pub sub)
这解决了实时消费者流的pubsub问题,其中消息被扩散,但不涉及回溯。
Kafka使回溯成为可能,他们的intro illustrates说明了如何做到这一点 :)

这个偏移量由消费者控制:通常情况下,当消费者读取记录时,它会线性地推进其偏移量,但实际上,由于位置是由消费者控制的,因此它可以按任何顺序消耗记录。例如,消费者可以将偏移量重置为旧的位置以重新处理过去的数据,或者跳到最新的记录并从“现在”开始消费。

这些特性的组合意味着Kafka消费者非常便宜 - 它们可以来去自如,对集群或其他消费者的影响不大。例如,您可以使用我们的命令行工具“tail”任何主题的内容,而不会改变任何现有消费者所消费的内容。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接