比较互斥锁和自旋锁唤醒时间的延迟

3

我对互斥锁和自旋锁的延迟时间很感兴趣,即一个线程解锁后另一个等待线程可以访问该锁的时间。我用C++编写了两个测试,发现互斥锁需要大约15000纳秒,而自旋锁只需要大约300纳秒。由于我刚开始学习多线程同步,因此我想先验证这些测试是否正确实现并且结果是否合理。

我还想知道为什么自旋锁的延迟仍然约为1000个时钟周期,即使原子操作本身很快(约100个时钟周期),旋转的CPU应该能够在解锁发生后立即捕捉到。有没有任何方法可以加速这个过程?

结果相当一致,并且我检查了时间测量本身的延迟<60ns。

互斥锁测试

// contended_mutex_overhead.cpp

#include <iostream>
#include <chrono>
#include <mutex>
#include <thread>

uint64_t rdtsc(){
    unsigned int lo,hi;
    __asm__ __volatile__ ("rdtsc" : "=a" (lo), "=d" (hi));
    return ((uint64_t)hi << 32) | lo;
}

// Global clock
auto t = std::chrono::high_resolution_clock::now();
uint64_t cycles_count;

// Global mutex lock
std::mutex m;

int slowthread() {
  m.lock();

  // Do work that takes some time, so that main thread is waiting on lock
  int sum = 0;
  for (int i=0; i<1000000000; ++i) {
    sum += i;
  }

  // Mark start time, right before lock is returned
  t = std::chrono::high_resolution_clock::now();
  cycles_count = rdtsc();
  m.unlock();
  return sum;
}

int main() {
  std::thread thr(slowthread);

  // Delay the main() thread with a std::cout call to make sure slowthread() gets a chance
  // to grab the lock before main() requests it in the next line.
  std::cout << "hi\n";

  // The lock will not be available at this time, until slowthread's blocking work completes
  m.lock();

  // Measure the end time, and calculate the delta from slowthread's release of the lock to when lock is given to main()
  auto finish = std::chrono::high_resolution_clock::now();

  auto cycles2 = rdtsc();
  auto d = std::chrono::duration_cast<std::chrono::nanoseconds>(finish-t).count();
  std::cout << d << " nanosecond overhead for contended mutex wake\n";
  std::cout << cycles2 - cycles_count << " cycle overhead for contended mutex wake\n";
  thr.join();
}

输出:

> g++ -O3 -pthread contended_mutex_overhead.cpp; ./a.out
hi
14763 nanosecond overhead for contended mutex wake
42786 cycle overhead for contended mutex wake

自旋锁测试

// contended_spinlock_overhead.cpp
#include <iostream>
#include <chrono>
#include <mutex>
#include <thread>
#include <atomic>

uint64_t rdtsc(){
    unsigned int lo,hi;
    __asm__ __volatile__ ("rdtsc" : "=a" (lo), "=d" (hi));
    return ((uint64_t)hi << 32) | lo;
}

struct spinlock {
  std::atomic<bool> lock_ = {false};

  void lock() {
    while(lock_.exchange(true));
  }

  void unlock() { lock_.store(false); }
};

// Global clock
auto t = std::chrono::high_resolution_clock::now();
uint64_t cycles_count;

// Global spinlock
spinlock spinner;

int slowthread() {
  spinner.lock();

  // Do work that takes some time, so that main thread is waiting on lock
  int sum = 0;
  for (int i=0; i<1000000000; ++i) {
    sum += i;
  }

  // Mark start time, right before lock is returned
  t = std::chrono::high_resolution_clock::now();
  cycles_count = rdtsc();
  spinner.unlock();
  return sum;
}

int main() {
  std::thread thr(slowthread);

  // Delay the main() thread with a std::cout call to make sure slowthread() gets a chance
  // to grab the lock before main() requests it.
  std::cout << "hi\n";

  // The lock will not be available at this time, until slowthread's blocking work completes
  spinner.lock();

  // Measure the end time, and calculate the delta from slowthread's release of the lock to when lock is given to main()
  auto finish = std::chrono::high_resolution_clock::now();

  auto cycles2 = rdtsc();
  auto d = std::chrono::duration_cast<std::chrono::nanoseconds>(finish-t).count();
  std::cout << d << " nanosecond overhead for spinlock wake\n";
  std::cout << cycles2 - cycles_count << " cycle overhead for spinlock wake\n";
  thr.join();
}

输出:

hi
363 nanosecond overhead for spinlock wake
1148 cycle overhead for spinlock wake

1
令人印象深刻的是,您可以从 high_resolution_clock 中获得如此高的分辨率。我习惯于它在微秒级别停滞不前。顺便提醒一下,high_resolution_clock 不保证单调。有些实现会向前或向后跳跃时间。 - user4581301
在开始时,**// 直到slowthread的阻塞工作完成,锁才会可用** 为false。 - 273K
1
值得注意的是,Linux不是实时操作系统。有内核选项可以改善延迟,但即使是旋转线程也有可能被抢占。一个例子是定时器中断,根据你的内核,每秒触发250或1000次,可能会导致不可预测的延迟峰值。CPU频率缩放也可能导致延迟峰值。 - Alan Birtles
请注意,您的“需要一些时间的工作”很可能会被编译器优化掉。 - Alan Birtles
1
这些数字与我几年前在消息队列中测量的相似。使用互斥锁和条件变量,消息的延迟平均为15-30微秒(一些峰值要高得多),而使用自旋锁则接近于0。我无法将延迟降低到以下水平。 - Federico
显示剩余2条评论
1个回答

3
Spinlock和mutex在正确的测量下比你想象的要更加高效。在我的旧2GHz Windows笔记本上,以下代码显示mutex为75ns,spinlock为12.5ns。而在现代3GHz GodBolt在线Linux服务器上,mutex平均为15ns,spinlock为8ns。请参见代码后面的更多详细信息,其中包括测量的控制台输出。
GodBolt Linux mutex更快,因为在Linux中实现的方式比在Windows中更加高效。在Windows上,您可以尝试使用std::shared_mutex而不是std::mutex,因为shared是在Windows的更高版本中实现的,并且通过更高效的算法和API进行了优化,mutex的共享版本在我的笔记本电脑上显示为50ns,而常规mutex为75ns。
我还测量了控制台输出中显示的周期和GHz。请注意,由于循环计数是通过RDTSC指令测量的,因此这些周期计数对应于基础频率,这意味着如果当前CPU存在热限制或Turbo Boost,即从基础速度改变了其速度,则将错误地显示“cycles”的数量。只有纳秒(ns)的数量始终正确显示,对应于当前CPU速度。
如何正确测量这里的所有内容。首先,只需测量spinlock和mutex本身的时间,而不需要任何额外的工作,您需要仅测量单个线程。
但是,要测量仅单个线程,您需要向编译器证明您将在其他线程中使用mutex和spinlock。为此,我通过std::async创建了第二个虚拟线程,并将其未来保持在易失性变量中。该虚拟线程会防止编译器优化掉mutex和spinlock代码,因为它看到它同时在另一个线程中使用。如果没有虚拟线程,则mutex和spinlock代码可能会从单个(主)线程中删除。
第二个虚拟线程实际上什么也不做,只锁定一次mutex和spinlock,然后退出。因此,这个第二个线程不会干扰我们的测量。
还有一个重要的事情是关于std::atomic_flag - 您必须使用更轻松的内存顺序,特别是在我的代码中,您可以看到我在spinlock的锁定行中使用std::memory_order_acquire,在释放行中使用std::memory_order_release。这种更紧密的内存顺序将有助于从spinlock中获得更快的速度,通过消除不必要的操作。默认情况下,使用更慢的顺序内存顺序std::memory_order_seq_cst。
下一个非常重要的事情,可能是您代码中错误结果的主要原因——您应该进行许多次测量并选择最小值。为什么选择最小值?因为如果您测量1秒钟,那么在此期间操作系统(Windows / Linux / MacOS)将多次旋转(重新调度)所有线程,每10-15毫秒左右旋转一次。这将导致每个线程唤醒之间的非常巨大的暂停,并破坏所有结果。
为避免在线程重新调度时出现的计时破坏,我们做两件事——首先,我们测量相当少量的时间,约为50-100微秒,其次——我们这样做很多次(10-15次)。对于任何操作系统,我们可以相当确保至少一次50微秒的测量是正确的,即不包括线程的重新调度。
100微秒的时间对于std :: chrono :: high_resolution_clock的精度来说是相当好的,它通常具有约200-500纳秒的分辨率。
从15次测量中取最小时间将帮助我们避免其他硬件延迟。因为最小时间将显示出最高效的结果,几乎这个结果通常会在实际系统中稍后平均发生。
我还使用RDTSC来测量周期,这仅是为了在控制台中显示纳秒ns之外的内容,以显示CPU的周期和GHz。
速度的微小提高是通过在主计量循环内两次重复锁定/解锁互斥锁和自旋锁来实现的。这是一个小型手动循环展开,以更精确并减少循环变量增量和比较。
在GodBolt上在线尝试!
#include <cstdint>
#include <atomic>
#include <mutex>
#include <future>
#include <iostream>
#include <iomanip>

#include <immintrin.h>

int main() {
    auto Rdtsc = []() -> uint64_t { return __rdtsc(); };
    auto Time = []() -> uint64_t {
        static auto const gtb = std::chrono::high_resolution_clock::now();
        return std::chrono::duration_cast<std::chrono::nanoseconds>(
            std::chrono::high_resolution_clock::now() - gtb).count();
    };
    std::cout << std::fixed << std::setprecision(2);
    size_t constexpr nloops = 1 << 4, ntests = 1 << 13;
    {
        uint64_t min_time = 1ULL << 50, min_rdtsc = 0;
        std::atomic_flag f = ATOMIC_FLAG_INIT;
        auto volatile ares = std::async(std::launch::async, [&]{ while (f.test_and_set(std::memory_order_acquire)) {} f.clear(std::memory_order_release); });
        for (size_t j = 0; j < nloops; ++j) {
            auto tim = Time(), timr = Rdtsc();
            for (size_t i = 0; i < ntests / 2; ++i) {
                { while (f.test_and_set(std::memory_order_acquire)) {} f.clear(std::memory_order_release); }
                { while (f.test_and_set(std::memory_order_acquire)) {} f.clear(std::memory_order_release); }
            }
            tim = Time() - tim;
            timr = Rdtsc() - timr;
            if (tim < min_time) {
                min_time = tim;
                min_rdtsc = timr;
            }
        }
        std::cout << "Spinlock time: " << double(min_time) / ntests << " ns (" << double(min_rdtsc) / ntests
            << " cycles, " << double(min_rdtsc) / min_time << " GHz)" << std::endl;
    }
    {
        uint64_t min_time = 1ULL << 50, min_rdtsc = 0;
        std::mutex mux;
        auto volatile ares = std::async(std::launch::async, [&]{ std::lock_guard<std::mutex> lock(mux); });
        for (size_t j = 0; j < nloops; ++j) {
            auto tim = Time(), timr = Rdtsc();
            for (size_t i = 0; i < ntests / 2; ++i) {
                { std::lock_guard<std::mutex> lock(mux); }
                { std::lock_guard<std::mutex> lock(mux); }
            }
            tim = Time() - tim;
            timr = Rdtsc() - timr;
            if (tim < min_time) {
                min_time = tim;
                min_rdtsc = timr;
            }
        }
        std::cout << "Mutex time   : " << double(min_time) / ntests << " ns (" << double(min_rdtsc) / ntests
            << " cycles, " << double(min_rdtsc) / min_time << " GHz)" << std::endl;
    }
}

现代3GHz Linux GodBolt在线服务器的输出结果:

Spinlock time: 7.65 ns (22.95 cycles, 3.00 GHz)
Mutex time   : 15.59 ns (46.78 cycles, 3.00 GHz)

旧款2GHz Windows笔记本的输出:

Spinlock time: 12.38 ns (25.94 cycles, 2.10 GHz)
Mutex time   : 74.27 ns (155.58 cycles, 2.09 GHz)

(std::shared_mutex的时间为PC超过了50 ns)


通过运行多个线程来测量在多个线程竞争同一个原子标志或互斥锁获取锁时计时的情况,程序可以变得更加复杂。

以下程序测试单线程和多线程定时。此外,还对计数器进行递增作为一种类似自旋锁/互斥锁保护的工作。

为了获得精确的结果,程序通过将不同的线程分配到不同的核心来设置线程的亲和性。

多线程版本使用std::barrier在相同的精确时间点上进行测量的开始/停止。此外,它使用原子计数器来同步每个线程的速度,使它们以相同的速率前进。

多线程版本的定时应该几乎等于单线程版本,特别是在现代CPU上,这两者的时间应该非常接近。如果所选的(由亲和性选择)核心受操作系统占用太多,则多线程的定时可能会大得多。如果某些操作系统以某种奇怪的顺序重新安排线程,则定时可能会更长。我遇到过Windows多线程定时很慢,而Linux定时完全没有问题的情况。

请查看代码后的计时。请注意,GodBolt服务器的时间非常短。自旋锁(单线程和多线程)是8 ns23个周期),这对应于原子标志的主要指令XCHG的定时(此处17个周期)。互斥锁是16 ns50个周期),几乎与自旋锁相同,因为在Linux中它是使用原子标志实现的,额外的开销是由于syscall操作或CALL指令造成的。

在线测试!

#include <cstdint>
#include <atomic>
#include <mutex>
#include <future>
#include <iostream>
#include <iomanip>
#include <barrier>
#include <optional>
#include <thread>
#include <vector>

#include <immintrin.h>

#if defined(_MSC_VER) && !defined(__clang__)
    #define FINL [[msvc::forceinline]]
#else
    #define FINL __attribute__((always_inline))
#endif

#ifdef _WIN32
    #include <windows.h>
    // https://dev59.com/yrTma4cB1Zd3GeqP5Fgk#56486809
    inline void SetAffinity(std::thread & thr, size_t i) {
        if (SetThreadAffinityMask(thr.native_handle(), DWORD_PTR(1) << i) == 0)
            std::cout << "SetThreadAffinityMask failed, GLE = " << GetLastError() << std::endl;
    }
#else
    #include <pthread.h>
    // https://dev59.com/22Af5IYBdhLWcg3wcyWr#57620568
    inline void SetAffinity(std::thread & thr, size_t i) {
        cpu_set_t cpuset;
        CPU_ZERO(&cpuset);
        CPU_SET(i, &cpuset);
        int rc = pthread_setaffinity_np(thr.native_handle(), sizeof(cpu_set_t), &cpuset);
        if (rc != 0)
            std::cout << "Error calling pthread_setaffinity_np: " << rc << std::endl;
    }
#endif

int main() {
    auto Rdtsc = []() -> uint64_t { return __rdtsc(); };
    auto Time = []() -> uint64_t {
        static auto const gtb = std::chrono::high_resolution_clock::now();
        return std::chrono::duration_cast<std::chrono::nanoseconds>(
            std::chrono::high_resolution_clock::now() - gtb).count();
    };
    std::cout << std::fixed << std::setprecision(2);
    
    auto RunST = [&Rdtsc, &Time](auto const & ReLock, auto nloops, auto ntests, auto & min_time, auto & min_rdtsc){
        { auto volatile ares = std::async(std::launch::async, [&]{ ReLock(); }); }
        for (size_t j = 0; j < nloops; ++j) {
            auto timr = Rdtsc(), tim = Time();
            for (size_t i = 0; i < ntests / 4; ++i) {
                ReLock(); ReLock(); ReLock(); ReLock();
            }
            tim = Time() - tim;
            timr = Rdtsc() - timr;
            if (tim < min_time) {
                min_time = tim;
                min_rdtsc = timr;
            }
        }
    };
    
    auto RunMT = [&Rdtsc, &Time](auto nthr, auto ithr, auto const & ReLock, auto nloops, auto ntests, auto & barrier, auto & timings, auto & min_time, auto & min_rdtsc, auto & brs){
        for (size_t j = 0; j < nloops; ++j) {
            barrier.arrive_and_wait();
            size_t nibr = (ithr + 1) % nthr;
            uint32_t ibrc = 0;
            auto ReSync = [&]{
                ibrc += 16;
                brs[ithr].store(ibrc, std::memory_order_relaxed);
                while (ibrc > brs[nibr].load(std::memory_order_relaxed)) {}
            };
            ReSync();
            auto timr = Rdtsc(), tim = Time();
            for (size_t i = 0; i < ntests / 16; ++i) {
                ReLock(); ReLock(); ReLock(); ReLock(); ReLock(); ReLock(); ReLock(); ReLock();
                ReLock(); ReLock(); ReLock(); ReLock(); ReLock(); ReLock(); ReLock(); ReLock();
                ReSync();
            }
            tim = Time() - tim;
            timr = Rdtsc() - timr;
            timings[ithr] = {tim, timr};
            barrier.arrive_and_wait();
            if (ithr != 0)
                continue;
            
            std::sort(timings.begin(), timings.end());
            if (timings.back().first >= timings.front().first * 1.3)
                continue;
            
            tim = 0; timr = 0;
            for (size_t i = 0; i < timings.size(); ++i) {
                tim += timings[i].first;
                timr += timings[i].second;
            }
            tim /= timings.size();
            timr /= timings.size();
            if (tim < min_time) {
                min_time = tim;
                min_rdtsc = timr;
            }
        }
    };

    auto const hard_threads = std::thread::hardware_concurrency();
    auto const test_threads = 2; //std::max<size_t>(2, hard_threads / 2);
    std::cout << test_threads << " threads." << std::endl;
    
    {
        uint64_t vcnt = 0;
        size_t constexpr ntries = 1 << 4, nloops = 1 << 8, ntests = 1 << 11;
        uint64_t min_time = 1ULL << 50, min_rdtsc = 0;
        std::atomic_flag f = ATOMIC_FLAG_INIT;
        auto const ReLock = [&f, &vcnt]() FINL {
            while (f.test_and_set(std::memory_order_acquire)) {}
            ++vcnt;
            f.clear(std::memory_order_release);
        };
        auto const stim = Time();
        for (size_t itry = 0; itry < ntries; ++itry)
            RunST(ReLock, nloops, ntests, min_time, min_rdtsc);
        std::cout << "Single-Threaded Spinlock time: " << std::setprecision(2) << std::setw(6) << double(min_time) / ntests
            << " ns (" << std::setw(6) << double(min_rdtsc) / ntests << " cycles, " << double(min_rdtsc) / min_time << " GHz), test time "
            << std::setw(4) << (Time() - stim) / 1'000'000'000.0 << " sec, precision "
            << std::setprecision(8) << double(vcnt) / (ntries * (nloops * ntests + 1)) << std::endl;
    }
    
    {
        uint64_t vcnt = 0;
        size_t constexpr ntries = 1 << 4, nloops = 1 << 6, ntests = 1 << 11;
        size_t const nthr = test_threads;
        uint64_t min_time = 1ULL << 50, min_rdtsc = 0;
        std::atomic_flag f = ATOMIC_FLAG_INIT;
        std::barrier barrier(nthr, []() noexcept {});
        std::vector<std::pair<uint64_t, uint64_t>> timings(nthr);
        std::vector<std::optional<std::thread>> threads(nthr);
        auto const ReLock = [&f, &vcnt]() FINL {
            while (f.test_and_set(std::memory_order_acquire)) {}
            ++vcnt;
            f.clear(std::memory_order_release);
        };
        auto const stim = Time();
        for (size_t itry = 0; itry < ntries; ++itry) {
            std::vector<std::atomic<uint32_t>> brs(nthr);
            for (size_t ithr = 0; ithr < threads.size(); ++ithr) {
                threads[ithr] = std::thread([&, ithr]{ RunMT(nthr, ithr, ReLock, nloops, ntests, barrier, timings, min_time, min_rdtsc, brs); });
                SetAffinity(*threads[ithr], nthr * 2 <= hard_threads ? ithr * 2 : ithr);
            }
            for (auto & t: threads)
                t->join();
            threads.clear();
            threads.resize(nthr);
        }
        std::cout << "Multi -Threaded Spinlock time: " << std::setprecision(2) << std::setw(6) << double(min_time) / ntests
            << " ns (" << std::setw(6) << double(min_rdtsc) / ntests << " cycles, " << double(min_rdtsc) / min_time << " GHz), test time "
            << std::setw(4) << (Time() - stim) / 1'000'000'000.0 << " sec, precision "
            << std::setprecision(8) << double(vcnt) / (ntries * nthr * nloops * ntests) << std::endl;
    }
    
    {
        uint64_t vcnt = 0;
        size_t constexpr ntries = 1 << 4, nloops = 1 << 8, ntests = 1 << 11;
        uint64_t min_time = 1ULL << 50, min_rdtsc = 0;
        std::mutex mux;
        auto const ReLock = [&mux, &vcnt]() FINL {
            std::lock_guard<std::mutex> lock(mux);
            ++vcnt;
        };
        auto const stim = Time();
        for (size_t itry = 0; itry < ntries; ++itry)
            RunST(ReLock, nloops, ntests, min_time, min_rdtsc);
        std::cout << "Single-Threaded Mutex    time: " << std::setprecision(2) << std::setw(6) << double(min_time) / ntests
            << " ns (" << std::setw(6) << double(min_rdtsc) / ntests << " cycles, " << double(min_rdtsc) / min_time << " GHz), test time "
            << std::setw(4) << (Time() - stim) / 1'000'000'000.0 << " sec, precision "
            << std::setprecision(8) << double(vcnt) / (ntries * (nloops * ntests + 1)) << std::endl;
    }
    
    {
        uint64_t vcnt = 0;
        size_t constexpr ntries = 1 << 4, nloops = 1 << 6, ntests = 1 << 11;
        size_t const nthr = test_threads;
        uint64_t min_time = 1ULL << 50, min_rdtsc = 0;
        std::mutex mux;
        std::barrier barrier(nthr, []() noexcept {});
        std::vector<std::pair<uint64_t, uint64_t>> timings(nthr);
        std::vector<std::optional<std::thread>> threads(nthr);
        auto const ReLock = [&mux, &vcnt]() FINL {
            std::lock_guard<std::mutex> lock(mux);
            ++vcnt;
        };
        auto const stim = Time();
        for (size_t itry = 0; itry < ntries; ++itry) {
            std::vector<std::atomic<uint32_t>> brs(nthr);
            for (size_t ithr = 0; ithr < threads.size(); ++ithr) {
                threads[ithr] = std::thread([&, ithr]{ RunMT(nthr, ithr, ReLock, nloops, ntests, barrier, timings, min_time, min_rdtsc, brs); });
                SetAffinity(*threads[ithr], nthr * 2 <= hard_threads ? ithr * 2 : ithr);
            }
            for (auto & t: threads)
                t->join();
            threads.clear();
            threads.resize(nthr);
        }
        std::cout << "Multi -Threaded Mutex    time: " << std::setprecision(2) << std::setw(6) << double(min_time) / ntests
            << " ns (" << std::setw(6) << double(min_rdtsc) / ntests << " cycles, " << double(min_rdtsc) / min_time << " GHz), test time "
            << std::setw(4) << (Time() - stim) / 1'000'000'000.0 << " sec, precision "
            << std::setprecision(8) << double(vcnt) / (ntries * nthr * nloops * ntests) << std::endl;
    }
}

在 GodBolt Linux 服务器上的输出:

2 threads.
Single-Threaded Spinlock time:   8.63 ns ( 25.91 cycles, 3.00 GHz), test time 0.31 sec, precision 1.00000000
Multi -Threaded Spinlock time:   8.51 ns ( 25.58 cycles, 3.00 GHz), test time 1.04 sec, precision 1.00000000
Single-Threaded Mutex    time:  15.89 ns ( 47.66 cycles, 3.00 GHz), test time 0.82 sec, precision 1.00000000
Multi -Threaded Mutex    time:  17.94 ns ( 53.86 cycles, 3.00 GHz), test time 4.18 sec, precision 1.00000000

旧的Windows笔记本电脑上的输出:

2 threads.
Single-Threaded Spinlock time:  31.40 ns ( 67.22 cycles, 2.14 GHz), test time 0.48 sec, precision 1.00000000
Multi -Threaded Spinlock time:  32.45 ns ( 70.21 cycles, 2.16 GHz), test time 0.98 sec, precision 1.00000000
Single-Threaded Mutex    time:  79.69 ns (168.36 cycles, 2.11 GHz), test time 0.94 sec, precision 1.00000000
Multi -Threaded Mutex    time:  83.67 ns (269.23 cycles, 3.22 GHz), test time 0.91 sec, precision 1.00000000

@rampatowl 实际上,在所有英特尔 CPU 型号上,它的时间完全相同。无论其他线程是否为同一锁定而奋斗,在 Intel CPU 上,这两种情况都会给出完全相同的时间,或者至少几乎相同。对于自旋锁,Intel 使用 XCHG 指令,这是主要的时间消耗,因为它锁定和解锁缓存行,另一个线程只需等待缓存行锁定被释放,然后通过进行相同的缓存行锁定来获得第二个锁定的所有权。如果有任何时间差异,那么不是很大。无论如何,如果您有疑问,我也可以进行多线程测量。 - Arty
有趣的是,我认为无争用锁和有争用锁在延迟方面会有很大的差别。特别是对于互斥锁,如果锁不可用,线程将被置于睡眠状态,并且在唤醒时会进行上下文切换。如果您能够重现有争用情况下<100ns的结果(即每个lock()案例都必须等待其他线程完成工作),那将非常有趣!一些微妙之处涉及到锁定()调用的排序以及阻塞线程中的工作,以确保我们实际上正在测量有争用的锁。 - rampatowl
@rampatowl 继续上一条评论。这种竞争条件肯定会导致每个线程等待其他线程释放锁的时间很长。因此,这准确地测量了重度使用自旋锁/互斥锁重新锁定的情况。基本上,主要的时间消耗是重新锁定本身,所有其他东西都需要不可见的时间。在所有英特尔CPU上,自旋锁(和Linux中的互斥锁)都是通过XCHG指令实现的,该指令需要17-20个周期,因此您可能会看到我的程序显示大约25个周期,因为它还有一些其他快速指令。 - Arty
@rampatowl 继续前面两个评论。请注意,在您的家用电脑上,多线程版本的时间可能会显示得更长,这是因为 CPU 核心之一被占用,或者线程在同一核心上重新调度。但是请参见 Try it online!(代码之前的链接)计时,它显示 GodBolt 服务器对自旋锁需要 25 个周期的时间,对互斥锁需要 50 个周期的时间。我的家用笔记本电脑在五次运行中有四次显示出较长的多线程时间,并且大约有 20% 的运行时间几乎等于单线程版本的时间。 - Arty
非常感兴趣您的测试(两个都是),我不能评论锁争用必须考虑,特别是当涉及到同步的线程驻留在不同的核心或更糟的是在不同的CPU上时,必须考虑它。当锁定涉及某种缓存崩溃时,情况应该在这两种情况下发生显着变化-例如,您可以使用pthread_setaffinity_np()将不同的线程放置在不同的CPU上进行实验。 - Sigi
显示剩余5条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接