最快的内联装配旋转锁

Fastest inline-assembly spinlock

本文关键字:旋转      更新时间:2023-10-16

我正在用c++编写一个多线程应用程序,其中性能至关重要。在线程之间复制小结构时,我需要使用大量的锁定,为此我选择了使用spinlock。

我对此进行了一些研究和速度测试,发现大多数实现的速度大致相同:

  • Microsofts CRITICAL_SECTION,SpinCount设置为1000,得分约为140个时间单位
  • 用微软InterlockedCompareExchange实现该算法的分数约为95个时间单位
  • 我还尝试过使用__asm {}的一些内联汇编,使用类似的代码,它的分数大约为70个时间单位,,但我不确定是否已经创建了适当的内存屏障

Edit:这里给出的时间是2个线程锁定和解锁spinlock 1000000次所需的时间。

我知道这并没有太大区别,但由于自旋锁是一个使用频繁的对象,人们可能会认为程序员会就制作自旋锁的最快方法达成一致。然而,在谷歌上搜索它会产生许多不同的方法。如果使用内联汇编并使用指令CMPXCHG8B而不是比较32位寄存器来实现,我认为上述方法将是最快的此外,必须考虑内存障碍,这可以通过LOCK CMPXHG8B(我认为是?)来实现,它保证对核心之间共享内存的"独占权限"。最后[有人建议]对于繁忙等待应该伴随NOP:REP,这将使超线程处理器能够切换到另一个线程,但我不确定这是真是假?

从我对不同自旋锁的性能测试中可以看出,没有太大的差异,但出于纯粹的学术目的,我想知道哪一个最快。然而,由于我在汇编语言和内存障碍方面的经验极其有限,如果有人能在以下模板中为我提供的最后一个示例编写汇编代码,包括LOCK CMPXCHG8B和适当的内存障碍

__asm
{
spin_lock:
;locking code.
spin_unlock:
;unlocking code.
}

尽管已经有了一个公认的答案,但有一些遗漏的地方可以用来改进所有答案,摘自英特尔的这篇文章,所有这些都是快速锁定实现:

  1. 在易失性读取而不是原子指令上旋转,这样可以避免不必要的总线锁定,尤其是在高度争用的锁上
  2. 对竞争激烈的锁使用退避
  3. 内联锁,最好是使用内联asm有害的编译器的内部函数(基本上是MSVC)

我通常不会抱怨那些努力实现快速代码的人:这通常是一个非常好的练习,可以更好地理解编程和更快的代码。

我也不会在这里抱怨,但我可以明确地指出,快速自旋锁3指令长或多的问题是徒劳的,至少在x86架构上是这样。

原因如下:

调用具有典型代码序列的旋转锁

lock_variable DW 0    ; 0 <=> free
mov ebx,offset lock_variable
mov eax,1
xchg eax,[ebx]
; if eax contains 0 (no one owned it) you own the lock,
; if eax contains 1 (someone already does) you don't

释放旋转锁是微不足道的

mov ebx,offset lock_variable
mov dword ptr [ebx],0

xchg指令引发处理器上的锁定引脚,这实际上意味着我希望在接下来的几个时钟周期内使用总线。该信号迂回通过高速缓存并向下到达最慢的总线主控设备,该设备通常是PCI总线。当每个总线主控设备都完成了锁定(锁定确认)信号被发送回来。然后进行实际的交换。问题是锁定/锁定序列需要很长时间。PCI总线可以在33MHz下运行,并具有几个等待周期。在3.3 GHz CPU上,这意味着每个PCI总线周期需要100个CPU周期。

根据经验,我假设一个锁需要300到3000个CPU周期才能完成,最后我甚至不知道我是否会拥有这个锁。因此,你可以通过"快速"旋转锁节省的几个周期将是海市蜃楼,因为没有一个锁能像下一个一样,这将取决于你在短时间内的公交车情况。

________________编辑________________

我刚刚读到自旋锁是一个"大量使用的对象"。很明显,你不明白自旋锁每次调用都会消耗大量的CPU周期。或者,换句话说,每次调用它时,都会损失大量的处理能力。

使用自旋锁(或其更大的兄弟,关键部分)的诀窍是在实现预期程序功能的同时尽可能少地使用它们。在所有地方使用它们都很容易,结果会导致性能平平。

这并不全是关于编写快速代码,它还涉及组织数据。当您编写"在线程之间复制小结构"时,您应该意识到锁定可能需要比实际复制长数百倍的时间才能完成。

________________编辑________________

当你计算平均锁定时间时,它可能会说得很少,因为它是在你的机器上测量的,而你的机器可能不是预期的目标(它可能具有完全不同的总线使用特性)。对于您的机器,平均值将由单个非常快的时间(当总线主控活动没有干扰时)一直到非常慢的时间(总线主控干扰很大时)组成。

您可以引入确定最快和最慢情况的代码,并计算商,以了解自旋锁定时间的变化幅度。

________________编辑________________

2016年5月更新。

Peter Cordes提出了这样一个观点,即"在无争用的情况下调整锁是有意义的",并且在现代CPU上不会出现数百个时钟周期的锁定时间,除非锁定变量未对齐。我开始怀疑我以前用32位Watcom C编写的测试程序是否会受到WOW64的阻碍,因为它运行在64位操作系统:Windows 7上。

因此,我编写了一个64位程序,并用TDM的gcc 5.3进行了编译。该程序使用隐式总线锁定指令变体"XCHG r,m"进行锁定,并使用简单赋值"MOV m,r"进行解锁。在一些锁变体中,对锁变量进行了预测试,以确定尝试锁是否可行(使用简单的比较"CMP r,m",可能不在L3之外冒险)。这是:

// compiler flags used:
// -O1 -m64 -mthreads -mtune=k8 -march=k8 -fwhole-program -freorder-blocks -fschedule-insns -falign-functions=32 -g3 -Wall -c -fmessage-length=0
#define CLASSIC_BUS_LOCK
#define WHILE_PRETEST
//#define SINGLE_THREAD
typedef unsigned char      u1;
typedef unsigned short     u2;
typedef unsigned long      u4;
typedef unsigned int       ud;
typedef unsigned long long u8;
typedef   signed char      i1;
typedef          short     i2;
typedef          long      i4;
typedef          int       id;
typedef          long long i8;
typedef          float     f4;
typedef          double    f8;
#define usizeof(a) ((ud)sizeof(a))
#define LOOPS 25000000
#include <stdio.h>
#include <windows.h>
#ifndef bool
typedef signed char bool;
#endif
u8 CPU_rdtsc (void)
{
ud tickl, tickh;
__asm__ __volatile__("rdtsc":"=a"(tickl),"=d"(tickh));
return ((u8)tickh << 32)|tickl;
}
volatile u8 bus_lock (volatile u8 * block, u8 value)
{
__asm__ __volatile__( "xchgq %1,%0" : "=r" (value) : "m" (*block), "0" (value) : "memory");
return value;
}
void bus_unlock (volatile u8 * block, u8 value)
{
__asm__ __volatile__( "movq %0,%1" : "=r" (value) : "m" (*block), "0" (value) : "memory");
}
void rfence (void)
{
__asm__ __volatile__( "lfence" : : : "memory");
}
void rwfence (void)
{
__asm__ __volatile__( "mfence" : : : "memory");
}
void wfence (void)
{
__asm__ __volatile__( "sfence" : : : "memory");
}
volatile bool LOCK_spinlockPreTestIfFree (const volatile u8 *lockVariablePointer)
{
return (bool)(*lockVariablePointer == 0ull);
}
volatile bool LOCK_spinlockFailed (volatile u8 *lockVariablePointer)
{
return (bool)(bus_lock (lockVariablePointer, 1ull) != 0ull);
}
void LOCK_spinlockLeave (volatile u8 *lockVariablePointer)
{
*lockVariablePointer = 0ull;
}
static volatile u8 lockVariable = 0ull,
lockCounter =  0ull;
static volatile i8 threadHold = 1;
static u8 tstr[4][32];    /* 32*8=256 bytes for each thread's parameters should result in them residing in different cache lines */
struct LOCKING_THREAD_STRUCTURE
{
u8 numberOfFailures, numberOfPreTests;
f8 clocksPerLock, failuresPerLock, preTestsPerLock;
u8 threadId;
HANDLE threadHandle;
ud idx;
} *lts[4] = {(void *)tstr[0], (void *)tstr[1], (void *)tstr[2], (void *)tstr[3]};
DWORD WINAPI locking_thread (struct LOCKING_THREAD_STRUCTURE *ltsp)
{
ud n = LOOPS;
u8 clockCycles;
SetThreadAffinityMask (ltsp->threadHandle, 1ull<<ltsp->idx);
while (threadHold) {}
clockCycles = CPU_rdtsc ();
while (n)
{
Sleep (0);
#ifdef CLASSIC_BUS_LOCK
while (LOCK_spinlockFailed (&lockVariable)) {++ltsp->numberOfFailures;}
#else
#ifdef WHILE_PRETEST
while (1)
{
do
{
++ltsp->numberOfPreTests;
} while (!LOCK_spinlockPreTestIfFree (&lockVariable));
if (!LOCK_spinlockFailed (&lockVariable)) break;
++ltsp->numberOfFailures;
}
#else
while (1)
{
++ltsp->numberOfPreTests;
if (LOCK_spinlockPreTestIfFree (&lockVariable))
{
if (!LOCK_spinlockFailed (&lockVariable)) break;
++ltsp->numberOfFailures;
}
}
#endif
#endif
++lockCounter;
LOCK_spinlockLeave (&lockVariable);
#ifdef CLASSIC_BUS_LOCK
while (LOCK_spinlockFailed (&lockVariable)) {++ltsp->numberOfFailures;}
#else
#ifdef WHILE_PRETEST
while (1)
{
do
{
++ltsp->numberOfPreTests;
} while (!LOCK_spinlockPreTestIfFree (&lockVariable));
if (!LOCK_spinlockFailed (&lockVariable)) break;
++ltsp->numberOfFailures;
}
#else
while (1)
{
++ltsp->numberOfPreTests;
if (LOCK_spinlockPreTestIfFree (&lockVariable))
{
if (!LOCK_spinlockFailed (&lockVariable)) break;
++ltsp->numberOfFailures;
}
}
#endif
#endif
--lockCounter;
LOCK_spinlockLeave (&lockVariable);
n-=2;
}
clockCycles = CPU_rdtsc ()-clockCycles;
ltsp->clocksPerLock =   (f8)clockCycles/           (f8)LOOPS;
ltsp->failuresPerLock = (f8)ltsp->numberOfFailures/(f8)LOOPS;
ltsp->preTestsPerLock = (f8)ltsp->numberOfPreTests/(f8)LOOPS;
//rwfence ();
ltsp->idx = 4u;
ExitThread (0);
return 0;
}
int main (int argc, char *argv[])
{
u8 processAffinityMask, systemAffinityMask;
memset (tstr, 0u, usizeof(tstr));
lts[0]->idx = 3;
lts[1]->idx = 2;
lts[2]->idx = 1;
lts[3]->idx = 0;
GetProcessAffinityMask (GetCurrentProcess(), &processAffinityMask, &systemAffinityMask);
SetPriorityClass (GetCurrentProcess(), HIGH_PRIORITY_CLASS);
SetThreadAffinityMask (GetCurrentThread (), 1ull);
lts[0]->threadHandle = CreateThread (NULL, 65536u, (void *)locking_thread, (void *)lts[0], 0, (void *)&lts[0]->threadId);
#ifndef SINGLE_THREAD
lts[1]->threadHandle = CreateThread (NULL, 65536u, (void *)locking_thread, (void *)lts[1], 0, (void *)&lts[1]->threadId);
lts[2]->threadHandle = CreateThread (NULL, 65536u, (void *)locking_thread, (void *)lts[2], 0, (void *)&lts[2]->threadId);
lts[3]->threadHandle = CreateThread (NULL, 65536u, (void *)locking_thread, (void *)lts[3], 0, (void *)&lts[3]->threadId);
#endif
SetThreadAffinityMask (GetCurrentThread (), processAffinityMask);
threadHold = 0;
#ifdef SINGLE_THREAD
while (lts[0]->idx<4u) {Sleep (1);}
#else
while (lts[0]->idx+lts[1]->idx+lts[2]->idx+lts[3]->idx<16u) {Sleep (1);}
#endif
printf ("T0:%1.1f,%1.1f,%1.1fn", lts[0]->clocksPerLock, lts[0]->failuresPerLock, lts[0]->preTestsPerLock);
printf ("T1:%1.1f,%1.1f,%1.1fn", lts[1]->clocksPerLock, lts[1]->failuresPerLock, lts[1]->preTestsPerLock);
printf ("T2:%1.1f,%1.1f,%1.1fn", lts[2]->clocksPerLock, lts[2]->failuresPerLock, lts[2]->preTestsPerLock);
printf ("T3:%1.1f,%1.1f,%1.1fn", lts[3]->clocksPerLock, lts[3]->failuresPerLock, lts[3]->preTestsPerLock);
printf ("T*:%1.1f,%1.1f,%1.1fn", (lts[0]->clocksPerLock+  lts[1]->clocksPerLock+  lts[2]->clocksPerLock+  lts[3]->clocksPerLock)/  4.,
(lts[0]->failuresPerLock+lts[1]->failuresPerLock+lts[2]->failuresPerLock+lts[3]->failuresPerLock)/4.,
(lts[0]->preTestsPerLock+lts[1]->preTestsPerLock+lts[2]->preTestsPerLock+lts[3]->preTestsPerLock)/4.);
printf ("LC:%un", (ud)lockCounter);
return 0;
}

该程序在基于DELL i5-4310U的计算机上运行,该计算机具有DDR3-800、2核/2 HTs(2.7GHz)和通用L3缓存。

首先,WOW64的影响似乎可以忽略不计。

执行非受控锁定/解锁的单个线程能够每110个周期执行一次。调整未受控制的锁是无用的:任何为增强单个XCHG指令而添加的代码都只会使其速度变慢。

随着四个HT用锁定尝试轰炸锁定变量,情况发生了根本性的变化。实现成功锁定所需的时间跃升至994个周期,其中很大一部分可归因于2.2次失败的锁定尝试。换句话说,在高争用情况下,平均必须尝试3.2个锁才能获得成功的锁。显然,110个循环没有变成110*3.2,而是更接近110*9。因此,其他机制在这里发挥作用,就像在旧机器上进行测试一样。此外,平均994次循环包括716和1157次之间的范围

实现预测试的锁变体需要最简单的变体(XCHG)重新使用大约95%的循环。平均而言,他们将执行17个CMP,以发现尝试1.75个锁是可行的,其中1个是成功的。我建议使用预测试,不仅因为它更快:它对总线锁定机制施加的压力更小(3.2-1.75=1.45次锁定尝试更少),尽管它稍微增加了复杂性。

Wikipedia有一篇关于spinlocks的好文章,这里是x86实现

http://en.wikipedia.org/wiki/Spinlock#Example_implementation

请注意,它们的实现不使用"lock"前缀,因为它在x86上对于"xchg"指令是多余的——它隐式地具有锁语义,正如Stackoverflow讨论中所讨论的:

在多核x86上,LOCK是否需要作为XCHG的前缀?

REP:NOP是PAUSE指令的别名,你可以在这里了解更多信息

x86暂停指令在spinlock*中是如何工作的?它可以在其他场景中使用吗?

关于记忆障碍的问题,以下是你可能想知道的

内存障碍:软件黑客的硬件视角Paul E.McKenney

http://irl.cs.ucla.edu/~yingdi/paperreading/wymb.2010.06.07c.pdf

只需看看这里:使用cmpxchg 的x86自旋锁

感谢Cory Nelson

__asm{
spin_lock:
xorl %ecx, %ecx
incl %ecx
spin_lock_retry:
xorl %eax, %eax
lock; cmpxchgl %ecx, (lock_addr)
jnz spin_lock_retry
ret
spin_unlock:
movl $0 (lock_addr)
ret
}

另一位消息人士说:http://www.geoffchappell.com/studies/windows/km/cpu/cx8.htm

lock    cmpxchg8b qword ptr [esi]
is replaceable with the following sequence
try:
lock    bts dword ptr [edi],0
jnb     acquired
wait:
test    dword ptr [edi],1
je      try
pause                   ; if available
jmp     wait
acquired:
cmp     eax,[esi]
jne     fail
cmp     edx,[esi+4]
je      exchange
fail:
mov     eax,[esi]
mov     edx,[esi+4]
jmp     done
exchange:
mov     [esi],ebx
mov     [esi+4],ecx
done:
mov     byte ptr [edi],0

下面是关于无锁与锁实现的讨论:http://newsgroups.derkeiler.com/Archive/Comp/comp.programming.threads/2011-10/msg00009.html

只是问:

在深入研究自旋锁定和几乎无锁定的数据结构之前:

在您的基准测试和应用程序中,您是否确保竞争线程能够在不同的核心上运行?

如果没有,你最终可能会得到一个程序,它在你的开发机器上运行得很好,但在现场却很糟糕/失败,因为一个线程必须既是你的spinlock的锁定者又是解锁者。

给你一个数字:在Windows上,你有10毫秒的标准时间片。如果你不确保有两个物理线程参与锁定/解锁,你最终会得到每秒大约500次锁定/解锁的结果,这将是非常meh