使用std::atomic实现的C++11无锁队列(多写单读)

5
我使用C++11中的新std::atomic实现了一个简单的无锁(lockfree)队列。我看不出我在这里做错了什么。
#include <atomic>

template<typename T>
class lockless_queue
{
public:
    template<typename DataType>
    struct node
    {
        node(const DataType& data)
          : data(data), next(nullptr) {}
        DataType data;
        node* next;
    };

    lockless_queue()
      : head_(nullptr) {}

    void produce(const T &data)
    {
        node<T>* new_node = new node<T>(data);
        // put the current value of head into new_node->next
        new_node->next = head_.load(std::memory_order_relaxed);
        // now make new_node the new head, but if the head
        // is no longer what's stored in new_node->next
        // (some other thread must have inserted a node just now)
        // then put that new head into new_node->next and try again
        while(!std::atomic_compare_exchange_weak_explicit(
            &head_,
            &new_node->next,
            new_node,
            std::memory_order_release,
            std::memory_order_relaxed)) {}
    }

    node<T>* consume_all()
    {
        // Reset queue and return head atomically
        return head_.exchange(nullptr, std::memory_order_consume);
    }
private:
    std::atomic<node<T>*> head_;
};

// main.cpp
#include <iostream>

int main()
{
    lockless_queue<int> s;
    s.produce(1);
    s.produce(2);
    s.produce(3);
    auto head = s.consume_all();
    while (head)
    {
        auto tmp = head->next;
        std::cout << tmp->data << std::endl;
        delete head;
        head = tmp;
    }
}

我的输出:

2
1
Segmentation fault (core dumped)

我能得到一些指引在哪里查找或者提示我可能做错了什么吗?

谢谢!


以防你不知道(并不是建议你放弃努力),但 Boost 有一个无锁队列。http://www.boost.org/doc/libs/1_53_0/doc/html/boost/lockfree/queue.html - 111111
2
你正在一个线程中顺序地推入三个项目,然后将它们弹出。为什么你不能调试它?如果你不能在一个线程和没有争用的情况下解决它,我对于多个生产者和一个消费者的情况并不抱有太大的希望。 - Martin James
2个回答

3
您正在解除引用tmp而不是head:
while (head)
    {
        auto tmp = head->next;
        std::cout << tmp->data << std::endl;
        delete head;
        head = tmp;
    }

should be:

while (head)
    {
        std::cout << head->data << std::endl;
        auto tmp = head->next;
        delete head;
        head = tmp;
    }

这就是为什么你的输出中没有出现3,而出现了分段错误

2

你的代码中还有一个错误,只有在尝试执行并发入队操作时才会暴露出来。如果你的 compare_exchange_weak_explicit 失败了,那么意味着另一个线程成功地改变了 head 指针,因此在你再次尝试进行 CAS 操作之前,需要重新将 head 指针的新值加载到 new_node->next 中。下面的代码可以解决这个问题:

    while(!std::atomic_compare_exchange_weak_explicit(
        &head_,
        &new_node->next,
        new_node,
        std::memory_order_release,
        std::memory_order_relaxed)) {
        new_node->next = head_.load(std::memory_order_relaxed);
    }

4
实际上并不是这样。如果 std::atomic_compare_exchange 失败,它将把期望的值更新为实际值。这种功能是否令人满意存在争议。你甚至可能想自己执行此操作,但 std::atomic_compare_exchange 可以为你完成这个任务。 - laurisvr

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接