多线程队列原子操作

3
我正在使用std::atomic结构进行尝试,并编写了这个无锁多生产者多消费者队列,我在此处附上代码。队列的思想基于两个栈——生产者栈和消费者栈,它们本质上是链接列表结构。列表的节点包含索引,可以读取或写入实际数据的数组。
列表节点应该是互斥的,即指向节点的指针只能存在于生产者或消费者列表中。生产者将尝试从生产者列表获取节点,消费者从消费者列表获取节点。每当生产者或消费者获得节点的指针时,它应该从两个列表中删除,以便没有其他人可以获取它。我使用std::atomic_compare_exchange函数自旋直到弹出节点。
问题是,逻辑可能有问题,或者操作不是原子操作,因为即使有1个生产者和1个消费者,足够长时间后,队列仍会死锁。我注意到的是,如果你断言cell!= cell->m_next,那么断言就会被触发!所以这很可能是一个一直在我面前的问题,而我却没看出来,所以我想请有经验的人提供一些帮助。
谢谢
#ifndef MTQueue_h
#define MTQueue_h

#include <atomic>

template<typename Data, uint64_t queueSize>
class MTQueue
{
public:

    MTQueue() : m_produceHead(0), m_consumeHead(0)
    {
        for(int i=0; i<queueSize-1; ++i)
        {
            m_nodes[i].m_idx = i;
            m_nodes[i].m_next = &m_nodes[i+1];
        }
        m_nodes[queueSize-1].m_idx = queueSize - 1;
        m_nodes[queueSize-1].m_next = NULL;

        m_produceHead = m_nodes;
        m_consumeHead = NULL;
    }

    struct CellNode
    {
        uint64_t m_idx;
        CellNode* m_next;
    };

    bool push(const Data& data)
    {
        if(m_produceHead == NULL)
            return false;

        // Pop the producer list.
        CellNode* cell = m_produceHead;
        while(!std::atomic_compare_exchange_strong(&m_produceHead,
                                                   &cell, cell->m_next))
        {
            cell = m_produceHead;
            if(!cell)
                return false;
        }

        // At this point cell should point to a node that is not in any of the lists
        m_data[cell->m_idx] = data;

        // Push that node as the new head of the consumer list
        cell->m_next = m_consumeHead;
        while (!std::atomic_compare_exchange_strong(&m_consumeHead,
                                                    &cell->m_next, cell))
        {
            cell->m_next = m_consumeHead;
        }
        return true;
    }

    bool pop(Data& data)
    {
        if(m_consumeHead == NULL)
            return false;

        // Pop the consumer list
        CellNode* cell = m_consumeHead;
        while(!std::atomic_compare_exchange_strong(&m_consumeHead,
                                                   &cell, cell->m_next))
        {
            cell = m_consumeHead;
            if(!cell)
                return false;
        }

        // At this point cell should point to a node that is not in any of the lists
        data = m_data[cell->m_idx];

        // Push that node as the new head of the producer list
        cell->m_next = m_produceHead;
        while(!std::atomic_compare_exchange_strong(&m_produceHead,
                                                   &cell->m_next, cell))
        {
            cell->m_next = m_produceHead;
        }
        return true;
    };

private:

    Data m_data[queueSize];

    // The nodes for the two lists
    CellNode m_nodes[queueSize];

    volatile std::atomic<CellNode*> m_produceHead;
    volatile std::atomic<CellNode*> m_consumeHead;
};

#endif

2
这对我来说看起来像是一个ABA问题:https://stackoverflow.com/questions/25628225/where-is-the-race/25628314#25628314 - dohashi
也许...怎么测试呢? - bitwise
ABA 很难进行测试。我建议真正理解 ABA 问题,然后思考您的算法是否容易受到攻击。将数据结构建模为状态机可能也会有所帮助。原子数据结构可能非常棘手,需要正确地处理。 - dohashi
1
我认为ABA问题应该被避免,因为每次失败时堆栈CellNode m_next都会更新。我还注意到的是,即使比较的变量相同,原子比较交换似乎也会失败。所以,如果你在while循环中断,变量将是相同的,但是,你就在那里...我想知道这是否可能是内存屏障的问题? - bitwise
1
@bitwise:另外一个问题,为什么你要将数据声明为“volatile”?使用“atomic”提供的内存同步保证不足以满足需求吗?难道你真的需要回退到更慢、更繁琐的“volatile”语义吗?我有什么遗漏的地方吗? - yzt
显示剩余6条评论
2个回答

1
我相信我已经成功破解了这个问题。对于大小从2到1024的队列和从1个生产者/1个消费者到100个生产者/100个消费者的队列,在1000000次写入/读取时没有活锁。
以下是解决方案。关键是不直接在比较和交换中使用cell->m_next(顺便说一下,生产者代码也适用于此),并要求严格的内存顺序规则:
这似乎证实了我的怀疑,即它是编译器重新排序读写的结果。 以下是代码:
bool push(const TData& data)
{
    CellNode* cell = m_produceHead.load(std::memory_order_acquire);
    if(cell == NULL)
        return false;

    while(!std::atomic_compare_exchange_strong_explicit(&m_produceHead,
                                                        &cell,
                                                        cell->m_next,
                                                        std::memory_order_acquire,
                                                        std::memory_order_release))
    {
        if(!cell)
            return false;
    }

    m_data[cell->m_idx] = data;

    CellNode* curHead = m_consumeHead;
    cell->m_next = curHead;
    while (!std::atomic_compare_exchange_strong_explicit(&m_consumeHead,
                                                         &curHead,
                                                         cell,
                                                         std::memory_order_acquire,
                                                         std::memory_order_release))
    {
        cell->m_next = curHead;
    }

    return true;
}

bool pop(TData& data)
{
    CellNode* cell = m_consumeHead.load(std::memory_order_acquire);
    if(cell == NULL)
        return false;

    while(!std::atomic_compare_exchange_strong_explicit(&m_consumeHead,
                                                        &cell,
                                                        cell->m_next,
                                                        std::memory_order_acquire,
                                                        std::memory_order_release))
    {
        if(!cell)
            return false;
    }

    data = m_data[cell->m_idx];

    CellNode* curHead = m_produceHead;
    cell->m_next = curHead;
    while(!std::atomic_compare_exchange_strong_explicit(&m_produceHead,
                                                        &curHead,
                                                        cell,
                                                        std::memory_order_acquire,
                                                        std::memory_order_release))
    {
        cell->m_next = curHead;
    }

    return true;
};

抱歉,您正在发布基本上是危险/错误的代码。它存在许多问题,例如,失败的内存顺序永远无法释放。 - Yuki

1
我看到你的队列实现存在一些问题:
  1. 它不是一个队列,而是一个栈:最近推入的项目是第一个弹出的项目。并没有什么错,但称其为队列很容易引起混淆。实际上,它是两个无锁栈:一个栈最初由节点数组填充,另一个栈使用第一个栈作为空闲节点列表存储实际数据元素。

  2. pushpop都存在CellNode::m_next的数据竞争(毫不奇怪,因为它们都做同样的事情,即从一个栈中弹出一个节点并将该节点推入另一个栈)。假设两个线程同时进入例如pop并从m_consumeHead读取相同的值。线程1成功地弹出并设置data。然后,线程1将m_produceHead的值写入cell->m_next,而线程2同时正在读取cell->m_next以传递给std::atomic_compare_exchange_strong_explicit。两个线程对cell->m_next进行同时非原子读写,在定义上是数据竞争。

    这就是并发文献中所谓的“良性”竞争:读取了过时/无效的值,但从未使用。如果您确信您的代码永远不需要在可能引起火爆爆炸的架构上运行,则可以忽略它,但为了严格符合标准内存模型,您需要使m_next成为原子,并使用至少memory_order_relaxed读取以消除数据竞争。

  3. ABA。您的比较交换循环的正确性基于这样一个前提:原子指针(例如,m_produceHeadm_consumeHead)在初始加载和稍后的比较交换中具有相同的值意味着点对象也必须保持不变。在任何设计中,只要可以比某个线程更快地回收对象,该前提就不成立。考虑以下事件序列:

    1. 线程1进入pop并读取m_consumeHeadm_consumeHead->m_next的值,但在调用比较交换之前阻塞。
    2. 线程2成功从m_consumeHead中弹出该节点并阻塞。
    3. 线程3将多个节点推入m_consumeHead
    4. 线程2取消阻塞并将原始节点推入m_produceHead
    5. 线程3从m_produceHead中弹出该节点,并将其推回m_consumeHead
    6. 线程1最终取消阻塞并调用比较交换函数,由于m_consumeHead的值相同而成功。它弹出了该节点-这很好,但将m_consumeHead设置为它在步骤1中读取回来的过时m_next值。同时Thread 3推送的所有节点都泄漏了。

非常好的观察!谢谢,我相信你是正确的。如果时间允许,我会修改设计并更新这个帖子。 - bitwise
  1. 看起来是ABA问题。
- v.oddou

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接