C++中的双缓冲单生产者单消费者数据结构

30
我在$work公司有一个应用程序,需要在两个实时线程之间移动,并且这两个线程的调度频率不同(实际调度不受我控制)。该应用程序类似于硬实时(其中一个线程必须驱动硬件接口),因此线程间的数据传输应尽可能无锁和无等待。
需要注意的是,只需要传输一个数据块:由于两个线程以不同的速率运行,较快的线程在较慢的线程唤醒之间完成两次迭代,因此在这种情况下,覆盖写缓冲区中的数据以使较慢的线程仅获取最新数据是可以的。
换句话说,双缓冲解决方案就足够了,而不是队列。这两个缓冲区在初始化期间分配,并且读取线程和写入线程可以调用该类的方法来获取其中一个缓冲区的指针。
C++代码:
#include <mutex>

template <typename T>
class ProducerConsumerDoubleBuffer {
public:
    ProducerConsumerDoubleBuffer() {
        m_write_busy = false;
        m_read_idx = m_write_idx = 0;
    }

    ~ProducerConsumerDoubleBuffer() { }

    // The writer thread using this class must call
    // start_writing() at the start of its iteration
    // before doing anything else to get the pointer
    // to the current write buffer.
    T * start_writing(void) {
        std::lock_guard<std::mutex> lock(m_mutex);

        m_write_busy = true;
        m_write_idx = 1 - m_read_idx;

        return &m_buf[m_write_idx];
    }
    // The writer thread must call end_writing()
    // as the last thing it does
    // to release the write busy flag.
    void end_writing(void) {
        std::lock_guard<std::mutex> lock(m_mutex);

        m_write_busy = false;
    }

    // The reader thread must call start_reading()
    // at the start of its iteration to get the pointer
    // to the current read buffer.
    // If the write thread is not active at this time,
    // the read buffer pointer will be set to the 
    // (previous) write buffer - so the reader gets the latest data.
    // If the write buffer is busy, the read pointer is not changed.
    // In this case the read buffer may contain stale data,
    // it is up to the user to deal with this case.
    T * start_reading(void) {
        std::lock_guard<std::mutex> lock(m_mutex);

        if (!m_write_busy) {
            m_read_idx = m_write_idx;
        }

        return &m_buf[m_read_idx];
    }
    // The reader thread must call end_reading()
    // at the end of its iteration.
    void end_reading(void) {
        std::lock_guard<std::mutex> lock(m_mutex);

        m_read_idx = m_write_idx;
    }

private:
    T m_buf[2];
    bool m_write_busy;
    unsigned int m_read_idx, m_write_idx;
    std::mutex m_mutex;
};

为了避免读取线程中的旧数据,负载结构被版本化。为了方便线程之间的双向数据传输,使用了上述两个实例,它们是相反的。
问题:
  • 这个方案线程安全吗?如果有问题,出在哪里?
  • 能否在没有互斥锁的情况下完成?也许只需要内存屏障或CAS指令?
  • 能否做得更好?

2
为什么不使用循环缓冲区?我猜这将简化获取最新数据的问题。 - Rakib
好了...这比我想象的更复杂 :-) 我正在努力实现中,会随时向你报告。 - Cameron
如果没有可读内容,您希望读取线程执行什么操作? - billmcc
看一下这个,我很快就为一个小项目做了出来。它是“无锁”的,但依赖于原子操作。https://github.com/chadkler/hipoconcon/blob/master/inc/ringbuffer.h - Chad
1
@Chad:如果你使用显式的获取和释放内存屏障而不是默认的顺序一致性内存屏障,你将获得更好的性能(在x86上要好得多)。此外,会减慢速度-考虑将大小强制为2的幂次方,这样您就可以使用。顺便说一句,Facebook的folly队列具有相同的语义,但我提到的所有改进都有(他们使用if而不是并依赖于分支预测) :-) - Cameron
显示剩余4条评论
3个回答

13
非常有趣的问题!比我最初想象的要棘手得多 :-) 我喜欢无锁解决方案,所以我尝试在下面找出一个。
有许多方法来考虑这个系统。您可以将其建模为固定大小的循环缓冲区/队列(具有两个条目),但是然后您将失去更新下一个可用值以供消费的能力,因为您不知道消费者是否已经开始读取了最近发布的值还是仍在(可能)读取先前的值。因此,除了标准环形缓冲区的状态外,还需要额外的状态才能达到更优化的解决方案。
首先请注意,在任何给定时间点,生产者始终可以安全地写入一个单元格;如果一个单元格正在被消费者读取,则可以写入另一个单元格。让我们将可以安全写入的单元格称为“活动”单元格(可以潜在地从中读取的单元格是不是活动单元格)。只有在另一个单元格当前没有被读取时,才能切换活动单元格。
与始终可以写入的活动单元格不同,非活动单元格只有在包含值时才能从中读取;一旦使用该值,它就会消失。(这意味着在积极的生产者的情况下避免了活锁;在某个时刻,消费者将清空一个单元格并停止触及单元格。一旦发生这种情况,生产者就可以确定发布一个值,而在此之前,它只能发布一个值(更改活动单元格),如果消费者不处于读取过程中。)
如果有一个值准备好被消耗,那么只有消费者可以改变事实(无论是对于非活动单元格还是活动单元格);随后的制造可能会更改哪个单元格是活动的和发布的值,但是一个值总是准备好被读取直到它被消耗。
一旦生产者完成对活动单元格的写入,它可以通过更改哪个单元格为活动单元格(交换索引)来“发布”此值,前提是消费者不在阅读另一个单元格的中间。如果消费者正在阅读另一个单元格,则无法进行交换,但在这种情况下,消费者可以在阅读完该值后进行交换,前提是生产者没有处于写入过程中(如果是,则生产者完成写入后会进行交换)。 实际上,通常情况下,只要消费者是唯一访问系统的用户,消费者总是可以在完成阅读后进行交换,因为消费者的虚假交换是无害的:如果另一个单元格中有内容,则交换将导致下一个读取该内容,否则交换不会对任何内容产生影响。
所以,我们需要一个共享变量来跟踪活动单元格是什么,并且我们还需要一种方法让生产者和消费者指示他们是否正在进行操作。我们可以将这三个状态存储到一个原子变量中,以便能够同时影响它们(原子地)。
我们还需要一种方式让消费者检查第一个非活动单元格中是否有任何内容,并让两个线程根据需要修改该状态。我尝试了一些其他方法,但最终最简单的方法就是在另一个原子变量中也包含此信息。这样做使得系统中所有状态更改都是原子的,因此更容易理解。
我已经想出了一个无等待实现(无锁,并且所有操作都在有限数量的指令内完成)。
代码时间!
#include <atomic>
#include <cstdint>

template <typename T>
class ProducerConsumerDoubleBuffer {
public:
    ProducerConsumerDoubleBuffer() : m_state(0) { }
    ~ProducerConsumerDoubleBuffer() { }

    // Never returns nullptr
    T* start_writing() {
        // Increment active users; once we do this, no one
        // can swap the active cell on us until we're done
        auto state = m_state.fetch_add(0x2, std::memory_order_relaxed);
        return &m_buf[state & 1];
    }

    void end_writing() {
        // We want to swap the active cell, but only if we were the last
        // ones concurrently accessing the data (otherwise the consumer
        // will do it for us when *it's* done accessing the data)

        auto state = m_state.load(std::memory_order_relaxed);
        std::uint32_t flag = (8 << (state & 1)) ^ (state & (8 << (state & 1)));
        state = m_state.fetch_add(flag - 0x2, std::memory_order_release) + flag - 0x2;
        if ((state & 0x6) == 0) {
            // The consumer wasn't in the middle of a read, we should
            // swap (unless the consumer has since started a read or
            // already swapped or read a value and is about to swap).
            // If we swap, we also want to clear the full flag on what
            // will become the active cell, otherwise the consumer could
            // eventually read two values out of order (it reads a new
            // value, then swaps and reads the old value while the
            // producer is idle).
            m_state.compare_exchange_strong(state, (state ^ 0x1) & ~(0x10 >> (state & 1)), std::memory_order_release);
        }
    }

    // Returns nullptr if there appears to be no more data to read yet
    T* start_reading() {
        m_readState = m_state.load(std::memory_order_relaxed);
        if ((m_readState & (0x10 >> (m_readState & 1))) == 0) {
            // Nothing to read here!
            return nullptr;
        }

        // At this point, there is guaranteed to be something to
        // read, because the full flag is never turned off by the
        // producer thread once it's on; the only thing that could
        // happen is that the active cell changes, but that can
        // only happen after the producer wrote a value into it,
        // in which case there's still a value to read, just in a
        // different cell.

        m_readState = m_state.fetch_add(0x2, std::memory_order_acquire) + 0x2;

        // Now that we've incremented the user count, nobody can swap until
        // we decrement it
        return &m_buf[(m_readState & 1) ^ 1];
    }

    void end_reading() {
        if ((m_readState & (0x10 >> (m_readState & 1))) == 0) {
            // There was nothing to read; shame to repeat this
            // check, but if these functions are inlined it might
            // not matter. Otherwise the API could be changed.
            // Or just don't call this method if start_reading()
            // returns nullptr -- then you could also get rid
            // of m_readState.
            return;
        }

        // Alright, at this point the active cell cannot change on
        // us, but the active cell's flag could change and the user
        // count could change. We want to release our user count
        // and remove the flag on the value we read.

        auto state = m_state.load(std::memory_order_relaxed);
        std::uint32_t sub = (0x10 >> (state & 1)) | 0x2;
        state = m_state.fetch_sub(sub, std::memory_order_relaxed) - sub;
        if ((state & 0x6) == 0 && (state & (0x8 << (state & 1))) == 1) {
            // Oi, we were the last ones accessing the data when we released our cell.
            // That means we should swap, but only if the producer isn't in the middle
            // of producing something, and hasn't already swapped, and hasn't already
            // set the flag we just reset (which would mean they swapped an even number
            // of times).  Note that we don't bother swapping if there's nothing to read
            // in the other cell.
            m_state.compare_exchange_strong(state, state ^ 0x1, std::memory_order_relaxed);
        }
    }

private:
    T m_buf[2];

    // The bottom (lowest) bit will be the active cell (the one for writing).
    // The active cell can only be switched if there's at most one concurrent
    // user. The next two bits of state will be the number of concurrent users.
    // The fourth bit indicates if there's a value available for reading
    // in m_buf[0], and the fifth bit has the same meaning but for m_buf[1].
    std::atomic<std::uint32_t> m_state;

    std::uint32_t m_readState;
};

请注意,语义是这样的,消费者永远无法读取相同的值,并且它读取的值总是比上次读取的值更新。它在内存使用方面也相当高效(类似于您原来的解决方案)。我避免了使用CAS循环,因为它们通常不如在争用下的单个原子操作高效。
如果您决定使用上面的代码,请先编写一些全面的(线程化)单元测试和适当的基准测试。我已经测试过了,但只是勉强测试了一下。如果您发现任何错误,请告诉我 :-)
我的单元测试:
ProducerConsumerDoubleBuffer<int> buf;
std::thread producer([&]() {
    for (int i = 0; i != 500000; ++i) {
        int* item = buf.start_writing();
        if (item != nullptr) {      // Always true
            *item = i;
        }
        buf.end_writing();
    }
});
std::thread consumer([&]() {
    int prev = -1;
    for (int i = 0; i != 500000; ++i) {
        int* item = buf.start_reading();
        if (item != nullptr) {
            assert(*item > prev);
            prev = *item;
        }
        buf.end_reading();
    }
});
producer.join();
consumer.join();

关于你的原始实现,我只是粗略地看了一下(设计新东西更有趣,呵呵),但david.pfx的答案似乎解决了你问题的那部分。


谢谢!干得好!不幸的是,在我的应用程序中,希望读取器可以读取相同的值两次(对于较慢的线程向较快的线程发送数据的情况)。如果我要使用这个解决方案,我将不得不重新考虑快速线程的读取器部分的实现。此外,我发现位运算处理内部状态很难理解和脆弱。例如,对任何四种方法之一的重复调用都会破坏状态(我不知道这是否构成了一个错误,但至少是实现的一个弱点)。 - user3638506
1
@user:我实现了最强的设计约束。如果你想让读者在发布新值之前继续阅读旧值,只需添加一个缓存项,并在成功读取新值后设置它;然后当begin_reading中没有可用值时,返回指向该值的指针,而不是nullptr(前提是首先已经设置了该值)。无需重新思考;-) - Cameron
@user:我同意位操作有点难以理解,但据我所知它是正确的。我不确定你所说的“重复调用”是什么意思,但是肯定任何对begin_x的调用都应该与一次end_x的调用相匹配。至于状态更改,每个更改本身都是原子的,即使完整的状态更改分布在几个原子更新中也是如此。这是因为保证在用户计数高于零时不进行交换。在任何单一时间点,整个状态都是有效的。我会添加我的单元测试供您审查。 - Cameron
@user:看看我的修改,我成功地删除了一个原子操作(使写入速度更快)。 - Cameron
一些原子操作末尾的"+ flag - 0x2"和类似表达式不应该存在,对吧? - user3638506
显示剩余5条评论

4

是的,我认为它已经坏了。

如果读取器连续进行起始/结束/起始操作,则会将其读取索引更新为写入索引,并且有可能从写入索引读取数据,即使写入正在忙碌。

问题本质上在于写入器不知道读取器将使用哪个缓冲区,因此写入器应确保两个缓冲区始终有效。除非我误解了这里没有显示的某些逻辑,否则它无法在任何时间内将数据写入缓冲区。

是的,我认为可以使用CAS或等效逻辑而无需锁定。我不会在这个空间中尝试表达算法。我相信它存在,但我不能第一次正确地将其编写出来。在网上搜索了一些看起来可行的候选者。使用CAS的无等待IPC似乎是一个非常有趣的主题,也是一些研究的主题。


经过进一步思考,算法如下:

  • 3个缓冲区:一个用于写入器,一个用于读取器使用,以及一个额外的缓冲区。缓冲区被排序:它们形成一个环(但见注释)。
  • 每个缓冲区都有一个状态:空闲、满、正在写入、正在读取。
  • 一个函数,可以检查缓冲区的状态,并在单个原子操作中有条件地将状态更改为不同的值。我将使用CSET进行此操作。

写入器:

Find the first buffer that is FREE or FULL
  Fail: assert (should never fail, reader can only use one buffer)
  CSET buffer to WRITING
Write into the buffer
CSET buffer to FULL

读者:

Find first buffer that is FULL
    Fail: wait (writer may be slow)
    CSET buffer to READING
Read and consume buffer
CSET buffer to FREE

注意:此算法不能保证缓冲区严格按到达顺序处理,也没有简单的方法可以做到。如果这很重要,则应增强算法,使用由写入者设置的缓冲区序列号,以便读取器选择最新的缓冲区。
我将代码作为实现细节留下。
CSET函数并不简单。它必须原子地测试特定共享内存位置是否等于预期值,如果是,则将其更改为新值。如果成功进行更改,则返回true;否则返回false。如果两个线程同时访问同一位置(可能在不同的处理器上),则实现必须避免竞争条件。
如果可用,C++标准原子操作库包含一组atomic_compare_exchange函数,应该能够满足需求。

啊,你说得对。一个开始/结束/开始序列暴露写缓冲区给写入者的问题可以通过在end_writing()中添加忙标志检查来修补,但这会导致另一个问题:由于某些时间组合可能会发生读取线程永远不会获得任何新数据的情况,因为它总是在写入者忙碌时被调用。 - user3638506
我认为你的四状态缓冲区(FREE,WRITING,FULL,READING)只有在状态改变时是原子的解决方案。但在我看来,只应该使用两个缓冲区,并且如果您想确保顺序得到尊重,则稍微修改“结束写入”即可:当将一个缓冲区设置为FULL时,如果另一个缓冲区也是FULL,则应将其设置为FREE。这保证了在任何给定时间只能有一个缓冲区是FULL,并且与只处理一个缓冲区的假设是一致的。 - Serge Ballesta
@SergeBallesta:要求是最小化等待时间。使用3个缓冲区,如果有数据,写入者永远不会等待,读取者也永远不会等待。 - david.pfx
@david.pfx;你说得对,但使用两个缓冲区,写入者永远不会等待:如果一个缓冲区正在读取,另一个缓冲区已满,则重复使用已满的缓冲区。我承认,读取者可能永远看不到它可以处理的缓冲区,但只有在写入者正在写入时才会出现这种情况——因此缓冲区已经包含旧数据——而读取者刚刚结束读取。 - Serge Ballesta
看起来如果我想确保读者始终有缓冲区,而不管写者是否活动,我确实需要3个缓冲区。但是,您确定仅“设置状态”操作是原子的就足够了吗?例如,在写者找到它并将其状态设置为“正在写入”之间,读取器进程可能会劫持FULL缓冲区。 - user3638506
@user3638506:CSET函数需要以单个原子操作的方式对状态进行条件测试和设置。如果写入者的CSET成功,读取者的CSET将不会成功。一旦你拥有了这个函数,其他所有的事情都会随之而来。 - david.pfx

0

这里提供一种使用 InterlockedExchangePointer() 和 SLISTs 的版本。

该解决方案不支持重新读取上一个缓冲区。但如果需要,读取端可以通过复制和 if( NULL == doubleBuffer.beginReader(...) ) { use backup copy ... } 来实现。
我们之所以没有实现这个功能,不是因为难以添加,而是因为它不太现实。想象一下你上次的已知值越来越老 - 秒、天、周。应用程序不太可能仍然想要使用它。因此,将重新读取功能分解到双缓冲区代码中会削弱应用程序的灵活性。

双缓冲区有一个读指针成员。每当调用 beginRead() 时,该值被返回并原子地替换为 NULL。可以将其视为“读取器获取缓冲区”。
使用 endRead(),读取器返回缓冲区,并将其添加到包含可用于写操作的缓冲区的 SLIST 中。

最初,两个缓冲区都添加到了 SLIST 中,读指针为 NULL。

beginWrite() 弹出下一个可用的缓冲区。由于 endWrite() 的实现方式,该值永远不可能为 NULL。

最后但并非最不重要的,endWrite()原子性地交换读指针和返回的新写入缓冲区,并且如果读指针不为NULL,则将其推送到SLIST上。

因此,即使读取端从未读取,写入端也永远不会用尽缓冲区。当读取器读取时,它会获得最新已知值(一次!)。

这种实现不安全的情况是存在多个并发读取器或写入器。但这不是首要目标。

在丑陋的一面,缓冲区需要是具有某些SLIST_HEADER成员的结构。

这里是代码,但请记住,如果你的火星探测器降落在金星上,这不是我的错!

const size_t MAX_DATA_SIZE = 512;
typedef
//__declspec(align(MEMORY_ALLOCATION_ALIGNMENT))
struct DataItem_tag
{
    SLIST_ENTRY listNode;
    uint8_t data[MAX_DATA_SIZE];
    size_t length;
} DataItem_t;

class CDoubleBuffer
{
    SLIST_HEADER m_writePointers;
    DataItem_t m_buffers[2];
    volatile DataItem_t *m_readPointer;

public:
    CDoubleBuffer()
        : m_writePointers()
        , m_buffers()
        , m_readPointer(NULL)
    {
        InitializeSListHead(&m_writePointers);
        InterlockedPushEntrySList(&m_writePointers, &m_buffers[0].listNode);
        InterlockedPushEntrySList(&m_writePointers, &m_buffers[1].listNode);
    }
    DataItem_t *beginRead()
    {
        DataItem_t *result = reinterpret_cast<DataItem_t*>(InterlockedExchangePointer((volatile PVOID*)&m_readPointer, NULL));
        return result;
    }
    void endRead(DataItem_t *dataItem)
    {
        if (NULL != dataItem)
        {
            InterlockedPushEntrySList(&m_writePointers, &dataItem->listNode);
        }
    }
    DataItem_t *beginWrite()
    {
        DataItem_t *result = reinterpret_cast<DataItem_t*>(InterlockedPopEntrySList(&m_writePointers));
        return result;
    }
    void endWrite(DataItem_t *dataItem)
    {
        DataItem_t *oldReadPointer = reinterpret_cast<DataItem_t*>(InterlockedExchangePointer((volatile PVOID*)&m_readPointer, dataItem));
        if (NULL != oldReadPointer)
        {
            InterlockedPushEntrySList(&m_writePointers, &oldReadPointer->listNode);
        }
    }
};

以下是相关测试代码。(为了使用上述代码和测试代码,您需要包含 <windows.h> 和 <assert.h> 头文件。)

CDoubleBuffer doubleBuffer;

DataItem_t *readValue;
DataItem_t *writeValue;

// nothing to read yet. Make sure NULL is returned.
assert(NULL == doubleBuffer.beginRead());
doubleBuffer.endRead(NULL); // we got nothing, we return nothing.

// First write without read
writeValue = doubleBuffer.beginWrite();
assert(NULL != writeValue); // if we get NULL here it is a bug.
writeValue->length = 0;
doubleBuffer.endWrite(writeValue);

// Second write without read
writeValue = doubleBuffer.beginWrite();
assert(NULL != writeValue); // if we get NULL here it is a bug.
writeValue->length = 1;
doubleBuffer.endWrite(writeValue);

// Third write without read - works because it reuses the old buffer for the new write.
writeValue = doubleBuffer.beginWrite();
assert(NULL != writeValue); // if we get NULL here it is a bug.
writeValue->length = 2;
doubleBuffer.endWrite(writeValue);

readValue = doubleBuffer.beginRead();
assert(NULL != readValue); // NULL would obviously be a terrible bug.
assert(2 == readValue->length); // We got the latest and greatest?
doubleBuffer.endRead(readValue);

readValue = doubleBuffer.beginRead();
assert(NULL == readValue); // We expect NULL here. Re-reading is not a feature of this implementation!
doubleBuffer.endRead(readValue);

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接