为什么std::mutex比std::atomic更快?

21

我想以多线程模式将对象放入std::vector中。因此,我决定比较两种方法:一种使用std::atomic,另一种使用std::mutex。我发现第二种方法比第一种方法更快。为什么?

我使用的是GCC 4.8.1,在我的机器上(8个线程),我发现第一种解决方案需要391502微秒,而第二种解决方案需要175689微秒。

#include <vector>
#include <omp.h>
#include <atomic>
#include <mutex>
#include <iostream>
#include <chrono>

int main(int argc, char* argv[]) {
    const size_t size = 1000000;
    std::vector<int> first_result(size);
    std::vector<int> second_result(size);
    std::atomic<bool> sync(false);

    {
        auto start_time = std::chrono::high_resolution_clock::now();
        #pragma omp parallel for schedule(static, 1)
        for (int counter = 0; counter < size; counter++) {
            while(sync.exchange(true)) {
                std::this_thread::yield();
            };
            first_result[counter] = counter;
            sync.store(false) ;
        }
        auto end_time = std::chrono::high_resolution_clock::now();
        std::cout << std::chrono::duration_cast<std::chrono::microseconds>(end_time - start_time).count() << std::endl;
    }

    {
        auto start_time = std::chrono::high_resolution_clock::now();
        std::mutex mutex; 
        #pragma omp parallel for schedule(static, 1)
        for (int counter = 0; counter < size; counter++) {
            std::unique_lock<std::mutex> lock(mutex);       
            second_result[counter] = counter;
        }
        auto end_time = std::chrono::high_resolution_clock::now();
        std::cout << std::chrono::duration_cast<std::chrono::microseconds>(end_time - start_time).count() << std::endl;
    }

    return 0;
}

6
  1. 请发布你的编译器、编译选项和测量结果。
  2. 在测量后对结果数据做一些可观察的操作,否则一个足够好的优化器会删除掉死代码。
- Angew is no longer proud of SO
4
无论如何,这段代码存在严重问题。使用memory_order_relaxed的原子操作不是同步操作。 - T.C.
1
我更新了我的代码。现在当我使用四个线程时,第一种解决方案比第二种更快(25-30%)。但是如果我增加线程数,第一种解决方案比第二种慢(20-25%)。 - Sergey Malashenko
1
谁在乎呢?代码还是有问题的。你认为可以得出什么结论?有问题的代码更快吗?有问题的代码更慢吗?这些有什么用处呢? - R. Martinho Fernandes
在这里https://gcc.gnu.org/ml/gcc-help/2013-10/msg00115.html,我发现std::this_thread::yield()不能正常工作。所以它是我代码中的主要问题。 - Sergey Malashenko
显示剩余2条评论
2个回答

37
我认为仅凭标准是无法回答您的问题的——互斥锁与平台相关性极强。然而,有一件事情需要提到。
互斥锁不慢。您可能会看到一些文章将其性能与自定义自旋锁和其他“轻量级”东西进行比较,但这不是正确的方法——它们不能互换使用。
当相对短时间内锁定(获取)自旋锁时,自旋锁非常快速,获取它们非常便宜,但是其他正在尝试锁定的线程在整个此时间段内都处于活动状态(不断运行循环)。
可以这样实现自定义自旋锁:
class SpinLock
{
private:
    std::atomic_flag _lockFlag;

public:
    SpinLock()
    : _lockFlag {ATOMIC_FLAG_INIT}
    { }

    void lock()
    {
        while(_lockFlag.test_and_set(std::memory_order_acquire))
        { }
    }

    bool try_lock()
    {
        return !_lockFlag.test_and_set(std::memory_order_acquire);
    }

    void unlock()
    {
        _lockFlag.clear();
    }
};

Mutex是一种原语,比较复杂。特别是在Windows系统中,我们有两个这样的原语 - 临界区(Critical Section),它在进程级别上工作,以及互斥量(Mutex),它没有这样的限制。

锁定互斥量(或临界区)的成本要高得多,但操作系统有能力真正将其他等待线程置于“休眠”状态,这可以提高性能并帮助任务调度器有效地管理资源。

为什么我写这篇文章呢?因为现代互斥量通常是所谓的“混合互斥量”。当这样的互斥量被锁定时,它的行为类似于普通自旋锁 - 其他等待线程执行一些“自旋”,然后重型互斥量被锁定以防止浪费资源。

在您的情况下,互斥量在每个循环迭代中被锁定,以执行此指令:

second_result[counter] = omp_get_thread_num();

看起来这是一个快速的锁,所以“真正”的互斥锁可能永远不会被锁定。这意味着,在这种情况下,你的“互斥锁”可以像基于原子操作的解决方案一样快(因为它本身就成为了基于原子操作的解决方案)。
此外,在第一个解决方案中,您使用了某种自旋锁行为,但我不确定这种行为在多线程环境中是否可预测。我非常确定,“锁定”应该具有“获取”语义,而解锁则是一个“释放”操作。“松散”的内存排序对于这种用例可能太弱。
我编辑了代码以使其更加简洁和正确。它使用了std::atomic_flag,这是唯一保证无锁的类型(与std::atomic<>专门化不同),即使std::atomic<bool>也不能保证无锁。
另外,关于“不让步”的评论:这是一个特定情况和需求的问题。自旋锁是多线程编程的非常重要的部分,通过稍微修改其行为,它们的性能通常可以得到提高。例如,Boost库将spinlock::lock()实现如下:
void lock()
{
    for( unsigned k = 0; !try_lock(); ++k )
    {
        boost::detail::yield( k );
    }
}

来源:boost/smart_ptr/detail/spinlock_std_atomic.hpp

detail::yield() 是 (Win32 版本):

inline void yield( unsigned k )
{
    if( k < 4 )
    {
    }
#if defined( BOOST_SMT_PAUSE )
    else if( k < 16 )
    {
        BOOST_SMT_PAUSE
    }
#endif
#if !BOOST_PLAT_WINDOWS_RUNTIME
    else if( k < 32 )
    {
        Sleep( 0 );
    }
    else
    {
        Sleep( 1 );
    }
#else
    else
    {
        // Sleep isn't supported on the Windows Runtime.
        std::this_thread::yield();
    }
#endif
}

[来源:http://www.boost.org/doc/libs/1_66_0/boost/smart_ptr/detail/yield_k.hpp]

首先,线程会旋转一定次数(在这个例子中为4次)。如果互斥锁仍然被锁定,则使用pause指令(如果可用)或调用Sleep(0),这基本上会导致上下文切换并允许调度程序给另一个阻塞的线程执行有用任务。然后,调用Sleep(1)进行实际(短)休眠。非常好!

此外,这个语句:

自旋锁的目的是忙等待

不完全正确。自旋锁的目的是作为快速、易于实现的锁原语 - 但它仍然需要正确编写,考虑到某些可能的情况。例如,英特尔公司表示(关于Boost在lock()内部使用_mm_pause()作为产生让出的方法):

在自旋等待循环中,pause内嵌函数提高了代码检测锁释放的速度,并提供了特别显著的性能增益。

因此,像void lock() { while(m_flag.test_and_set(std::memory_order_acquire)); }这样的实现可能并不像看起来那么好。


4
这不是自旋锁。自旋锁的目的是忙等待,明确不会让出CPU时间片。 - Kaiserludi
你应该使用预先存在的 std::atomic_flag 类来实现这个。这是一个“正确”的自旋锁实现方式 - bit2shift
@Kaiserludi 这可能是真的,也可能不是。我更新了答案以回应您的评论。同样适用于 @bit2shift - 在某些情况下, 的自旋锁实现可能不是“正确”的。例如,Boost在lock()内部使用非常好的自定义yield策略来优化其自旋锁实现的性能。关于std::atomic_lock - 我已经更新了代码。它确实是唯一保证无锁的类型,因此在编写自定义自旋锁时是一个自然的选择。 - Mateusz Grzejek
@bit2shift 抱歉,但这不是一个正确的自旋锁实现方式。在需要缓存行独占状态的操作上自旋非常低效。例如,可以在这里讨论:https://en.wikipedia.org/wiki/Spinlock#Significant_optimizations 或者这里:https://rigtorp.se/spinlock/。 - Daniel Langr

1
你的问题还有一个重要的相关问题。高效的自旋锁永远不会在涉及(甚至潜在的)修改内存位置的操作上“自旋”(例如exchangetest_and_set)。在典型的现代架构中,这些操作会生成需要将带有锁定内存位置的缓存行置于独占状态的指令,这是非常耗时的(特别是当多个线程同时自旋时)。始终在仅加载/读取时自旋,并尝试仅在有机会成功执行此操作时获取锁定。

例如,一篇很好的相关文章在这里:在C ++中正确实现自旋锁


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接