





// contended_mutex_overhead.cpp

#include <iostream>
#include <chrono>
#include <mutex>
#include <thread>

uint64_t rdtsc(){
    unsigned int lo,hi;
    __asm__ __volatile__ ("rdtsc" : "=a" (lo), "=d" (hi));
    return ((uint64_t)hi << 32) | lo;

// Global clock
auto t = std::chrono::high_resolution_clock::now();
uint64_t cycles_count;

// Global mutex lock
std::mutex m;

int slowthread() {

  // Do work that takes some time, so that main thread is waiting on lock
  int sum = 0;
  for (int i=0; i<1000000000; ++i) {
    sum += i;

  // Mark start time, right before lock is returned
  t = std::chrono::high_resolution_clock::now();
  cycles_count = rdtsc();
  return sum;

int main() {
  std::thread thr(slowthread);

  // Delay the main() thread with a std::cout call to make sure slowthread() gets a chance
  // to grab the lock before main() requests it in the next line.
  std::cout << "hi\n";

  // The lock will not be available at this time, until slowthread's blocking work completes

  // Measure the end time, and calculate the delta from slowthread's release of the lock to when lock is given to main()
  auto finish = std::chrono::high_resolution_clock::now();

  auto cycles2 = rdtsc();
  auto d = std::chrono::duration_cast<std::chrono::nanoseconds>(finish-t).count();
  std::cout << d << " nanosecond overhead for contended mutex wake\n";
  std::cout << cycles2 - cycles_count << " cycle overhead for contended mutex wake\n";


> g++ -O3 -pthread contended_mutex_overhead.cpp; ./a.out
14763 nanosecond overhead for contended mutex wake
42786 cycle overhead for contended mutex wake


// contended_spinlock_overhead.cpp
#include <iostream>
#include <chrono>
#include <mutex>
#include <thread>
#include <atomic>

uint64_t rdtsc(){
    unsigned int lo,hi;
    __asm__ __volatile__ ("rdtsc" : "=a" (lo), "=d" (hi));
    return ((uint64_t)hi << 32) | lo;

struct spinlock {
  std::atomic<bool> lock_ = {false};

  void lock() {

  void unlock() { lock_.store(false); }

// Global clock
auto t = std::chrono::high_resolution_clock::now();
uint64_t cycles_count;

// Global spinlock
spinlock spinner;

int slowthread() {

  // Do work that takes some time, so that main thread is waiting on lock
  int sum = 0;
  for (int i=0; i<1000000000; ++i) {
    sum += i;

  // Mark start time, right before lock is returned
  t = std::chrono::high_resolution_clock::now();
  cycles_count = rdtsc();
  return sum;

int main() {
  std::thread thr(slowthread);

  // Delay the main() thread with a std::cout call to make sure slowthread() gets a chance
  // to grab the lock before main() requests it.
  std::cout << "hi\n";

  // The lock will not be available at this time, until slowthread's blocking work completes

  // Measure the end time, and calculate the delta from slowthread's release of the lock to when lock is given to main()
  auto finish = std::chrono::high_resolution_clock::now();

  auto cycles2 = rdtsc();
  auto d = std::chrono::duration_cast<std::chrono::nanoseconds>(finish-t).count();
  std::cout << d << " nanosecond overhead for spinlock wake\n";
  std::cout << cycles2 - cycles_count << " cycle overhead for spinlock wake\n";


363 nanosecond overhead for spinlock wake
1148 cycle overhead for spinlock wake

令人印象深刻的是,您可以从 high_resolution_clock 中获得如此高的分辨率。我习惯于它在微秒级别停滞不前。顺便提醒一下,high_resolution_clock 不保证单调。有些实现会向前或向后跳跃时间。 - user4581301
在开始时,**// 直到slowthread的阻塞工作完成,锁才会可用** 为false。 - 273K
值得注意的是,Linux不是实时操作系统。有内核选项可以改善延迟,但即使是旋转线程也有可能被抢占。一个例子是定时器中断,根据你的内核,每秒触发250或1000次,可能会导致不可预测的延迟峰值。CPU频率缩放也可能导致延迟峰值。 - Alan Birtles
请注意,您的“需要一些时间的工作”很可能会被编译器优化掉。 - Alan Birtles
这些数字与我几年前在消息队列中测量的相似。使用互斥锁和条件变量,消息的延迟平均为15-30微秒(一些峰值要高得多),而使用自旋锁则接近于0。我无法将延迟降低到以下水平。 - Federico

Spinlock和mutex在正确的测量下比你想象的要更加高效。在我的旧2GHz Windows笔记本上,以下代码显示mutex为75ns,spinlock为12.5ns。而在现代3GHz GodBolt在线Linux服务器上,mutex平均为15ns,spinlock为8ns。请参见代码后面的更多详细信息,其中包括测量的控制台输出。
GodBolt Linux mutex更快,因为在Linux中实现的方式比在Windows中更加高效。在Windows上,您可以尝试使用std::shared_mutex而不是std::mutex,因为shared是在Windows的更高版本中实现的,并且通过更高效的算法和API进行了优化,mutex的共享版本在我的笔记本电脑上显示为50ns,而常规mutex为75ns。
我还测量了控制台输出中显示的周期和GHz。请注意,由于循环计数是通过RDTSC指令测量的,因此这些周期计数对应于基础频率,这意味着如果当前CPU存在热限制或Turbo Boost,即从基础速度改变了其速度,则将错误地显示“cycles”的数量。只有纳秒(ns)的数量始终正确显示,对应于当前CPU速度。
还有一个重要的事情是关于std::atomic_flag - 您必须使用更轻松的内存顺序,特别是在我的代码中,您可以看到我在spinlock的锁定行中使用std::memory_order_acquire,在释放行中使用std::memory_order_release。这种更紧密的内存顺序将有助于从spinlock中获得更快的速度,通过消除不必要的操作。默认情况下,使用更慢的顺序内存顺序std::memory_order_seq_cst。
下一个非常重要的事情,可能是您代码中错误结果的主要原因——您应该进行许多次测量并选择最小值。为什么选择最小值?因为如果您测量1秒钟,那么在此期间操作系统(Windows / Linux / MacOS)将多次旋转(重新调度)所有线程,每10-15毫秒左右旋转一次。这将导致每个线程唤醒之间的非常巨大的暂停,并破坏所有结果。
100微秒的时间对于std :: chrono :: high_resolution_clock的精度来说是相当好的,它通常具有约200-500纳秒的分辨率。
#include <cstdint>
#include <atomic>
#include <mutex>
#include <future>
#include <iostream>
#include <iomanip>

#include <immintrin.h>

int main() {
    auto Rdtsc = []() -> uint64_t { return __rdtsc(); };
    auto Time = []() -> uint64_t {
        static auto const gtb = std::chrono::high_resolution_clock::now();
        return std::chrono::duration_cast<std::chrono::nanoseconds>(
            std::chrono::high_resolution_clock::now() - gtb).count();
    std::cout << std::fixed << std::setprecision(2);
    size_t constexpr nloops = 1 << 4, ntests = 1 << 13;
        uint64_t min_time = 1ULL << 50, min_rdtsc = 0;
        std::atomic_flag f = ATOMIC_FLAG_INIT;
        auto volatile ares = std::async(std::launch::async, [&]{ while (f.test_and_set(std::memory_order_acquire)) {} f.clear(std::memory_order_release); });
        for (size_t j = 0; j < nloops; ++j) {
            auto tim = Time(), timr = Rdtsc();
            for (size_t i = 0; i < ntests / 2; ++i) {
                { while (f.test_and_set(std::memory_order_acquire)) {} f.clear(std::memory_order_release); }
                { while (f.test_and_set(std::memory_order_acquire)) {} f.clear(std::memory_order_release); }
            tim = Time() - tim;
            timr = Rdtsc() - timr;
            if (tim < min_time) {
                min_time = tim;
                min_rdtsc = timr;
        std::cout << "Spinlock time: " << double(min_time) / ntests << " ns (" << double(min_rdtsc) / ntests
            << " cycles, " << double(min_rdtsc) / min_time << " GHz)" << std::endl;
        uint64_t min_time = 1ULL << 50, min_rdtsc = 0;
        std::mutex mux;
        auto volatile ares = std::async(std::launch::async, [&]{ std::lock_guard<std::mutex> lock(mux); });
        for (size_t j = 0; j < nloops; ++j) {
            auto tim = Time(), timr = Rdtsc();
            for (size_t i = 0; i < ntests / 2; ++i) {
                { std::lock_guard<std::mutex> lock(mux); }
                { std::lock_guard<std::mutex> lock(mux); }
            tim = Time() - tim;
            timr = Rdtsc() - timr;
            if (tim < min_time) {
                min_time = tim;
                min_rdtsc = timr;
        std::cout << "Mutex time   : " << double(min_time) / ntests << " ns (" << double(min_rdtsc) / ntests
            << " cycles, " << double(min_rdtsc) / min_time << " GHz)" << std::endl;

现代3GHz Linux GodBolt在线服务器的输出结果:

Spinlock time: 7.65 ns (22.95 cycles, 3.00 GHz)
Mutex time   : 15.59 ns (46.78 cycles, 3.00 GHz)

旧款2GHz Windows笔记本的输出:

Spinlock time: 12.38 ns (25.94 cycles, 2.10 GHz)
Mutex time   : 74.27 ns (155.58 cycles, 2.09 GHz)

(std::shared_mutex的时间为PC超过了50 ns)






请查看代码后的计时。请注意,GodBolt服务器的时间非常短。自旋锁(单线程和多线程)是8 ns23个周期),这对应于原子标志的主要指令XCHG的定时(此处17个周期)。互斥锁是16 ns50个周期),几乎与自旋锁相同,因为在Linux中它是使用原子标志实现的,额外的开销是由于syscall操作或CALL指令造成的。


#include <cstdint>
#include <atomic>
#include <mutex>
#include <future>
#include <iostream>
#include <iomanip>
#include <barrier>
#include <optional>
#include <thread>
#include <vector>

#include <immintrin.h>

#if defined(_MSC_VER) && !defined(__clang__)
    #define FINL [[msvc::forceinline]]
    #define FINL __attribute__((always_inline))

#ifdef _WIN32
    #include <windows.h>
    // https://dev59.com/yrTma4cB1Zd3GeqP5Fgk#56486809
    inline void SetAffinity(std::thread & thr, size_t i) {
        if (SetThreadAffinityMask(thr.native_handle(), DWORD_PTR(1) << i) == 0)
            std::cout << "SetThreadAffinityMask failed, GLE = " << GetLastError() << std::endl;
    #include <pthread.h>
    // https://dev59.com/22Af5IYBdhLWcg3wcyWr#57620568
    inline void SetAffinity(std::thread & thr, size_t i) {
        cpu_set_t cpuset;
        CPU_SET(i, &cpuset);
        int rc = pthread_setaffinity_np(thr.native_handle(), sizeof(cpu_set_t), &cpuset);
        if (rc != 0)
            std::cout << "Error calling pthread_setaffinity_np: " << rc << std::endl;

int main() {
    auto Rdtsc = []() -> uint64_t { return __rdtsc(); };
    auto Time = []() -> uint64_t {
        static auto const gtb = std::chrono::high_resolution_clock::now();
        return std::chrono::duration_cast<std::chrono::nanoseconds>(
            std::chrono::high_resolution_clock::now() - gtb).count();
    std::cout << std::fixed << std::setprecision(2);
    auto RunST = [&Rdtsc, &Time](auto const & ReLock, auto nloops, auto ntests, auto & min_time, auto & min_rdtsc){
        { auto volatile ares = std::async(std::launch::async, [&]{ ReLock(); }); }
        for (size_t j = 0; j < nloops; ++j) {
            auto timr = Rdtsc(), tim = Time();
            for (size_t i = 0; i < ntests / 4; ++i) {
                ReLock(); ReLock(); ReLock(); ReLock();
            tim = Time() - tim;
            timr = Rdtsc() - timr;
            if (tim < min_time) {
                min_time = tim;
                min_rdtsc = timr;
    auto RunMT = [&Rdtsc, &Time](auto nthr, auto ithr, auto const & ReLock, auto nloops, auto ntests, auto & barrier, auto & timings, auto & min_time, auto & min_rdtsc, auto & brs){
        for (size_t j = 0; j < nloops; ++j) {
            size_t nibr = (ithr + 1) % nthr;
            uint32_t ibrc = 0;
            auto ReSync = [&]{
                ibrc += 16;
                brs[ithr].store(ibrc, std::memory_order_relaxed);
                while (ibrc > brs[nibr].load(std::memory_order_relaxed)) {}
            auto timr = Rdtsc(), tim = Time();
            for (size_t i = 0; i < ntests / 16; ++i) {
                ReLock(); ReLock(); ReLock(); ReLock(); ReLock(); ReLock(); ReLock(); ReLock();
                ReLock(); ReLock(); ReLock(); ReLock(); ReLock(); ReLock(); ReLock(); ReLock();
            tim = Time() - tim;
            timr = Rdtsc() - timr;
            timings[ithr] = {tim, timr};
            if (ithr != 0)
            std::sort(timings.begin(), timings.end());
            if (timings.back().first >= timings.front().first * 1.3)
            tim = 0; timr = 0;
            for (size_t i = 0; i < timings.size(); ++i) {
                tim += timings[i].first;
                timr += timings[i].second;
            tim /= timings.size();
            timr /= timings.size();
            if (tim < min_time) {
                min_time = tim;
                min_rdtsc = timr;

    auto const hard_threads = std::thread::hardware_concurrency();
    auto const test_threads = 2; //std::max<size_t>(2, hard_threads / 2);
    std::cout << test_threads << " threads." << std::endl;
        uint64_t vcnt = 0;
        size_t constexpr ntries = 1 << 4, nloops = 1 << 8, ntests = 1 << 11;
        uint64_t min_time = 1ULL << 50, min_rdtsc = 0;
        std::atomic_flag f = ATOMIC_FLAG_INIT;
        auto const ReLock = [&f, &vcnt]() FINL {
            while (f.test_and_set(std::memory_order_acquire)) {}
        auto const stim = Time();
        for (size_t itry = 0; itry < ntries; ++itry)
            RunST(ReLock, nloops, ntests, min_time, min_rdtsc);
        std::cout << "Single-Threaded Spinlock time: " << std::setprecision(2) << std::setw(6) << double(min_time) / ntests
            << " ns (" << std::setw(6) << double(min_rdtsc) / ntests << " cycles, " << double(min_rdtsc) / min_time << " GHz), test time "
            << std::setw(4) << (Time() - stim) / 1'000'000'000.0 << " sec, precision "
            << std::setprecision(8) << double(vcnt) / (ntries * (nloops * ntests + 1)) << std::endl;
        uint64_t vcnt = 0;
        size_t constexpr ntries = 1 << 4, nloops = 1 << 6, ntests = 1 << 11;
        size_t const nthr = test_threads;
        uint64_t min_time = 1ULL << 50, min_rdtsc = 0;
        std::atomic_flag f = ATOMIC_FLAG_INIT;
        std::barrier barrier(nthr, []() noexcept {});
        std::vector<std::pair<uint64_t, uint64_t>> timings(nthr);
        std::vector<std::optional<std::thread>> threads(nthr);
        auto const ReLock = [&f, &vcnt]() FINL {
            while (f.test_and_set(std::memory_order_acquire)) {}
        auto const stim = Time();
        for (size_t itry = 0; itry < ntries; ++itry) {
            std::vector<std::atomic<uint32_t>> brs(nthr);
            for (size_t ithr = 0; ithr < threads.size(); ++ithr) {
                threads[ithr] = std::thread([&, ithr]{ RunMT(nthr, ithr, ReLock, nloops, ntests, barrier, timings, min_time, min_rdtsc, brs); });
                SetAffinity(*threads[ithr], nthr * 2 <= hard_threads ? ithr * 2 : ithr);
            for (auto & t: threads)
        std::cout << "Multi -Threaded Spinlock time: " << std::setprecision(2) << std::setw(6) << double(min_time) / ntests
            << " ns (" << std::setw(6) << double(min_rdtsc) / ntests << " cycles, " << double(min_rdtsc) / min_time << " GHz), test time "
            << std::setw(4) << (Time() - stim) / 1'000'000'000.0 << " sec, precision "
            << std::setprecision(8) << double(vcnt) / (ntries * nthr * nloops * ntests) << std::endl;
        uint64_t vcnt = 0;
        size_t constexpr ntries = 1 << 4, nloops = 1 << 8, ntests = 1 << 11;
        uint64_t min_time = 1ULL << 50, min_rdtsc = 0;
        std::mutex mux;
        auto const ReLock = [&mux, &vcnt]() FINL {
            std::lock_guard<std::mutex> lock(mux);
        auto const stim = Time();
        for (size_t itry = 0; itry < ntries; ++itry)
            RunST(ReLock, nloops, ntests, min_time, min_rdtsc);
        std::cout << "Single-Threaded Mutex    time: " << std::setprecision(2) << std::setw(6) << double(min_time) / ntests
            << " ns (" << std::setw(6) << double(min_rdtsc) / ntests << " cycles, " << double(min_rdtsc) / min_time << " GHz), test time "
            << std::setw(4) << (Time() - stim) / 1'000'000'000.0 << " sec, precision "
            << std::setprecision(8) << double(vcnt) / (ntries * (nloops * ntests + 1)) << std::endl;
        uint64_t vcnt = 0;
        size_t constexpr ntries = 1 << 4, nloops = 1 << 6, ntests = 1 << 11;
        size_t const nthr = test_threads;
        uint64_t min_time = 1ULL << 50, min_rdtsc = 0;
        std::mutex mux;
        std::barrier barrier(nthr, []() noexcept {});
        std::vector<std::pair<uint64_t, uint64_t>> timings(nthr);
        std::vector<std::optional<std::thread>> threads(nthr);
        auto const ReLock = [&mux, &vcnt]() FINL {
            std::lock_guard<std::mutex> lock(mux);
        auto const stim = Time();
        for (size_t itry = 0; itry < ntries; ++itry) {
            std::vector<std::atomic<uint32_t>> brs(nthr);
            for (size_t ithr = 0; ithr < threads.size(); ++ithr) {
                threads[ithr] = std::thread([&, ithr]{ RunMT(nthr, ithr, ReLock, nloops, ntests, barrier, timings, min_time, min_rdtsc, brs); });
                SetAffinity(*threads[ithr], nthr * 2 <= hard_threads ? ithr * 2 : ithr);
            for (auto & t: threads)
        std::cout << "Multi -Threaded Mutex    time: " << std::setprecision(2) << std::setw(6) << double(min_time) / ntests
            << " ns (" << std::setw(6) << double(min_rdtsc) / ntests << " cycles, " << double(min_rdtsc) / min_time << " GHz), test time "
            << std::setw(4) << (Time() - stim) / 1'000'000'000.0 << " sec, precision "
            << std::setprecision(8) << double(vcnt) / (ntries * nthr * nloops * ntests) << std::endl;

在 GodBolt Linux 服务器上的输出:

2 threads.
Single-Threaded Spinlock time:   8.63 ns ( 25.91 cycles, 3.00 GHz), test time 0.31 sec, precision 1.00000000
Multi -Threaded Spinlock time:   8.51 ns ( 25.58 cycles, 3.00 GHz), test time 1.04 sec, precision 1.00000000
Single-Threaded Mutex    time:  15.89 ns ( 47.66 cycles, 3.00 GHz), test time 0.82 sec, precision 1.00000000
Multi -Threaded Mutex    time:  17.94 ns ( 53.86 cycles, 3.00 GHz), test time 4.18 sec, precision 1.00000000


2 threads.
Single-Threaded Spinlock time:  31.40 ns ( 67.22 cycles, 2.14 GHz), test time 0.48 sec, precision 1.00000000
Multi -Threaded Spinlock time:  32.45 ns ( 70.21 cycles, 2.16 GHz), test time 0.98 sec, precision 1.00000000
Single-Threaded Mutex    time:  79.69 ns (168.36 cycles, 2.11 GHz), test time 0.94 sec, precision 1.00000000
Multi -Threaded Mutex    time:  83.67 ns (269.23 cycles, 3.22 GHz), test time 0.91 sec, precision 1.00000000

@rampatowl 实际上,在所有英特尔 CPU 型号上,它的时间完全相同。无论其他线程是否为同一锁定而奋斗,在 Intel CPU 上,这两种情况都会给出完全相同的时间,或者至少几乎相同。对于自旋锁,Intel 使用 XCHG 指令,这是主要的时间消耗,因为它锁定和解锁缓存行,另一个线程只需等待缓存行锁定被释放,然后通过进行相同的缓存行锁定来获得第二个锁定的所有权。如果有任何时间差异,那么不是很大。无论如何,如果您有疑问,我也可以进行多线程测量。 - Arty
有趣的是,我认为无争用锁和有争用锁在延迟方面会有很大的差别。特别是对于互斥锁,如果锁不可用,线程将被置于睡眠状态,并且在唤醒时会进行上下文切换。如果您能够重现有争用情况下<100ns的结果(即每个lock()案例都必须等待其他线程完成工作),那将非常有趣!一些微妙之处涉及到锁定()调用的排序以及阻塞线程中的工作,以确保我们实际上正在测量有争用的锁。 - rampatowl
@rampatowl 继续上一条评论。这种竞争条件肯定会导致每个线程等待其他线程释放锁的时间很长。因此,这准确地测量了重度使用自旋锁/互斥锁重新锁定的情况。基本上,主要的时间消耗是重新锁定本身,所有其他东西都需要不可见的时间。在所有英特尔CPU上,自旋锁(和Linux中的互斥锁)都是通过XCHG指令实现的,该指令需要17-20个周期,因此您可能会看到我的程序显示大约25个周期,因为它还有一些其他快速指令。 - Arty
@rampatowl 继续前面两个评论。请注意,在您的家用电脑上,多线程版本的时间可能会显示得更长,这是因为 CPU 核心之一被占用,或者线程在同一核心上重新调度。但是请参见 Try it online!(代码之前的链接)计时,它显示 GodBolt 服务器对自旋锁需要 25 个周期的时间,对互斥锁需要 50 个周期的时间。我的家用笔记本电脑在五次运行中有四次显示出较长的多线程时间,并且大约有 20% 的运行时间几乎等于单线程版本的时间。 - Arty
非常感兴趣您的测试(两个都是),我不能评论锁争用必须考虑,特别是当涉及到同步的线程驻留在不同的核心或更糟的是在不同的CPU上时,必须考虑它。当锁定涉及某种缓存崩溃时,情况应该在这两种情况下发生显着变化-例如,您可以使用pthread_setaffinity_np()将不同的线程放置在不同的CPU上进行实验。 - Sigi

