如何在这个无锁栈函数中预防未定义行为和ABA问题?

3
我目前正在使用C++11开发无锁单向链表,并且我在popFront()函数中遇到了问题,或者至少我知道它在某些情况下会出现问题。
无论如何,这是我目前的代码:
std::shared_ptr<T> popFront(void)
{
    auto p = atomic_load(&head);
    while(p && !atomic_compare_exchange_weak(&head, &p, p->next))
    {}
    return p ? p->data : std::shared_ptr<T>();
}

请注意,head的类型是shared_ptr
然而,我预计会遇到一些问题。首先是当两个线程执行popFront()时,它们都读取相同的head,并且一个线程先完成。在第二个线程完成之前,调用者删除了指向的对象,因此第二个线程现在正在处理已删除的内存。第二个问题是经典的ABA问题。
这个链表的想法是让它无锁,所以我想避免在这个函数中使用锁。不幸的是,我不知道如何解决这些问题。任何建议将不胜感激。

不要修复这段代码,不要修复这段代码,写出更好的代码。为什么要让自己经历这样的痛苦呢?即使你修复了所有问题,它的性能也不会比一个简单的锁更好,而且它会变得脆弱、难以理解和无法修改。 - undefined
好的。我想修复这个问题有两个原因:(1)我想学习如何修改代码来解决这些问题,(2)现有的实现没有包含我需要的功能。 - undefined
然后使用锁。这样写起来更简单,易于理解和维护。在大多数情况下,它的性能更好。而且你可以确信它是正确的,不必担心会错过一些奇怪的边缘情况,导致它失败。 - undefined
@David 考虑到对于如此短的操作而言,使用锁基本上就是一个自旋锁(至少在我能想到的所有实现中,内核调用都是昂贵的),它与交换操作基本上是一样的,所以我真的不明白为什么它在“大多数情况下”会表现得更好。其他方面我都同意,但性能的说法似乎不太可能。 - undefined
让我帮你谷歌一下。https://en.wikipedia.org/wiki/Hazard_pointer 是解决无锁结构中的ABA问题的一种方法。https://en.wikipedia.org/wiki/ABA_problem#Workarounds 提到了其他几种方法。 - undefined
@Voo 如果你有两个频繁竞争的线程,锁有很大的机会将其中一个线程调度出去,从而避免竞争。而无锁的方法则不会调度竞争的线程,实际上是为了最大化竞争而设计的。 - undefined
2个回答

1

有很多解决方案可以设计一个没有ABA问题的无锁队列。

这篇文章应该提供了一些见解,解决此类问题的一些通用工具可以在这里找到。

现在,针对你提到的问题:

第二个线程完成之前,调用者删除了被指向的对象,因此第二个线程现在正在使用已删除的内存

是的,这种情况可能会发生。解决此问题的方法是使用标记指针:在32位体系结构上,最后的2(或更多)位未使用,因此它们可以用于标记,在64位体系结构上,我们至少有3个未使用的位。

因此,我们可以通过设置指针的一些未使用位来逻辑删除指针,而不是物理删除它,如下所示:
__inline struct node* setTag(struct node* p, unsigned long TAG)
{
    return (struct node*) ((uintptr_t)p | TAG);
}

__inline bool isTagged(struct node* p, unsigned long TAG)
{
    return (uintptr_t)p == (uintptr_t)p &  ~TAG;
}

__inline struct node* getUntaggedAddress(struct node* p, unsigned long TAG)
{
    return (struct node*)((uintptr_t)p &  ~TAG);
}

TAG的取值范围为4(32位架构)或8(64位架构),具体取决于计算机架构和字对齐,其中2/3个或更多未使用的位。

现在,在执行CAS时,我们忽略标记指针,因此仅操作有效指针。

在队列上执行出队操作时,我们可以按以下方式执行:

int dequeue(qroot* root)
{
    qnode* oldHead;

    do
    {
        oldHead = root->head;

        if (isTagged(root->head)) //disregard tagged addresses
            return NULL;

        oldHead = getUntaggedAddress(root->head); //we do a CAS only if the old head was unchanged

    } while (root->head.compare_exchange_strong(oldHead, oldHead->next, std::memory_order_seq_cst));

    return &(oldHead->data);
}

给定。
typedef struct qnode
{
    std::atomic<qnode*> next;
    int data;
}qnode;

typedef struct qroot
{
    std::atomic<qnode*> head; //Dequeue and peek will be performed from head
    std::atomic<qnode*> tail; //Enqueue will be performed to tail
}qroot;

1

使线程更容易的一件事是不释放内存。如果您正在使用大量的链表节点,可以考虑使用节点池。而不是释放节点,您将其返回到池中。这解决了您问题的一部分。

ABA很简单。每次更改头指针时,您需要增加一个计数器。您需要同时原子地写入此计数器和指针。如果使用32位寻址,则使用64位比较和交换(CAS),并在额外的32位中存储计数器。如果使用64位寻址,请避免使用128位比较和交换,因为它可能很慢(在我们的Xenon芯片上,任何高达64位的内容都很快)。由于Windows和Linux都不支持完整的64位寻址,因此您可以利用其中一些64位来进行ABA。我使用联合体来实现32位和64位寻址模式的此操作。

你不需要计算每个变化,只需要捕捉每个变化。即使尝试使用大量线程尽可能快地更改头以引起ABA,它仍很少发生。在现实生活中,这种情况很少发生。例如,您不需要计算得非常高。我通常使用4位并让它滚动。我可能会因此而陷入麻烦。如果需要,可以使用更多位数。
在此示例中,我假设有64位,并使用CAS()进行比较和交换,因此您将不得不替换编译器用于CAS的内容:
typedef unsigned __int64_t U8;

struct TNode {
    TNode* m_pNext;
};

template<class T>
union THead {
    struct {
        U8  m_nABA : 4,  
            m_pNode:60;  // Windows only supports 44 bits addressing anyway.
    };
    U8  m_n64; // for CAS
    // this constructor will make an atomic copy on intel 
    THead(THead& r)         { m_n64 = r.m_n64; }
    T* Node()               { return (T*)m_pNode; }
    // changeing Node bumps aba
    void Node(T* p)         { m_nABA++; m_pNode = (U8)p; return this; }
};

// pop pNode from head of list.
template<class T>
T* Pop(volatile THead<T>& Head) {
    while (1) { // race loop
        // Get an atomic copy of head and call it old.
        THead<T> Old(Head);
        if (!Old.Node())
            return NULL;
        // Copy old and call it new.
        THead<T> New(Old);
        // change New's Node, which bumps internal aba
        New.Node(Old.Node()->m_pNext);
        // compare and swap New with Head if it still matches Old.
        if (CAS(&Head.m_n64, Old.m_n64, New.m_n64))
            return Old.Node(); // success
        // race, try again
    }
}

// push pNode onto head of list.
template<class T>
void Push(volatile THead<T>& Head, T* pNode) {
    while (1) { // race loop
        // Get an atomic copy of head and call it old.
        // Copy old and call it new.
        THead<T> Old(Head), New(Old);
        // Wire node t Head
        pNode->m_pNext = New.Node();
        // change New's head ptr, which bumps internal aba
        New.Node(pNode);
        // compare and swap New with Head if it still matches Old.
        if (CAS(&Head.m_n64, Old.m_n64, New.m_n64))
            break; // success
        // race, try again
    }
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接