使用头文件`<atomic>`实现C++11自旋锁

42

我实现了 SpinLock 类,代码如下:

struct Node {
    int number;
    std::atomic_bool latch;

    void add() {
        lock();
        number++;
        unlock();
    }
    void lock() {
        bool unlatched = false;
        while(!latch.compare_exchange_weak(unlatched, true, std::memory_order_acquire));
    }
    void unlock() {
        latch.store(false , std::memory_order_release);
    }
};

我实现了上述的类,并创建了两个线程,每个线程调用同一个Node类实例的add()方法,每个线程调用10 million次。

不幸的是,结果不是20 million。我错过了什么?


5
请注意,这是可能实现的最糟糕的自旋锁之一。其中存在的问题包括:1)当你最终获得了lock时,你需要离开while循环,这是最糟糕的时间点,因为你需要执行一系列母分支预测。2)lock函数可能会使在同一虚拟核心上运行的另一个线程挨饿,特别是在超线程CPU上。 - David Schwartz
1
@DavidSchwartz,感谢您的评论。我可以请问一下您提到的问题吗? 2)那个锁定功能可能会使另一个线程饿死(没错!这是有意为之的,因为我确信锁的生命周期非常短)。 我可以使用一些自旋计数器来缓解这个问题,我是对的吗? 1)为什么我在这段代码中使用了“所有误判分支的母亲”,我该如何改进它?您对此有什么评论吗? 再次感谢您。 - syko
2
Pause指令能够阻止预测执行,从而消除了分支预测错误的惩罚。(顺便说一下,如果你不透彻地了解这种东西,就不应该写自旋锁。你会犯各种错误,甚至还不知道其他选择。) - David Schwartz
5
@DavidSchwartz,我有兴趣写这些东西,但我不是很了解它们。你能推荐一下怎样让自己深入了解吗?请注意保持翻译内容的原意和简洁性,不得添加解释或其他内容。 - codeshot
8
听起来唯一能够获得足够的知识去尝试编写这种代码的方法就是尝试编写这种代码。 - codeshot
显示剩余9条评论
3个回答

49
问题在于compare_exchange_weak一旦失败就会更新unlatched变量。根据compare_exchange_weak的文档:

将原子对象所包含的值与expected进行比较: - 如果相等,使用val替换包含的值(与store类似)。 - 如果不相等,则用原子对象中的值替换expected。

也就是说,在第一次失败的compare_exchange_weak之后,unlatched将被更新为true,所以下一次循环迭代将尝试将truetrue进行compare_exchange_weak。这将成功,你刚刚拿到了另一个线程持有的锁。
解决方法: 确保在每次compare_exchange_weak之前将unlatched设置回false,例如:
while(!latch.compare_exchange_weak(unlatched, true, std::memory_order_acquire)) {
    unlatched = false;
}

5
+1,并向原帖作者说明,Node类的默认构造函数不会像所展示的那样对number进行值初始化。也就是说,Node node;并不能在node.number中留下确定的值来启动你的序列。你需要一个这样用法的构造函数: Node() : number() {}就足够了。点击此处查看实例 - WhozCraig
谢谢您的评论!我发现了我的错误!谢谢。 - syko
5
请注意,该循环还需要使用__mm_pause()函数来启用超线程。 - Anton

45

正如@gexicide提到的那样,问题在于compare_exchange函数会使用原子变量的当前值来更新expected变量。这也是为什么你必须首先使用本地变量unlatched的原因。为了解决这个问题,你可以在每个循环迭代中将unlatched重新设置为false。

然而,与其使用compare_exchange进行接口不太适合的操作,不如直接使用std::atomic_flag更简单:

class SpinLock {
    std::atomic_flag locked = ATOMIC_FLAG_INIT ;
public:
    void lock() {
        while (locked.test_and_set(std::memory_order_acquire)) { ; }
    }
    void unlock() {
        locked.clear(std::memory_order_release);
    }
};

来源:cppreference

手动指定内存顺序只是一个次要的潜在性能调整,我从源代码中复制了这个建议。如果简单性比最后一点性能更重要,您可以坚持使用默认值,并且只需调用locked.test_and_set() / locked.clear()

顺便说一句: std::atomic_flag 是唯一保证不锁定的类型,尽管我不知道有哪个平台上针对 std::atomic_bool 的操作不是无锁操作。

更新:正如@David Schwartz、@Anton和@Technik Empire在评论中解释的那样,空循环具有某些不良影响,如分支误判、HT处理器上的线程饥饿和过高的功耗——总之,这是一种相当低效的等待方式。影响和解决方案因架构、平台和应用程序而异。我不是专家,但通常的解决方案似乎是在循环体中添加cpu_relax() (在linux上)或YieldProcessor() (在Windows上)。

编辑2: 仅为明确起见:这里呈现的可移植版本(不包括特殊的cpu_relax等指令)应该已经足够好用于许多应用程序。如果您的SpinLock 在旋转时花费大量时间,因为其他人长时间持有锁(这可能已经表示存在一般设计问题),那么使用普通互斥锁可能更好。


2
只需在 while 循环中添加 std::this_thread::yield 调用即可:http://en.cppreference.com/w/cpp/thread/yield - Martin Gerhardy
3
@Martin: 我考虑过这个,但是std::this_thread::yield()是一个非常重的系统调用,所以我不确定如果将其放在自旋锁的循环体中,是否合适。我的(未经测试的)假设是,在大多数情况下,如果可以使用的话,你应该一开始就使用std::mutex(或类似的东西)。 - MikeMB
24
在自旋锁中使用 yield 没有意义。自旋锁的目标是避免在小型临界区中进行昂贵的上下文切换。 - Jorge Bellon
4
@rox,那不是我的重点。 "自旋互斥锁" 的技术定义是在其被他人占用时不释放 CPU 的锁。在单核系统中,通常不使用自旋锁而是普通的互斥锁。 - Jorge Bellon
2
据我所知,glibc实现使用类似于yield的东西:https://github.com/lattera/glibc/blob/master/mach/spin-solid.c。我不确定您在哪里找到了定义。 - rox
显示剩余8条评论

0
内存优化只在本地起作用,因此这些内存管理方法(td::memory_order_acquire和std::memory_order_release)是无用的。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接