Spinlocks and mutexes, when measured correctly, are more efficient than you might think. On my old 2GHz Windows laptop the code below shows 75 ns for a mutex and 12.5 ns for a spinlock. On a modern 3GHz GodBolt online Linux server a mutex averages 15 ns and a spinlock 8 ns. See further details after the code, including the console output of the measurements.
The GodBolt Linux mutex is faster because mutexes are implemented more efficiently on Linux than on Windows. On Windows you can try std::shared_mutex instead of std::mutex: the shared variant was added in later versions of Windows and is backed by a more efficient algorithm and API. On my laptop the shared variant shows 50 ns versus 75 ns for the regular mutex.
I also measured cycles and GHz, shown in the console output. Note that because the cycle counts are taken with the RDTSC instruction, they correspond to the CPU's base frequency. This means that if the CPU is currently thermally throttled or in Turbo Boost, i.e. running at something other than its base speed, the displayed number of "cycles" will be wrong. Only the nanosecond (ns) figures are always correct, since they correspond to the current CPU speed.
Now, how to measure all of this correctly. First, to time only the spinlock and mutex themselves, without any extra work, you need to measure on just a single thread.
However, to measure on a single thread only, you have to prove to the compiler that the mutex and spinlock are also used on another thread. To do that, I create a second dummy thread via std::async and keep its future in a volatile variable. This dummy thread prevents the compiler from optimizing away the mutex and spinlock code, because the compiler sees them used concurrently in another thread. Without the dummy thread, the mutex and spinlock code could be removed from the single (main) thread entirely.
The second dummy thread does essentially nothing: it locks the mutex and the spinlock just once and exits, so it does not interfere with our measurements.
One more important point concerns std::atomic_flag: you must use more relaxed memory orderings. Specifically, in my code you can see that I use std::memory_order_acquire on the spinlock's lock line and std::memory_order_release on its release line. These tighter memory orderings make the spinlock faster by eliminating unnecessary operations. By default the slower sequentially consistent ordering std::memory_order_seq_cst is used.
The next very important thing, and probably the main cause of wrong results in your code: you should take many measurements and pick the minimum. Why the minimum? Because if you measure for, say, one second, then during that time the OS (Windows/Linux/MacOS) will reschedule all threads many times, roughly every 10-15 milliseconds. That produces huge pauses between thread wake-ups and ruins all the results.
To avoid this timing corruption from thread rescheduling, we do two things: first, each measurement covers a fairly short interval, around 50-100 microseconds; second, we repeat it many times (10-15). For any OS we can then be fairly sure that at least one 50-microsecond sample was clean, i.e. contained no thread rescheduling.
An interval of 100 microseconds is also well within the precision of std::chrono::high_resolution_clock, which typically has a resolution of around 200-500 nanoseconds.
Taking the minimum over 15 measurements also helps us avoid other hardware-induced delays. The minimum shows the most efficient result, and in a real system roughly this result is what usually occurs on average later.
I also use RDTSC to count cycles, purely so that the console can show the CPU's cycles and GHz in addition to the nanoseconds (ns).
A small extra speedup comes from repeating the mutex and spinlock lock/unlock twice inside the main measurement loop. This is a small manual loop unrolling that makes the measurement more precise by reducing loop-variable increments and comparisons.
Try it online on GodBolt!
#include <cstdint>
#include <atomic>
#include <chrono>
#include <mutex>
#include <future>
#include <iostream>
#include <iomanip>
#include <immintrin.h>

int main() {
    auto Rdtsc = []() -> uint64_t { return __rdtsc(); };
    auto Time = []() -> uint64_t {
        static auto const gtb = std::chrono::high_resolution_clock::now();
        return std::chrono::duration_cast<std::chrono::nanoseconds>(
            std::chrono::high_resolution_clock::now() - gtb).count();
    };
    std::cout << std::fixed << std::setprecision(2);
    size_t constexpr nloops = 1 << 4, ntests = 1 << 13;
    {
        uint64_t min_time = 1ULL << 50, min_rdtsc = 0;
        std::atomic_flag f = ATOMIC_FLAG_INIT;
        // Dummy thread: locks once so the compiler sees multi-threaded use.
        auto volatile ares = std::async(std::launch::async, [&]{ while (f.test_and_set(std::memory_order_acquire)) {} f.clear(std::memory_order_release); });
        for (size_t j = 0; j < nloops; ++j) {
            auto tim = Time(), timr = Rdtsc();
            for (size_t i = 0; i < ntests / 2; ++i) {
                // Manual 2x unroll of lock/unlock.
                { while (f.test_and_set(std::memory_order_acquire)) {} f.clear(std::memory_order_release); }
                { while (f.test_and_set(std::memory_order_acquire)) {} f.clear(std::memory_order_release); }
            }
            tim = Time() - tim;
            timr = Rdtsc() - timr;
            if (tim < min_time) {
                min_time = tim;
                min_rdtsc = timr;
            }
        }
        std::cout << "Spinlock time: " << double(min_time) / ntests << " ns (" << double(min_rdtsc) / ntests
            << " cycles, " << double(min_rdtsc) / min_time << " GHz)" << std::endl;
    }
    {
        uint64_t min_time = 1ULL << 50, min_rdtsc = 0;
        std::mutex mux;
        auto volatile ares = std::async(std::launch::async, [&]{ std::lock_guard<std::mutex> lock(mux); });
        for (size_t j = 0; j < nloops; ++j) {
            auto tim = Time(), timr = Rdtsc();
            for (size_t i = 0; i < ntests / 2; ++i) {
                { std::lock_guard<std::mutex> lock(mux); }
                { std::lock_guard<std::mutex> lock(mux); }
            }
            tim = Time() - tim;
            timr = Rdtsc() - timr;
            if (tim < min_time) {
                min_time = tim;
                min_rdtsc = timr;
            }
        }
        std::cout << "Mutex time : " << double(min_time) / ntests << " ns (" << double(min_rdtsc) / ntests
            << " cycles, " << double(min_rdtsc) / min_time << " GHz)" << std::endl;
    }
}
Output on a modern 3GHz Linux GodBolt online server:
Spinlock time: 7.65 ns (22.95 cycles, 3.00 GHz)
Mutex time : 15.59 ns (46.78 cycles, 3.00 GHz)
Output on the old 2GHz Windows laptop:
Spinlock time: 12.38 ns (25.94 cycles, 2.10 GHz)
Mutex time : 74.27 ns (155.58 cycles, 2.09 GHz)
(std::shared_mutex time on my PC is just over 50 ns)
The program can be made more sophisticated by running multiple threads, to measure timings when several threads compete for the lock on the same atomic flag or mutex.
The following program tests both single-threaded and multi-threaded timings. In addition, it increments a counter as a kind of work protected by the spinlock/mutex.
For precise results, the program sets thread affinity, pinning different threads to different cores.
The multi-threaded version uses std::barrier so that measurement starts and stops at exactly the same point in time in all threads. It also uses atomic counters to synchronize the pace of the threads, so that they advance at the same rate.
The multi-threaded timings should be almost equal to the single-threaded ones; on modern CPUs in particular the two should be very close. If the cores chosen by the affinity mask are too occupied by the OS, the multi-threaded timings can be much larger. And if the OS reschedules threads in some odd order, the timings can also grow. I have seen cases where the multi-threaded timings were slow on Windows while on Linux they were perfectly fine.
See the timings after the code. Note that the GodBolt server's timings are very short. The spinlock (both single- and multi-threaded) is 8 ns (23 cycles), which matches the timing of XCHG, the main instruction behind the atomic flag (17 cycles here). The mutex is 16 ns (50 cycles), almost the same as the spinlock, because on Linux it is implemented on top of an atomic flag; the extra overhead comes from the syscall operation or the CALL instruction.
Try it online!
#include <cstdint>
#include <atomic>
#include <algorithm>
#include <chrono>
#include <mutex>
#include <future>
#include <iostream>
#include <iomanip>
#include <barrier>
#include <optional>
#include <thread>
#include <utility>
#include <vector>
#include <immintrin.h>

#if defined(_MSC_VER) && !defined(__clang__)
    #define FINL [[msvc::forceinline]]
#else
    #define FINL __attribute__((always_inline))
#endif

#ifdef _WIN32

#include <windows.h>

inline void SetAffinity(std::thread & thr, size_t i) {
    if (SetThreadAffinityMask(thr.native_handle(), DWORD_PTR(1) << i) == 0)
        std::cout << "SetThreadAffinityMask failed, GLE = " << GetLastError() << std::endl;
}

#else

#include <pthread.h>

inline void SetAffinity(std::thread & thr, size_t i) {
    cpu_set_t cpuset;
    CPU_ZERO(&cpuset);
    CPU_SET(i, &cpuset);
    int rc = pthread_setaffinity_np(thr.native_handle(), sizeof(cpu_set_t), &cpuset);
    if (rc != 0)
        std::cout << "Error calling pthread_setaffinity_np: " << rc << std::endl;
}

#endif
int main() {
    auto Rdtsc = []() -> uint64_t { return __rdtsc(); };
    auto Time = []() -> uint64_t {
        static auto const gtb = std::chrono::high_resolution_clock::now();
        return std::chrono::duration_cast<std::chrono::nanoseconds>(
            std::chrono::high_resolution_clock::now() - gtb).count();
    };
    std::cout << std::fixed << std::setprecision(2);
    auto RunST = [&Rdtsc, &Time](auto const & ReLock, auto nloops, auto ntests, auto & min_time, auto & min_rdtsc){
        // Dummy thread; the future's destructor joins it before timing starts.
        { auto volatile ares = std::async(std::launch::async, [&]{ ReLock(); }); }
        for (size_t j = 0; j < nloops; ++j) {
            auto timr = Rdtsc(), tim = Time();
            for (size_t i = 0; i < ntests / 4; ++i) {
                ReLock(); ReLock(); ReLock(); ReLock();
            }
            tim = Time() - tim;
            timr = Rdtsc() - timr;
            if (tim < min_time) {
                min_time = tim;
                min_rdtsc = timr;
            }
        }
    };
    auto RunMT = [&Rdtsc, &Time](auto nthr, auto ithr, auto const & ReLock, auto nloops, auto ntests, auto & barrier, auto & timings, auto & min_time, auto & min_rdtsc, auto & brs){
        for (size_t j = 0; j < nloops; ++j) {
            barrier.arrive_and_wait();
            size_t nibr = (ithr + 1) % nthr;
            uint32_t ibrc = 0;
            // Pace the threads against each other through relaxed atomic counters.
            auto ReSync = [&]{
                ibrc += 16;
                brs[ithr].store(ibrc, std::memory_order_relaxed);
                while (ibrc > brs[nibr].load(std::memory_order_relaxed)) {}
            };
            ReSync();
            auto timr = Rdtsc(), tim = Time();
            for (size_t i = 0; i < ntests / 16; ++i) {
                ReLock(); ReLock(); ReLock(); ReLock(); ReLock(); ReLock(); ReLock(); ReLock();
                ReLock(); ReLock(); ReLock(); ReLock(); ReLock(); ReLock(); ReLock(); ReLock();
                ReSync();
            }
            tim = Time() - tim;
            timr = Rdtsc() - timr;
            timings[ithr] = {tim, timr};
            barrier.arrive_and_wait();
            if (ithr != 0)
                continue;
            // Thread 0 aggregates; rounds where threads diverged too much are discarded.
            std::sort(timings.begin(), timings.end());
            if (timings.back().first >= timings.front().first * 1.3)
                continue;
            tim = 0; timr = 0;
            for (size_t i = 0; i < timings.size(); ++i) {
                tim += timings[i].first;
                timr += timings[i].second;
            }
            tim /= timings.size();
            timr /= timings.size();
            if (tim < min_time) {
                min_time = tim;
                min_rdtsc = timr;
            }
        }
    };
    auto const hard_threads = std::thread::hardware_concurrency();
    auto const test_threads = 2;
    std::cout << test_threads << " threads." << std::endl;
    {
        uint64_t vcnt = 0;
        size_t constexpr ntries = 1 << 4, nloops = 1 << 8, ntests = 1 << 11;
        uint64_t min_time = 1ULL << 50, min_rdtsc = 0;
        std::atomic_flag f = ATOMIC_FLAG_INIT;
        auto const ReLock = [&f, &vcnt]() FINL {
            while (f.test_and_set(std::memory_order_acquire)) {}
            ++vcnt;
            f.clear(std::memory_order_release);
        };
        auto const stim = Time();
        for (size_t itry = 0; itry < ntries; ++itry)
            RunST(ReLock, nloops, ntests, min_time, min_rdtsc);
        std::cout << "Single-Threaded Spinlock time: " << std::setprecision(2) << std::setw(6) << double(min_time) / ntests
            << " ns (" << std::setw(6) << double(min_rdtsc) / ntests << " cycles, " << double(min_rdtsc) / min_time << " GHz), test time "
            << std::setw(4) << (Time() - stim) / 1'000'000'000.0 << " sec, precision "
            << std::setprecision(8) << double(vcnt) / (ntries * (nloops * ntests + 1)) << std::endl;
    }
    {
        uint64_t vcnt = 0;
        size_t constexpr ntries = 1 << 4, nloops = 1 << 6, ntests = 1 << 11;
        size_t const nthr = test_threads;
        uint64_t min_time = 1ULL << 50, min_rdtsc = 0;
        std::atomic_flag f = ATOMIC_FLAG_INIT;
        std::barrier barrier(nthr, []() noexcept {});
        std::vector<std::pair<uint64_t, uint64_t>> timings(nthr);
        std::vector<std::optional<std::thread>> threads(nthr);
        auto const ReLock = [&f, &vcnt]() FINL {
            while (f.test_and_set(std::memory_order_acquire)) {}
            ++vcnt;
            f.clear(std::memory_order_release);
        };
        auto const stim = Time();
        for (size_t itry = 0; itry < ntries; ++itry) {
            std::vector<std::atomic<uint32_t>> brs(nthr);
            for (size_t ithr = 0; ithr < threads.size(); ++ithr) {
                threads[ithr] = std::thread([&, ithr]{ RunMT(nthr, ithr, ReLock, nloops, ntests, barrier, timings, min_time, min_rdtsc, brs); });
                // Prefer even core indices so threads land on distinct physical cores when SMT is present.
                SetAffinity(*threads[ithr], nthr * 2 <= hard_threads ? ithr * 2 : ithr);
            }
            for (auto & t: threads)
                t->join();
            threads.clear();
            threads.resize(nthr);
        }
        std::cout << "Multi -Threaded Spinlock time: " << std::setprecision(2) << std::setw(6) << double(min_time) / ntests
            << " ns (" << std::setw(6) << double(min_rdtsc) / ntests << " cycles, " << double(min_rdtsc) / min_time << " GHz), test time "
            << std::setw(4) << (Time() - stim) / 1'000'000'000.0 << " sec, precision "
            << std::setprecision(8) << double(vcnt) / (ntries * nthr * nloops * ntests) << std::endl;
    }
    {
        uint64_t vcnt = 0;
        size_t constexpr ntries = 1 << 4, nloops = 1 << 8, ntests = 1 << 11;
        uint64_t min_time = 1ULL << 50, min_rdtsc = 0;
        std::mutex mux;
        auto const ReLock = [&mux, &vcnt]() FINL {
            std::lock_guard<std::mutex> lock(mux);
            ++vcnt;
        };
        auto const stim = Time();
        for (size_t itry = 0; itry < ntries; ++itry)
            RunST(ReLock, nloops, ntests, min_time, min_rdtsc);
        std::cout << "Single-Threaded Mutex time: " << std::setprecision(2) << std::setw(6) << double(min_time) / ntests
            << " ns (" << std::setw(6) << double(min_rdtsc) / ntests << " cycles, " << double(min_rdtsc) / min_time << " GHz), test time "
            << std::setw(4) << (Time() - stim) / 1'000'000'000.0 << " sec, precision "
            << std::setprecision(8) << double(vcnt) / (ntries * (nloops * ntests + 1)) << std::endl;
    }
    {
        uint64_t vcnt = 0;
        size_t constexpr ntries = 1 << 4, nloops = 1 << 6, ntests = 1 << 11;
        size_t const nthr = test_threads;
        uint64_t min_time = 1ULL << 50, min_rdtsc = 0;
        std::mutex mux;
        std::barrier barrier(nthr, []() noexcept {});
        std::vector<std::pair<uint64_t, uint64_t>> timings(nthr);
        std::vector<std::optional<std::thread>> threads(nthr);
        auto const ReLock = [&mux, &vcnt]() FINL {
            std::lock_guard<std::mutex> lock(mux);
            ++vcnt;
        };
        auto const stim = Time();
        for (size_t itry = 0; itry < ntries; ++itry) {
            std::vector<std::atomic<uint32_t>> brs(nthr);
            for (size_t ithr = 0; ithr < threads.size(); ++ithr) {
                threads[ithr] = std::thread([&, ithr]{ RunMT(nthr, ithr, ReLock, nloops, ntests, barrier, timings, min_time, min_rdtsc, brs); });
                SetAffinity(*threads[ithr], nthr * 2 <= hard_threads ? ithr * 2 : ithr);
            }
            for (auto & t: threads)
                t->join();
            threads.clear();
            threads.resize(nthr);
        }
        std::cout << "Multi -Threaded Mutex time: " << std::setprecision(2) << std::setw(6) << double(min_time) / ntests
            << " ns (" << std::setw(6) << double(min_rdtsc) / ntests << " cycles, " << double(min_rdtsc) / min_time << " GHz), test time "
            << std::setw(4) << (Time() - stim) / 1'000'000'000.0 << " sec, precision "
            << std::setprecision(8) << double(vcnt) / (ntries * nthr * nloops * ntests) << std::endl;
    }
}
Output on the GodBolt Linux server:
2 threads.
Single-Threaded Spinlock time: 8.63 ns ( 25.91 cycles, 3.00 GHz), test time 0.31 sec, precision 1.00000000
Multi -Threaded Spinlock time: 8.51 ns ( 25.58 cycles, 3.00 GHz), test time 1.04 sec, precision 1.00000000
Single-Threaded Mutex time: 15.89 ns ( 47.66 cycles, 3.00 GHz), test time 0.82 sec, precision 1.00000000
Multi -Threaded Mutex time: 17.94 ns ( 53.86 cycles, 3.00 GHz), test time 4.18 sec, precision 1.00000000
Output on the old Windows laptop:
2 threads.
Single-Threaded Spinlock time: 31.40 ns ( 67.22 cycles, 2.14 GHz), test time 0.48 sec, precision 1.00000000
Multi -Threaded Spinlock time: 32.45 ns ( 70.21 cycles, 2.16 GHz), test time 0.98 sec, precision 1.00000000
Single-Threaded Mutex time: 79.69 ns (168.36 cycles, 2.11 GHz), test time 0.94 sec, precision 1.00000000
Multi -Threaded Mutex time: 83.67 ns (269.23 cycles, 3.22 GHz), test time 0.91 sec, precision 1.00000000
Comment: I'm surprised you get such high resolution out of high_resolution_clock. I'm used to it being stuck around the microsecond level. Side note: high_resolution_clock is not guaranteed to be monotonic; some implementations jump forward or backward in time. - user4581301