std::atomic_compare_exchange_weak线程在设计上是否不安全

Is std::atomic_compare_exchange_weak thread-unsafe by design?

本文关键字:是否 不安全 weak atomic compare exchange std 线程      更新时间:2023-10-16

在cppreference atomic_compare_exchange Talk页面上,std::atomic_compare_exchange_weak的现有实现使用非原子比较指令(例如)计算CAS的布尔结果

lock
cmpxchgq   %rcx, (%rsp)
cmpq       %rdx, %rax

哪个(编辑:为转移注意力道歉)

打破CAS循环,如Action的清单7.2中的并发:

while(!head.compare_exchange_weak(new_node->next, new_node);

规范(29.6.5[atomics.types.operations.req]/21-22)似乎暗示比较结果必须是原子操作的一部分:

效果:原子比较。。。

返回:比较的结果

但它实际上是可实现的吗?我们应该向供应商或LWG提交错误报告吗?

TL;DR:atomic_compare_exchange_weak在设计上是安全的,但实际实现有缺陷。

以下是Clang为这个小片段实际生成的代码:

struct node {
int data;
node* next;
};
std::atomic<node*> head;
void push(int data) {
node* new_node = new node{data};
new_node->next = head.load(std::memory_order_relaxed);
while (!head.compare_exchange_weak(new_node->next, new_node,
std::memory_order_release, std::memory_order_relaxed)) {}
}

结果:

movl  %edi, %ebx
# Allocate memory
movl  $16, %edi
callq _Znwm
movq  %rax, %rcx
# Initialize with data and 0
movl  %ebx, (%rcx)
movq  $0, 8(%rcx) ; dead store, should have been optimized away
# Overwrite next with head.load
movq  head(%rip), %rdx
movq  %rdx, 8(%rcx)
.align  16, 0x90
.LBB0_1:                                # %while.cond
# =>This Inner Loop Header: Depth=1
# put value of head into comparand/result position
movq  %rdx, %rax
# atomic operation here, compares second argument to %rax, stores first argument
# in second if same, and second in %rax otherwise
lock
cmpxchgq  %rcx, head(%rip)
# unconditionally write old value back to next - wait, what?
movq  %rax, 8(%rcx)
# check if cmpxchg modified the result position
cmpq  %rdx, %rax
movq  %rax, %rdx
jne .LBB0_1

这种比较是完全安全的:它只是比较寄存器。然而,整个操作并不安全。

关键点是:compare_exchange_(弱|强)的描述说:

Atomly[…]如果为true,则将this指向的内存内容替换为所需的内容,如果为false,则将预期的内存内容更新为该指向的内存的内容

或者在伪代码中:

if (*this == expected)
*this = desired;
else
expected = *this;

请注意,如果比较为false,则仅将expected写入;如果比较为true,则仅向写入*this。C++的抽象模型不允许在两者都被写入的情况下执行。这对上述push的正确性很重要,因为如果对head进行写入,则new_node会突然指向其他线程可见的位置,这意味着其他线程可以开始读取next(通过访问head->next),如果也对expected(别名new_node->next)进行写入,这是一场比赛。

Clang无条件地写信给new_node->next。在这种比较是真的情况下,这是一种虚构的写作。

这是Clang中的一个错误我不知道GCC是否也这么做。

此外,该标准的措辞并不理想。它声称整个操作必须以原子方式进行,但这是不可能的,因为expected不是原子对象;对那里的写入不可能原子化。标准应该说明的是,比较和对*this的写入是原子性的,但对expected的写入不是。但这并没有那么糟糕,因为没有人真的期望写入是原子的。

因此,应该有Clang(可能还有GCC)的错误报告,以及标准的缺陷报告。

我是最初发现这个bug的人。在过去的几天里,我一直在给Anthony Williams发电子邮件,讨论这个问题和供应商的实施。我没有意识到Cubbi提出了StackOverFlow问题。不仅仅是Clang或GCC,每个供应商都破产了(不管怎样,这一切都很重要)。Anthony Williams也是Just::Thread(一个C++11线程和原子库)的作者,他证实了他的库是正确实现的(只有已知的正确实现)。

Anthony提出了GCC错误报告http://gcc.gnu.org/bugzilla/show_bug.cgi?id=60272

简单示例:

#include <atomic>
struct Node { Node* next; };
void Push(std::atomic<Node*> head, Node* node)
{
node->next = head.load();
while(!head.compare_exchange_weak(node->next, node))
;
}

g++4.8[汇编程序]

mov    rdx, rdi
mov    rax, QWORD PTR [rdi]
mov    QWORD PTR [rsi], rax
.L3:
mov    rax, QWORD PTR [rsi]
lock cmpxchg    QWORD PTR [rdx], rsi
mov    QWORD PTR [rsi], rax !!!!!!!!!!!!!!!!!!!!!!!
jne    .L3
rep; ret

clang 3.3[汇编程序]

movq    (%rdi), %rcx
movq    %rcx, (%rsi)
.LBB0_1:
movq    %rcx, %rax
lock
cmpxchgq    %rsi, (%rdi)
movq    %rax, (%rsi) !!!!!!!!!!!!!!!!!!!!!!!
cmpq    %rcx, %rax !!!!!!!!!!!!!!!!!!!!!!!
movq    %rax, %rcx
jne    .LBB0_1
ret

icc 13.0.1[汇编程序]

movl      %edx, %ecx
movl      (%rsi), %r8d
movl      %r8d, %eax
lock
cmpxchg   %ecx, (%rdi)
movl      %eax, (%rsi) !!!!!!!!!!!!!!!!!!!!!!!
cmpl      %eax, %r8d !!!!!!!!!!!!!!!!!!!!!!!
je        ..B1.7
..B1.4:
movl      %edx, %ecx
movl      %eax, %r8d
lock
cmpxchg   %ecx, (%rdi)
movl      %eax, (%rsi) !!!!!!!!!!!!!!!!!!!!!!!
cmpl      %eax, %r8d !!!!!!!!!!!!!!!!!!!!!!!
jne       ..B1.4
..B1.7:
ret

Visual Studio 2012[无需检查汇编程序,MS使用_InterlockedCompareExchange!!]

inline int _Compare_exchange_seq_cst_4(volatile _Uint4_t *_Tgt, _Uint4_t *_Exp, _Uint4_t _Value)
{    /* compare and exchange values atomically with
sequentially consistent memory order */
int _Res;
_Uint4_t _Prev = _InterlockedCompareExchange((volatile long
*)_Tgt, _Value, *_Exp);
if (_Prev == *_Exp) !!!!!!!!!!!!!!!!!!!!!!!
_Res = 1;
else
{ /* copy old value */
_Res = 0;
*_Exp = _Prev;
}
return (_Res);
}

[…]

打破CAS循环,如Action的清单7.2中的并发:

while(!head.compare_exchange_weak(new_node->next, new_node);

规范(29.6.5[atomics.types.operations.req]/21-22)似乎暗示比较的结果必须是原子的一部分操作:

[…]

此代码和规范的问题不在于compare_exchange的原子性是否需要扩展到仅比较和交换本身之外,以返回比较结果或分配给expected参数。也就是说,在对expected的存储不是原子的情况下,代码可能仍然是正确的。

导致上述代码潜在racy的原因是,在其他线程可能已经观察到成功的交换之后,当实现写入expected参数时。编写代码时,期望在交换成功的情况下,不会在expected上写入以产生竞争。

正如所写的,规范似乎确实保证了这种预期的行为。(事实上,可以理解为你所描述的更有力的保证,即整个操作是原子的。)根据规范,compare_exchange_weak:

从原子角度比较对象指向的内存内容或以此表示与预期相等,如果为真,则替换由对象或由this与that指向的内存内容in,如果为false,则更新中的内存内容预期由对象或这[n4140§29.6.5/21](注意:C++11和C++14之间的措辞不变)

问题是,标准的实际语言似乎比提案的初衷更强。Herb Sutter表示,从未真正支持操作中的并发的使用,更新expected只是针对局部变量。

我目前没有看到任何关于这方面的缺陷报告[请参阅下面的第二次更新]如果事实上这种语言比预期的更强,那么可能会有一种语言被提交。C++11的措辞将被更新以保证上述代码的预期行为,从而使当前实现不一致,或者新的措辞将不能保证这种行为,使上述代码可能导致未定义的行为。在这种情况下,我想安东尼的书需要更新。委员会将对此做些什么,以及实际实现是否符合最初的意图(而不是规范的实际措辞)仍然是一个悬而未决的问题[参见下面的更新]

为了同时编写代码,您必须考虑实现的实际行为,无论它是否一致。现有的实现可能存在"缺陷",因为它们没有实现ISO规范的确切措辞,但它们确实按照实现者的意图运行,并且可以用来编写线程安全代码[参见下面的更新]

因此,直接回答您的问题:

但它实际上是可实现的吗?

我认为规范的实际措辞不可合理实施(实际的措辞使保证比Anthony的just::thread库提供的更有力。例如,实际的措辞似乎需要对非原子对象进行原子操作。Anthony稍微弱一点的解释,即expected的赋值不需要是原子的,但必须以交换失败为条件,显然是可以实现的。Herb更弱的解释ation显然也是可实现的,因为这是大多数库实际实现的[参见下面的更新]

std::atomic_compare_exchange_weak线程在设计上是否不安全?

无论该操作的保证力度与规范的实际措辞一样大,还是与Herb Sutter所指出的一样弱,该操作都不是线程不安全的。简单地说,操作的正确、线程安全使用取决于所保证的内容。操作中的并发中的示例代码是对compare_exchange的不安全使用,它只提供了Herb的弱保证,但它可以被编写为与Herb的实现一起正确工作。可以这样做:

node *expected_head = head.load();
while(!head.compare_exchange_weak(expected_head, new_node) {
new_node->next = expected_head;
}

通过此更改,对expected的"伪"写入仅对局部变量进行,不再产生任何种族。对new_node->next的写入现在以交换失败为条件,因此new_node->next对任何其他线程都不可见,并且可以安全地更新。这个代码示例在当前实现和更强的保证下都是安全的,因此它应该能够经得起未来对C++11原子的任何更新的考验,从而解决这个问题。


更新:

实际实现(至少MSVC、gcc和clang)已经更新,以提供Anthony Williams解释下的保证;也就是说,他们已经停止发明在交换成功的情况下写入expected

https://llvm.org/bugs/show_bug.cgi?id=18899

https://gcc.gnu.org/bugzilla/show_bug.cgi?id=60272

https://connect.microsoft.com/VisualStudio/feedback/details/819819/std-atomic-compare-exchange-weak-has-spurious-write-which-can-cause-race-conditions

更新2:

此问题的缺陷报告已提交给C++委员会。从目前提出的决议来看,委员会确实希望做出比您检查的实现更有力的保证(但没有目前的措辞那么有力,因为目前的措辞似乎保证了对非原子对象的原子操作。)下一个C++标准(C++1z或"C++17")的草案尚未采用改进后的措辞。

更新情况3:C++17通过了拟议的决议。

那些人似乎既不理解标准,也不理解说明。

首先,std::atomic_compare_exchange_weak在设计上是而不是线程不安全的。那完全是胡说八道。该设计非常清楚地定义了函数的作用以及它必须提供的保证(包括原子性和内存排序)
使用此函数的程序整体上是否是线程安全的是另一回事,但函数的语义本身在原子共价交换的意义上肯定是正确的(您仍然可以使用任何可用的线程安全原语编写线程不安全的代码,但那是完全不同的情况)。

这个特殊的函数实现了线程安全比较交换操作的"弱"版本,它与"非弱"版本的不同之处在于,如果这能带来性能优势(与x86无关),则允许实现生成可能会错误失败的代码。弱并不意味着情况更糟,它只意味着允许在某些平台上更频繁地失败,如果这能带来整体性能优势的话
当然,实现仍然需要正确工作。也就是说,如果比较交换失败——无论是并发还是错误的——它必须被正确地报告为失败。

其次,现有实现生成的代码与std::atomic_compare_exchange_weak的正确性或线程安全性无关。充其量,如果生成的指令不能正确工作,这是一个实现问题,但与语言构造无关。语言标准定义了实现必须提供的行为,它不负责实现是否正确执行。

第三,生成的代码没有问题。x86CMPXCHG指令具有定义明确的操作模式。它将实际值与预期值进行比较,如果比较成功,则执行交换。通过查看EAX(或x64中的RAX)或ZF的状态,可以知道操作是否成功
重要的是,原子比较交换是原子的,事实就是这样。无论你在之后对结果做什么,都不需要是原子的(在你的情况下,是CMP),因为状态不再改变。要么当时交换成功,要么已经失败。无论哪种情况,都已经是"历史"了。

std::atomic_compare_exchange_weak与底层指令具有不同的语义,它返回一个bool值。因此,不能总是期望1:1映射到指令。编译器可能必须生成额外的指令(以及不同的指令,具体取决于您使用结果的方式)来实现这些语义,但它确实对正确性没有影响。

唯一可以抱怨的是,它没有直接使用ZF(与JccCMOVcc)的现有状态,而是执行另一个比较。但这是一个性能问题(浪费了1个周期),而不是正确性问题。

从链接页面引用Duncan Forster:

需要记住的重要一点是,CAS的硬件实现只返回1个值(旧值),而不是两个值(老加布尔值)

所以有一条指令-(原子)CAS-它实际上对内存进行操作,然后是另一条将(原子分配的)结果转换为预期布尔值的指令。

由于%rax中的值是原子设置的,因此不会受到另一个线程的影响,因此这里没有竞争。

报价无论如何都是假的,因为ZF也是根据CAS结果设置的(即返回旧值和布尔值)。没有使用该标志的事实可能是错过了优化,或者cmpq可能更快,但它不会影响正确性。


为了参考,考虑像以下伪代码一样分解compare_exchange_weak

T compare_exchange_weak_value(atomic<T> *obj, T *expected, T desired) {
// setup ...
lock cmpxchgq   %rcx, (%rsp) // actual CAS
return %rax; // actual destination value
}
bool compare_exchange_weak_bool(atomic<T> *obj, T *expected, T desired) {
// CAS is atomic
T actual = compare_exchange_weak_value(obj, expected, desired);
// now we figure out if it worked
return actual == *expected;
}

你同意CAS是原子弹吗?


如果期望的无条件存储真的是你想要问的(而不是完全安全的比较),我同意Sebastian的观点,这是一个错误。

作为参考,您可以通过将无条件存储强制到本地,并使潜在的可见存储再次成为条件存储来解决此问题:

struct node {
int data;
node* next;
};
std::atomic<node*> head;
void push(int data) {
node* new_node = new node{data};
node* cur_head = head.load(std::memory_order_relaxed);
do {
new_node->next = cur_head;
} while (!head.compare_exchange_weak(cur_head, new_node,
std::memory_order_release, std::memory_order_relaxed));
}