无锁堆栈-这是c++11宽松原子的正确用法吗?它能被证明吗

Lock-free stack - Is this a correct usage of c++11 relaxed atomics? Can it be proven?

本文关键字:用法 证明 原子的 堆栈 这是 c++11      更新时间:2023-10-16

我为一个非常简单的数据段编写了一个容器,该数据段需要跨线程同步。我想要最好的表演。我不想用锁

我想使用"放松的"原子。一方面是因为那一点点额外的魅力,另一方面是为了真正理解它们

我已经在这方面做了很多工作,我已经到了这个代码通过所有测试的地步。不过这还不是很"证明",所以我想知道我是否遗漏了什么,或者我可以用其他方法来测试它?

这是我的前提:

  • 重要的是,节点被正确地推送和弹出,并且堆栈永远不会无效
  • 我相信记忆中的操作顺序只有一个地方很重要:
    • compare_exchange操作本身之间<这是有保证的,即使是宽松的原子论>
  • "ABA"问题是通过在指针上添加识别号来解决的。在32位系统上,这需要一个双字compare_exchange,而在64位系统中,指针的未使用的16位用id号填充
  • 因此:堆栈将始终处于有效状态 (对吗?(

以下是我的想法。"通常",我们对正在阅读的代码进行推理的方式是查看代码的编写顺序。内存可以被读取或写入"无序",但不能以使程序正确性无效的方式读取或写入。

这在多线程环境中会发生变化。这就是内存围栏的作用——这样我们仍然可以查看代码,并能够推理它将如何工作。

所以,如果这里的一切都可能出了问题,我该怎么处理放松的原子论呢?这不是有点太远了吗?

我不这么认为,但这就是我来这里寻求帮助的原因。

compare_exchange操作本身保证了彼此之间的序列恒定性

对原子进行读取或写入的唯一其他时间是在compare_exchange之前获取头的初始值。它被设置为变量初始化的一部分。据我所知,这一操作是否能带来"适当"的价值是无关紧要的。

当前代码:

struct node
{
    node *n_;
#if PROCESSOR_BITS == 64
    inline constexpr node() : n_{ nullptr }                 { }
    inline constexpr node(node* n) : n_{ n }                { }
    inline void tag(const stack_tag_t t)                    { reinterpret_cast<stack_tag_t*>(this)[3] = t; }
    inline stack_tag_t read_tag()                           { return reinterpret_cast<stack_tag_t*>(this)[3]; }
    inline void clear_pointer()                             { tag(0); }
#elif PROCESSOR_BITS == 32
    stack_tag_t t_;
    inline constexpr node() : n_{ nullptr }, t_{ 0 }        { }
    inline constexpr node(node* n) : n_{ n }, t_{ 0 }       { }
    inline void tag(const stack_tag_t t)                    { t_ = t; }
    inline stack_tag_t read_tag()                           { return t_; }
    inline void clear_pointer()                             { }
#endif
    inline void set(node* n, const stack_tag_t t)           { n_ = n; tag(t); }
};
using std::memory_order_relaxed;
class stack
{
public:
    constexpr stack() : head_{}{}
    void push(node* n)
    {
        node next{n}, head{head_.load(memory_order_relaxed)};
        do
        {
            n->n_ = head.n_;
            next.tag(head.read_tag() + 1);
        } while (!head_.compare_exchange_weak(head, next, memory_order_relaxed, memory_order_relaxed));
    }
    bool pop(node*& n)
    {
        node clean, next, head{head_.load(memory_order_relaxed)};
        do
        {
            clean.set(head.n_, 0);
            if (!clean.n_)
                return false;
            next.set(clean.n_->n_, head.read_tag() + 1);
        } while (!head_.compare_exchange_weak(head, next, memory_order_relaxed, memory_order_relaxed));
        n = clean.n_;
        return true;
    }
protected:
    std::atomic<node> head_;
};

这个问题与其他问题相比有什么不同?放松的原子。他们对这个问题有很大的不同。

那么,你觉得呢?我缺什么了吗?

push已损坏,因为在compareAndSwap失败后不会更新node->_next。当下一次compareAndSwap尝试成功时,您最初与node->setNext一起存储的节点可能已被另一个线程从堆栈顶部弹出。因此,一些线程认为它已经从堆栈中弹出了一个节点,但这个线程已经将其放回了堆栈中。应该是:

void push(Node* node) noexcept
{
    Node* n = _head.next();
    do {
        node->setNext(n);
    } while (!_head.compareAndSwap(n, node));
}

此外,由于nextsetNext使用memory_order_relaxed,因此不能保证这里的_head_.next()返回最近推送的节点。有可能从堆栈顶部泄漏节点。同样的问题显然也存在于pop中:_head.next()可能会返回一个以前但不再位于堆栈顶部的节点。如果返回的值是nullptr,那么当堆栈实际上不是空的时,可能无法弹出。

如果两个线程试图同时从堆栈中弹出最后一个节点,pop也可能具有未定义的行为。它们都看到了_head.next()的相同值,一个线程成功地完成了pop。另一个线程进入while循环——因为观察到的节点指针不是nullptr——但compareAndSwap循环很快将其更新为nullptr,因为堆栈现在是空的。在循环的下一次迭代中,该nullptr被取消引用以获得其_next指针,随之而来的是很多欢乐。

CCD_ 19也明显患有ABA。两个线程可以在堆栈的顶部看到相同的节点。假设一个线程到达评估_next指针的点,然后阻塞。另一个线程成功地弹出了该节点,推送了5个新节点,然后在其他线程唤醒之前再次推送该原始节点。另一个线程的compareAndSwap将成功——栈顶节点相同——但将旧的_next值存储到_head中,而不是新的值。其他线程推送的五个节点全部泄漏。memory_order_seq_cst也是如此。

抛开pop操作的实现难度不谈,我认为memory_order_relaxed是不够的。在推送节点之前,假设将向其中写入一些值,这些值将在弹出节点时读取。您需要一些同步机制来确保在读取值之前实际上已经写入了值。memory_order_relaxed没有提供同步。。。CCD_ 27/CCD_。

此代码已完全损坏。

这似乎起作用的唯一原因是,当前的编译器在跨原子操作的重新排序方面不是很积极,x86处理器有很强的保证。

第一个问题是,如果没有同步,就无法保证此数据结构的客户端甚至会看到要初始化的节点对象的字段。下一个问题是,在没有同步的情况下,推送操作可以读取头的标签的任意旧值。

我们开发了一个工具CDSChecker,它可以模拟内存模型允许的大多数行为。它是开源和免费的。在数据结构上运行它,可以看到一些有趣的执行。

在这一点上,证明任何利用宽松原子论的代码都是一个巨大的挑战。大多数证明方法都会失败,因为它们本质上是典型的归纳法,而且你没有归纳的顺序。所以你可以凭空阅读问题。。。