c++11: thread with mutex sees atomic variable's value changing despite this being the only code that can change it

An atomic variable (a 128-bit struct in this case) is being updated, to the surprise of the only thread that would have the ability to update it. How is that possible?

This is a minimal example, so it doesn't do anything meaningful, but: an alloc() function returns a malloc'd buffer 100 times, then allocates a new buffer that it will return 100 times, and so on, even in the face of being called by multiple threads.

I have an atomic variable, which is a struct with a pointer, a 32-bit int, and another 32-bit counter intended to avoid ABA problems.

I have a function with two sections. The first section will, if the return count is non-zero, CAS the struct to decrement the return count (and increment the ABA counter), then return the pointer. Otherwise, the second section gets a mutex, allocates memory for a new pointer, and CASes the little struct completely, with the new pointer, a fresh non-zero return counter, and again an increment to the ABA counter.

In short, every thread can update this struct when the counter is above zero. But once it's zero, the first thread to acquire the mutex will, I think, be the only thread that can CAS-update this struct again.

Except sometimes the CAS fails! "How can it fail?" is my question.

Here is a runnable example. It can be compiled with g++ lockchange.cxx -o lockchange -latomic -pthread. It runs on gcc version 9.2.1 20190827 (Red Hat 9.2.1-1) (GCC) on Fedora 31.

#include <algorithm>
#include <atomic>
#include <chrono>
#include <cassert>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <mutex>
#include <thread>
#include <vector>
using namespace std;

struct MyPair { /* Hungarian: pair */
    char*    pc;         /* a buffer to be used n times */
    int32_t  iRemaining; /* number of times left to use pc */
    uint32_t iUpdates;   /* to avoid ABA problem */
};

const int iThreads{ 200 };
const int iThreadIterations{ 1000000 };
const int iSizeItem{ 128 };
mutex mux;
atomic<MyPair> pairNext;

char* alloc() {
 TRY_AGAIN:
    MyPair pairCur = pairNext.load();

    // CASE 1: We can use the existing buffer?
    while ( pairCur.iRemaining ) {
        char* pcRV = pairCur.pc;

        MyPair pairNew = { pairCur.pc,
                           pairCur.iRemaining - 1,
                           pairCur.iUpdates + 1 };
        if ( pairNext.compare_exchange_weak( pairCur, pairNew ) )
            return pcRV;
        // Otherwise, pairNext was changed out from under us and pairCur
        // will have been updated.  Try again, as long as iRemaining
        // non-zero.
    }

    // CASE 2: We've used pc as many times as allowed, so allocate a new pc.
    // Get a mutex as we'll be changing too many fields to do atomically.
    lock_guard<mutex> guard( mux );

    // If multiple threads saw iRemaining = 0, they all will
    // have tried for the mutex; only one will have gotten it, so
    // there's a good chance that by the time we get the mutex, a
    // sibling thread will have allocated a new pc and placed it at
    // pairNext, so we don't need to allocate after all.
    if ( pairNext.load().iRemaining ) // <=============================== it's as if this line isn't seeing the update made by the line below in real time.
        goto TRY_AGAIN;

    // Get a new buffer.
    char* pcNew = (char*) malloc( iSizeItem );

    MyPair pairNew = { pcNew, 100, pairCur.iUpdates + 1 };

    if ( pairNext.compare_exchange_strong( pairCur, pairNew ) ) { //<===== the update that's not being seen above in real time
        // *** other stuff with pcNew that needs mutex protection ***;
        return pcNew;
    } else {
        // CASE 2c: after allocating a new page, we find that
        // another thread has beaten us to it.  I CAN'T FIGURE OUT
        // HOW THAT'S POSSIBLE THOUGH.  Our response should be safe
        // enough: put our allocation back, and start all over again
        // because who knows what else we missed.  I see this error
        // like 813 times out of 40 BILLION allocations in the
        // hammer test, ranging from 1 to 200 threads.
        printf( "unexpected: had lock but pairNext changed when iRemaining=0\n" );
        // In fact the following free and goto should and seem to
        // recover fine, but to be clear my question is how we can
        // possibly end up here in the first place.
        abort();
        free( pcNew );
        goto TRY_AGAIN;
    }
}

void Test( int iThreadNumber ) {
    for ( int i = 0; i < iThreadIterations; i++ )
        alloc();
}

int main( int nArg, char* apszArg[] ) {
    vector<thread> athr;
    for ( int i = 0; i < iThreads; i++ )
        athr.emplace_back( Test, i );
    for ( auto& thr: athr )
        thr.join();
}

Note that goto TRY_AGAIN; unlocks the mutex, because you're jumping back to before the lock_guard<mutex> was constructed. Usually people put {} around a scope that takes the lock at the top, to make this clear (and to control when the unlock happens). I didn't check the ISO C++ rules to see if this is required behaviour, but at least the way g++ and clang++ implement it, goto does unlock. (Mixing RAII locking with goto seems like poor design.)
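As a minimal sketch of that {} convention, with hypothetical names: the extra braces pin down exactly where the lock_guard lives, so the unlock point is visible without tracing any goto.

#include <mutex>

std::mutex m;            // hypothetical names, just to show the shape
int shared_value = 0;    // state protected by m

void update() {
    {   // braces open: lock taken here
        std::lock_guard<std::mutex> guard( m );
        ++shared_value;  // protected work
    }   // braces close: guard destroyed, mutex unlocked exactly here
    // unlocked work continues; no goto needed to find the unlock point
}

int main() { update(); }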

Also note that you do reload pairNext once while holding the mutex, but you discard that value and keep pairCur as the "expected" value for your CAS attempt.

For the CAS inside the critical section to be reached, pairNext.iRemaining either has to be:

  • still zero (e.g. this thread won the race to take the lock). You're assuming this case, where the CAS succeeds because pairNext == pairCur.
  • or zero again, after another thread or threads set iRemaining to 100 and decremented it all the way to zero while this thread was asleep. With more threads than cores, this can happen very easily. It's always possible even with lots of cores, though: an interrupt can block a thread temporarily, or its backoff strategy when it found the mutex locked might lead it not to retry until the counter was zero again (see the sketch after this list).
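To make the second case concrete, here is a minimal single-threaded sketch (my illustration, not part of the question's program; the numbers match the abort dump shown further below) replaying what sibling threads can do while one thread sleeps on the mutex:

#include <cassert>
#include <cstdint>
#include <cstdio>

// Illustration only: replay, single-threaded, what sibling threads can do
// while one thread is blocked on the mutex.  Snap mirrors MyPair.
struct Snap { const char* pc; int32_t iRemaining; uint32_t iUpdates; };

int main() {
    Snap before = { "old buffer", 0, 23599 };  // the blocked thread's snapshot
    Snap now = before;

    now = { "new buffer", 100, now.iUpdates + 1 };  // a sibling refills (1 update)
    for ( int i = 0; i < 100; i++ )                 // siblings hand it out 100 times
        now = { now.pc, now.iRemaining - 1, now.iUpdates + 1 };

    // 101 updates later, the naive check passes even though everything changed:
    assert( now.iRemaining == 0 && now.iRemaining == before.iRemaining );
    assert( now.iUpdates != before.iUpdates );      // the ABA counter tells the truth
    printf( "iRemaining back to %d, but iUpdates went %u -> %u\n",
            now.iRemaining, before.iUpdates, now.iUpdates );
}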

I added new debug code which makes this clear:

lock_guard<mutex> guard( mux );    // existing code
if ( pairNext.load().iRemaining )
    goto TRY_AGAIN;

// new debugging code
MyPair tmp = pairNext.load();
if (memcmp(&tmp, &pairCur, sizeof(tmp)) != 0)
    printf("pairNext changed between retry loop and taking the mutex\n"
           "cur  = %p, %d, %u\n"
           "next = %p, %d, %u\n",
           pairCur.pc, pairCur.iRemaining, pairCur.iUpdates,
           tmp.pc, tmp.iRemaining, tmp.iUpdates);
$ clang++ -g -O2 lc.cpp -o lockchange -latomic -pthread && ./lockchange 
pairNext changed between retry loop and taking the mutex
cur  = 0x7f594c000e30, 0, 808
next =  0x7f5940000b60, 0, 909
unexpected: had lock but pairNext changed when iRemaining=0
Aborted (core dumped)

Fixing the problem:

Since you're reloading pairNext with the mutex held, just use that value as your "expected" for the CAS. Compilers unfortunately won't optimize foo.load().member into loading just that member: they still load the whole 16-byte object, with lock cmpxchg16b on x86-64 or whatever on other ISAs. So you're paying the whole cost anyway.

lock_guard<mutex> guard( mux );

pairCur = pairNext.load();   // may have been changed by other threads
if ( pairCur.iRemaining )
    goto TRY_AGAIN;

// then same as before, use it for CAS
// no other thread can be in the critical section,
// and the code outside won't do anything while pairNext.iRemaining == 0

Either way, the 16-byte atomic load costs the same as a CAS, but the failure path would then have to either free the malloc'd buffer or spin until the CAS succeeds before leaving the critical section. The latter could actually work, if you can avoid wasting too much CPU time and causing contention, e.g. with _mm_pause().
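For what it's worth, that spin alternative could look something like the following sketch (my code, not tested against the question's harness; it assumes x86 for _mm_pause() from <immintrin.h>, and the name install_spinning is made up). It replaces the free(pcNew) + goto failure path: with the mutex held nobody else can refill pairNext, so once the siblings drain iRemaining to zero it stays zero, and the CAS must eventually succeed.

#include <atomic>
#include <cstdint>
#include <immintrin.h>   // _mm_pause(): x86-specific spin hint (an assumption here)

struct MyPair {          // same layout as in the question
    char*    pc;
    int32_t  iRemaining;
    uint32_t iUpdates;
};

std::atomic<MyPair> pairNext;   // as in the question

// Hypothetical replacement for the free(pcNew) + goto failure path.
// Called with the mutex held, after malloc: wait for the siblings to
// drain the current buffer (nobody can refill it while we hold the
// mutex), then install ours instead of throwing it away.
char* install_spinning( char* pcNew ) {
    for (;;) {
        MyPair expected = pairNext.load();
        if ( expected.iRemaining == 0 ) {
            MyPair desired = { pcNew, 100, expected.iUpdates + 1 };
            if ( pairNext.compare_exchange_weak( expected, desired ) )
                return pcNew;   // installed; only spurious weak failures loop back
        }
        _mm_pause();            // reduce contention while spinning
    }
}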

The problem is known as the "ABA problem," which I could summarize as checking a variable in lock-free multithreaded coding and believing it hasn't changed, when in fact it has.

Here, iRemaining is a counter set to 100, then counted down to 0, repeatedly.

After the mutex is locked, an "optimization check" (not needed for correctness, but merely to avoid the overhead of allocating a new buffer and resetting iRemaining, etc., if another thread has already done so) naively checks for iRemaining == 0 to determine that the struct pairCur has not changed during the acquisition of the lock (which indeed may involve a long wait).

What happens instead is that, while thread A is waiting for the lock, rarely, but given the billions of attempts, quite a few times, iRemaining is being decremented by an exact multiple of 100. By letting the code run to the abort() and then examining the variables, I can see that pairNext holds a value of, say, { pc = XXX, iRemaining = 0, iUpdates = 23700 }, but pairNew holds { pc = YYY, iRemaining = 100, iUpdates = 23600 }. iUpdates is now 100 higher than we thought! In other words, another 100 updates were made while we were waiting to lock, and that was the exact number to take iRemaining back around to 0 again. That also means pc is different from before.

The struct already has an "update counter" iUpdates, which is the standard solution to the ABA problem. If instead of checking for iRemaining == 0 we check for iUpdates to be the same as our pre-lock atomic snapshot, then the optimization heuristic becomes 100% effective and we never reach the unexpected printf() and abort(). (Well, it could perhaps still happen, but would now require a thread to be blocked for an exact multiple of 2^32 operations instead of just 100 operations, something which may happen only once a year, decade, or century, if even possible on this architecture.) Here is the improved code:

if ( pairNext.load().iUpdates != pairCur.iUpdates ) // <=============================== the fix: compare iUpdates, not iRemaining