最快的内联汇编自旋锁

26

我正在用C++编写一个多线程应用程序,性能至关重要。在线程之间复制小结构时,我需要使用大量的锁定,因此我选择使用自旋锁。

我已经进行了一些研究和速度测试,并发现大多数实现的速度大致相同:

  • Microsoft的CRITICAL_SECTION,SpinCount设置为1000,得分约为140个时间单位
  • 使用Microsoft的InterlockedCompareExchange实现这个算法,得分约为95个时间单位
  • 我还尝试使用一些内联汇编代码__asm {},使用类似这个代码的东西,得分约为70个时间单位,但是我不确定是否已创建了适当的内存屏障。

编辑:这里给出的时间是两个线程锁定和解锁自旋锁1,000,000次所需的时间。

我知道这并没有太大的区别,但自旋锁是一个经常使用的对象,人们会认为程序员已经就最快的自旋锁方法达成了共识。然而,谷歌搜索会得到许多不同的方法。如果使用内联汇编并使用指令CMPXCHG8B而不是比较32位寄存器,我认为上述提到的方法将是最快的。此外,必须考虑内存屏障,这可以通过LOCK CMPXHG8B(我想?)来完成,它保证了核之间共享内存的“独占权”。最后,一些人建议在忙等待时应该伴随NOP:REP,这样可以使超线程处理器切换到另一个线程,但我不确定这是否正确?

通过我对不同自旋锁的性能测试,发现它们之间没有太大差异,但出于纯粹学术目的,我想知道哪一种最快。然而,由于我在汇编语言和内存屏障方面经验极其有限,如果有人能为我编写带有LOCK CMPXCHG8B和适当内存屏障的最后一个示例代码模板,我将感到非常高兴:

__asm
{
     spin_lock:
         ;locking code.
     spin_unlock:
         ;unlocking code.
}

11
感谢您在提问之前先提供了良好的资源和信息,我认为您提供的已经足够了。谢谢。 - huseyin tugrul buyukisik
1
到底有多少才算很多呢?需要非常非常多才会担心旋转速度。你确定这里没有更好的限制访问方式吗?还要记住,你旋转的速度并不影响你实际获取锁的时间。无论你旋转得多快,其他人都必须先解锁。如果你发现自己要旋转很长时间,考虑使用yield()循环将执行权传递给另一个正在运��的线程或进程。 - Wug
1
rep nop又称pause,它可以避免P4在离开自旋循环时出现不必要的行为。Intel的手册明确推荐在自旋等待循环中使用它。您可以使用XACQUIREXRELEASE吗(这些功能直到Haswell才可用)? - harold
1
当您需要性能时,请在快速路径上使用无锁/无争用数据结构,并仅在慢速路径上使用锁。 - PlasmaHH
@PlasmaHH 我知道你可以在32位机器上复制数据,但只能复制32位,在64位机器上只能复制64位,使用微软的交错操作。但我不知道是否可以对大于上述示例的结构进行原子读写?你能给我指点一些阅读/文献或示例吗?这可以在汇编中完成吗? - sigvardsen
显示剩余10条评论
5个回答

10

虽然已经有了一个被接受的答案,但还有一些遗漏的事情可以用来改进所有答案,这些内容摘自Intel文章中高速锁实现以上的所有内容

  1. 在volatile读取上自旋,而不是原子指令,这样可以避免不必要的总线锁定,特别是在高度争用的锁情况下。
  2. 对于高度争用的锁,使用退避(back-off)。
  3. 内联锁(inline lock),最好使用内部函数(intrinsics),对于内联汇编有负面影响的编译器(基本上是MSVC)。

8

我通常不会抱怨有人努力实现快速代码:这通常是一个非常好的练习,可以带来对编程的更好理解和更快的代码。

我在这里也不会抱怨,但我可以明确地说,在x86架构上,追求长度为3条指令或更多的快速自旋锁是徒劳无功的。

原因如下:

使用典型代码序列调用自旋锁

lock_variable DW 0    ; 0 <=> free

mov ebx,offset lock_variable
mov eax,1
xchg eax,[ebx]

; if eax contains 0 (no one owned it) you own the lock,
; if eax contains 1 (someone already does) you don't

释放自旋锁很简单。
mov ebx,offset lock_variable
mov dword ptr [ebx],0

xchg指令会在处理器上引发锁定信号,这实际上意味着我想在接下来的几个时钟周期内使用总线。该信号通过高速缓存传播并到达最慢的总线主设备,通常是PCI总线。当每个总线主设备完成锁定确认信号时,会发送回一个locka信号。然后进行实际的交换。问题是,锁定/锁定确认序列需要非常长的时间。PCI总线可能以33MHz的速度运行,具有多个周期的延迟。在3.3 GHz的CPU上,这意味着每个PCI总线周期需要100个CPU周期。
作为经验法则,我假设锁定需要300到3000个CPU周期才能完成,最终我不知道是否拥有锁定。因此,“快速”自旋锁可以节省的几个周期将是一种幻觉,因为没有两个锁是相同的,它将取决于您在那段短时间内的总线情况。
______________编辑______________
我刚刚读到自旋锁是“频繁使用的对象”。嗯,显然你不明白每次调用自旋锁都会消耗大量的CPU周期。或者换句话说,每次调用自旋锁都会损失您的大量处理能力。
使用自旋锁(或其更大的兄弟——临界区)的技巧是尽可能地少用它们,同时仍然实现预期的程序功能。到处使用它们很容易,结果会导致性能不佳。

这不仅仅是关于编写快速代码,还包括组织数据。当你写“在线程之间复制小结构”时,你应该意识到锁可能需要比实际复制花费数百倍的时间才能完成。

________________编辑________________

当您计算平均锁定时间时,它可能几乎没有任何作用,因为它是在您的机器上测量的,而这可能不是预期的目标(可能具有完全不同的总线使用特性)。对于您的机器,平均值由单个非常快的时间(当总线主控活动不干扰时)组成,一直到非常慢的时间(当总线主控干扰显著时)。

您可以引入代码来确定最快和最慢的情况,并计算商以查看自旋锁时间可以变化多大。

________________编辑________________

2016年5月更新。

Peter Cordes提出了“在没有竞争的情况下调整锁可能是有意义的”的想法,并且现代CPU上除非锁变量未对齐,否则不会出现许多数百个时钟周期的锁定时间。我开始怀疑我的以前的测试程序 - 使用32位Watcom C编写 - 是否会受到WOW64的阻碍,因为它在64位操作系统Windows 7上运行。

因此,我编写了一个64位程序,并使用TDM的gcc 5.3进行编译。该程序利用隐式总线锁定指令变体“XCHG r,m”进行锁定,使用简单的赋值“MOV m,r”进行解锁。在某些锁变体中,将预先测试锁变量以确定是否有可能尝试锁定(使用简单的比较“CMP r,m”,可能不会越过L3)。这就是它:

// compiler flags used:

// -O1 -m64 -mthreads -mtune=k8 -march=k8 -fwhole-program -freorder-blocks -fschedule-insns -falign-functions=32 -g3 -Wall -c -fmessage-length=0

#define CLASSIC_BUS_LOCK
#define WHILE_PRETEST
//#define SINGLE_THREAD

typedef unsigned char      u1;
typedef unsigned short     u2;
typedef unsigned long      u4;
typedef unsigned int       ud;
typedef unsigned long long u8;
typedef   signed char      i1;
typedef          short     i2;
typedef          long      i4;
typedef          int       id;
typedef          long long i8;
typedef          float     f4;
typedef          double    f8;

#define usizeof(a) ((ud)sizeof(a))

#define LOOPS 25000000

#include <stdio.h>
#include <windows.h>

#ifndef bool
typedef signed char bool;
#endif

u8 CPU_rdtsc (void)
{
  ud tickl, tickh;
  __asm__ __volatile__("rdtsc":"=a"(tickl),"=d"(tickh));
  return ((u8)tickh << 32)|tickl;
}

volatile u8 bus_lock (volatile u8 * block, u8 value)
{
  __asm__ __volatile__( "xchgq %1,%0" : "=r" (value) : "m" (*block), "0" (value) : "memory");

  return value;
}

void bus_unlock (volatile u8 * block, u8 value)
{
  __asm__ __volatile__( "movq %0,%1" : "=r" (value) : "m" (*block), "0" (value) : "memory");
}

void rfence (void)
{
  __asm__ __volatile__( "lfence" : : : "memory");
}

void rwfence (void)
{
  __asm__ __volatile__( "mfence" : : : "memory");
}

void wfence (void)
{
  __asm__ __volatile__( "sfence" : : : "memory");
}

volatile bool LOCK_spinlockPreTestIfFree (const volatile u8 *lockVariablePointer)
{
  return (bool)(*lockVariablePointer == 0ull);
}

volatile bool LOCK_spinlockFailed (volatile u8 *lockVariablePointer)
{
  return (bool)(bus_lock (lockVariablePointer, 1ull) != 0ull);
}

void LOCK_spinlockLeave (volatile u8 *lockVariablePointer)
{
  *lockVariablePointer = 0ull;
}

static volatile u8 lockVariable = 0ull,
                   lockCounter =  0ull;

static volatile i8 threadHold = 1;

static u8 tstr[4][32];    /* 32*8=256 bytes for each thread's parameters should result in them residing in different cache lines */

struct LOCKING_THREAD_STRUCTURE
{
  u8 numberOfFailures, numberOfPreTests;
  f8 clocksPerLock, failuresPerLock, preTestsPerLock;
  u8 threadId;
  HANDLE threadHandle;
  ud idx;
} *lts[4] = {(void *)tstr[0], (void *)tstr[1], (void *)tstr[2], (void *)tstr[3]};

DWORD WINAPI locking_thread (struct LOCKING_THREAD_STRUCTURE *ltsp)
{
  ud n = LOOPS;
  u8 clockCycles;

  SetThreadAffinityMask (ltsp->threadHandle, 1ull<<ltsp->idx);

  while (threadHold) {}

  clockCycles = CPU_rdtsc ();
  while (n)
  {
    Sleep (0);

#ifdef CLASSIC_BUS_LOCK
    while (LOCK_spinlockFailed (&lockVariable)) {++ltsp->numberOfFailures;}
#else
#ifdef WHILE_PRETEST
    while (1)
    {
      do
      {
        ++ltsp->numberOfPreTests;
      } while (!LOCK_spinlockPreTestIfFree (&lockVariable));

      if (!LOCK_spinlockFailed (&lockVariable)) break;
      ++ltsp->numberOfFailures;
    }
#else
    while (1)
    {
      ++ltsp->numberOfPreTests;
      if (LOCK_spinlockPreTestIfFree (&lockVariable))
      {
        if (!LOCK_spinlockFailed (&lockVariable)) break;
        ++ltsp->numberOfFailures;
      }
    }
#endif
#endif
    ++lockCounter;
    LOCK_spinlockLeave (&lockVariable);

#ifdef CLASSIC_BUS_LOCK
    while (LOCK_spinlockFailed (&lockVariable)) {++ltsp->numberOfFailures;}
#else
#ifdef WHILE_PRETEST
    while (1)
    {
      do
      {
        ++ltsp->numberOfPreTests;
      } while (!LOCK_spinlockPreTestIfFree (&lockVariable));

      if (!LOCK_spinlockFailed (&lockVariable)) break;
      ++ltsp->numberOfFailures;
    }
#else
    while (1)
    {
      ++ltsp->numberOfPreTests;
      if (LOCK_spinlockPreTestIfFree (&lockVariable))
      {
        if (!LOCK_spinlockFailed (&lockVariable)) break;
        ++ltsp->numberOfFailures;
      }
    }
#endif
#endif
    --lockCounter;
    LOCK_spinlockLeave (&lockVariable);

    n-=2;
  }
  clockCycles = CPU_rdtsc ()-clockCycles;

  ltsp->clocksPerLock =   (f8)clockCycles/           (f8)LOOPS;
  ltsp->failuresPerLock = (f8)ltsp->numberOfFailures/(f8)LOOPS;
  ltsp->preTestsPerLock = (f8)ltsp->numberOfPreTests/(f8)LOOPS;

//rwfence ();

  ltsp->idx = 4u;

  ExitThread (0);
  return 0;
}

int main (int argc, char *argv[])
{
  u8 processAffinityMask, systemAffinityMask;

  memset (tstr, 0u, usizeof(tstr));

  lts[0]->idx = 3;
  lts[1]->idx = 2;
  lts[2]->idx = 1;
  lts[3]->idx = 0;

  GetProcessAffinityMask (GetCurrentProcess(), &processAffinityMask, &systemAffinityMask);

  SetPriorityClass (GetCurrentProcess(), HIGH_PRIORITY_CLASS);
  SetThreadAffinityMask (GetCurrentThread (), 1ull);

  lts[0]->threadHandle = CreateThread (NULL, 65536u, (void *)locking_thread, (void *)lts[0], 0, (void *)&lts[0]->threadId);
#ifndef SINGLE_THREAD
  lts[1]->threadHandle = CreateThread (NULL, 65536u, (void *)locking_thread, (void *)lts[1], 0, (void *)&lts[1]->threadId);
  lts[2]->threadHandle = CreateThread (NULL, 65536u, (void *)locking_thread, (void *)lts[2], 0, (void *)&lts[2]->threadId);
  lts[3]->threadHandle = CreateThread (NULL, 65536u, (void *)locking_thread, (void *)lts[3], 0, (void *)&lts[3]->threadId);
#endif

  SetThreadAffinityMask (GetCurrentThread (), processAffinityMask);

  threadHold = 0;

#ifdef SINGLE_THREAD
  while (lts[0]->idx<4u) {Sleep (1);}
#else
  while (lts[0]->idx+lts[1]->idx+lts[2]->idx+lts[3]->idx<16u) {Sleep (1);}
#endif

  printf ("T0:%1.1f,%1.1f,%1.1f\n", lts[0]->clocksPerLock, lts[0]->failuresPerLock, lts[0]->preTestsPerLock);
  printf ("T1:%1.1f,%1.1f,%1.1f\n", lts[1]->clocksPerLock, lts[1]->failuresPerLock, lts[1]->preTestsPerLock);
  printf ("T2:%1.1f,%1.1f,%1.1f\n", lts[2]->clocksPerLock, lts[2]->failuresPerLock, lts[2]->preTestsPerLock);
  printf ("T3:%1.1f,%1.1f,%1.1f\n", lts[3]->clocksPerLock, lts[3]->failuresPerLock, lts[3]->preTestsPerLock);

  printf ("T*:%1.1f,%1.1f,%1.1f\n", (lts[0]->clocksPerLock+  lts[1]->clocksPerLock+  lts[2]->clocksPerLock+  lts[3]->clocksPerLock)/  4.,
                                    (lts[0]->failuresPerLock+lts[1]->failuresPerLock+lts[2]->failuresPerLock+lts[3]->failuresPerLock)/4.,
                                    (lts[0]->preTestsPerLock+lts[1]->preTestsPerLock+lts[2]->preTestsPerLock+lts[3]->preTestsPerLock)/4.);

  printf ("LC:%u\n", (ud)lockCounter);

  return 0;
}

该程序在一台基于DELL i5-4310U计算机上运行,具有DDR3-800、2个内核/2个超线程,可达到2.7GHz和一个普通的L3缓存。

首先,WOW64的影响似乎微不足道。

执行无争用锁定/解锁的单个线程能够每110个周期执行一次。调整无争用锁定是没有意义的:任何添加以增强单个XCHG指令的代码都只会使其变慢。

当四个超线程向锁变量发送锁定尝试时,情况发生了根本性的变化。实现成功锁定所需的时间跳至994个周期,其中相当大的一部分可以归因于2.2个失败的锁定尝试。换句话说,在高争用情况下,平均需要尝试3.2次锁定才能实现成功锁定。显然,110个周期没有变成110*3.2,而更接近于110*9。因此,其他机制在这里起作用,就像对旧机器的测试一样。此外,平均994个周期包括从716到1157的范围。

实现预测试的锁变体需要大约95%的周期,而最简单的变体(XCHG)所需的周期。平均而言,它们将执行17个CMP以确定尝试1.75个锁是否可行,其中1个成功。我建议不仅使用预测试,因为它更快:即使它稍微增加了复杂性,但它对总线锁定机制的压力较小(3.2-1.75 = 1.45个较少的锁定尝试)。


2
@Peter Cordes:那么为什么AF在instruction_tables.pdf第1页底部写道:“即使在单处理器系统上,LOCK前缀通常的代价也超过一百个时钟周期。这也适用于带有内存操作数的XCHG指令。”?我猜Agner Fog可能既对又错...到底是哪种情况呢? - Olof Forshell
1
有趣的数字,感谢链接。不,我没有调整/测量锁定开销的个人经验。你在那里的回答证明了xchg并不像它必须与PCI总线交互那样慢。你还测试了一个完全瓶颈于锁定吞吐量的循环,因此CPU在xchg发生时没有其他工作要做。我认为,但没有测试,xchg在实际代码中具有良好的流水线处理能力,因此在除锁定之外还有大量工作要做的真实代码中,无争用情况下的开销并不可怕。 - Peter Cordes
1
最有效的锁是你成功避免使用的那些——如果你足够熟练的话。新手和不理解锁的人会认为它们没有显著的成本,并且会比其他人更频繁地使用它们,就像大多数编写多线程应用程序的人最终会得到一个更慢的应用程序一样。最终,你的代码可能必须将LOCK信号发送到总线上,并等待最慢的总线主设备返回LOCKA以表示锁已经放置。或者你是否声称这种情况不会发生?如果你的锁变量已经从缓存中清除了呢? - Olof Forshell
1
我认为锁定指令不必等待PCI设备。x86具有高速缓存一致的DMA。这可能是最近的事情,通过将内存控制器放在CPU上实现,使得缓存更容易窃听DMA。如果您删除(或提供证据)关于外部总线的部分,我会取消我的反对票,并可能投赞成票。 - Peter Cordes
2
OP仍然写道:“我需要在线程之间复制小结构时使用大量的锁定,因此我选择使用自旋锁。”尽管我的代码显示每个四核心可以每秒实现270万次锁定/解锁,但是很可能OP使用了如此之多的锁定,以至于他的多线程代码没有锁定更好的单线程。 - Olof Forshell
显示剩余35条评论

6

维基百科有一篇关于自旋锁的好文章,这里是x86实现。

http://en.wikipedia.org/wiki/Spinlock#Example_implementation

请注意,他们的实现不使用“lock”前缀,因为对于“xchg”指令,在x86上它是多余的 - 它隐含地具有锁定语义,正如在此Stackoverflow讨论中所讨论的那样:

On a multicore x86, is a LOCK necessary as a prefix to XCHG?

REP:NOP是PAUSE指令的别名,您可以在此处了解更多信息。

How does x86 pause instruction work in spinlock *and* can it be used in other scenarios?

关于内存屏障问题,这里有你想要知道的一切。

《Memory Barriers: a Hardware View for Software Hackers》作者Paul E. McKenney

http://irl.cs.ucla.edu/~yingdi/paperreading/whymb.2010.06.07c.pdf


在你链接到维基百科实现的代码中,为什么将内存中的锁值设置为0的指令必须是原子性的(使用xchg指令)?似乎即使另一个线程在将其设置为0之前访问了内存位置,它所看到的仍然是锁定状态 - 直接对内存位置进行mov操作不也可以吗? - VF1
是的,在新的x86处理器上 mov 是有效的,但在一些旧的处理器上则不然。这个问题在下一节 "重要优化" 中有详细解释。 在后来的 x86 架构实现中,spin_unlock 可以安全地使用未锁定的 MOV 代替较慢的 locked XCHG。这是由于微妙的内存排序规则支持此操作,即使 MOV 不是完整的内存屏障。然而,一些处理器(某些 Cyrix 处理器、某些 Intel Pentium Pro 的修订版(因为存在错误)以及早期的 Pentium 和 i486 SMP 系统)会出现问题,并且受锁保护的数据可能会被破坏。 - amdn
谢谢您指出这个问题。如果您不介意我问一下,这些“微妙的内存排序规则”避免了什么问题? - VF1
X86内存排序要求一个线程的存储必须按照相同的顺序被其他线程看到,这意味着另一个线程无法获取锁并且在释放锁之前不看到关键部分中完成的存储。请参见本文档的第(4)节http://www.spinroot.com/spin/Doc/course/x86_tso.pdf。 - amdn
顺便说一下,我发现一个MOV就足够了(不需要序列化或内存屏障),这在1999年时由英特尔架构师在回复Linus Torvalds的邮件中“官方”澄清。https://lkml.org/lkml/1999/11/24/90 我猜后来发现这对一些旧的x86处理器不起作用。 - amdn

3

请看这里:x86使用cmpxchg实现自旋锁

感谢Cory Nelson。

__asm{
spin_lock:
xorl %ecx, %ecx
incl %ecx
spin_lock_retry:
xorl %eax, %eax
lock; cmpxchgl %ecx, (lock_addr)
jnz spin_lock_retry
ret

spin_unlock:
movl $0 (lock_addr)
ret
}

另一份资料称:

http://www.geoffchappell.com/studies/windows/km/cpu/cx8.htm, 该网址是关于CPU CX8指令的研究。
       lock    cmpxchg8b qword ptr [esi]
is replaceable with the following sequence

try:
        lock    bts dword ptr [edi],0
        jnb     acquired
wait:
        test    dword ptr [edi],1
        je      try
        pause                   ; if available
        jmp     wait

acquired:
        cmp     eax,[esi]
        jne     fail
        cmp     edx,[esi+4]
        je      exchange

fail:
        mov     eax,[esi]
        mov     edx,[esi+4]
        jmp     done

exchange:
        mov     [esi],ebx
        mov     [esi+4],ecx

done:
        mov     byte ptr [edi],0

这里有关于无锁(lock-free)和锁(lock)实现的讨论:

http://newsgroups.derkeiler.com/Archive/Comp/comp.programming.threads/2011-10/msg00009.html

你会把PAUSE(NOP:REP)指令放在哪里?在循环内还是之前? - sigvardsen
在“等待”部分的“暂停”位置,对于较旧的CPU,据我所知,应该替换为其他指令。 - huseyin tugrul buyukisik
这个实现在尝试交换之前会旋转lock cmpxchg而不是原子加载。 (请参见Necropolis的答案)。 pause可以避免在旋转负载而不是lock指令时发生内存排序错误。 请参见我在另一个问题上的答案,其中包含我认为避免了任何明显问题的简单/最小汇编自旋锁实现 - Peter Cordes
这个也适用于64位吗?为什么它只说x86自旋锁? - Goaler444
1
@Goaler444 x86是通用架构名称,也适用于x86_64,使用cmpxchgq而不是cmpxchgl来使用64位lock_addr。 - huseyin tugrul buyukisik

-1

只是问一下:

在你深入研究自旋锁和几乎无锁数据结构之前:

你是否已经在你的基准测试和应用程序中确保竞争线程保证在不同的核心上运行?

如果没有,你可能会得到一个在开发机上工作得很好但在现场表现糟糕/失败的程序,因为一个线程必须成为自旋锁的锁定器和解锁器。

以数字来说明:在Windows上,您有标准时间片为10毫秒。如果您不能确保两个物理线程参与锁定/解锁,您将最终每秒约有500个锁定/解锁操作,并且这个结果将非常不尽如人意。


当然,一个线程必须锁定/解锁自旋锁。否则它将无法保护任何内容。而且没有理由让一个线程在每个时间片中获取锁,被取消调度,然后解锁。 - dave

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接