Overwriting in a concurrent queue

Keywords: overwriting, queue, concurrency    Updated: 2023-10-16

I'm trying to write a mutex-free (but not lock-free) queue that uses a contiguous range of memory as a circular buffer, plus four pointers: two for consumers and two for producers. It keeps one empty slot behind the most recently pushed element to disambiguate a full queue from an empty one. Here is the implementation:

#include <atomic>
#include <cstddef>
#include <memory>
#include <utility>

template <typename T, typename Allocator = std::allocator<T>>
class concurrent_queue
{
protected:
    T *storage;                                   // backing array of s slots
    std::size_t s;                                // capacity, including the one spare slot
    std::atomic<T*> consumer_head, producer_head; // fully published positions
    // Pointer plus counter packed into 16 bytes for a double-width CAS.
    union alignas(16) dpointer
    {
        struct
        {
            T *ptr;
            std::size_t cnt;
        };
        __int128 val;
    };
    dpointer consumer_pending, producer_pending;  // reservation pointers
    Allocator alloc;
public:
    concurrent_queue(std::size_t s): storage(nullptr), consumer_head(nullptr), producer_head(nullptr)
    {
        storage = alloc.allocate(s+1);
        consumer_head = storage;
        __atomic_store_n(&(consumer_pending.val), (dpointer{storage, 0}).val, __ATOMIC_SEQ_CST);
        producer_head = storage;
        __atomic_store_n(&(producer_pending.val), (dpointer{storage, 0}).val, __ATOMIC_SEQ_CST);
        this->s = s + 1;
    }
    ~concurrent_queue()
    {
        while(consumer_head != producer_head)
        {
            alloc.destroy(consumer_head.load());
            ++consumer_head;
            if(consumer_head == storage + s)
                consumer_head = storage;
        }
        alloc.deallocate(storage, s);
    }
    template <typename U>
    bool push(U&& e)
    {
        while(true)
        {
            dpointer a;
            a.val = __atomic_load_n(&(producer_pending.val), __ATOMIC_RELAXED);
            std::atomic_thread_fence(std::memory_order_acquire);
            auto b = consumer_head.load(std::memory_order_relaxed);
            auto next = a.ptr + 1;
            if(next == storage + s) next = storage;
            if(next == b) continue;
            dpointer newval{next, a.cnt+1};
            if(!__atomic_compare_exchange_n(&(producer_pending.val), &(a.val), (newval.val), true, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED)) continue;
            alloc.construct(a.ptr, std::forward<U>(e));
            while(!producer_head.compare_exchange_weak(a.ptr, next, std::memory_order_release, std::memory_order_relaxed));
            return true;
        }
    }
    template <typename U>
    bool pop(U& result)
    {
        while(true)
        {
            dpointer a;
            a.val = __atomic_load_n(&(consumer_pending.val), __ATOMIC_RELAXED);
            std::atomic_thread_fence(std::memory_order_acquire);
            auto b = producer_head.load(std::memory_order_relaxed);
            auto next = a.ptr + 1;
            if(next == storage + s) next = storage;
            if(a.ptr == b) continue;
            dpointer newval{next, a.cnt+1};
            if(!__atomic_compare_exchange_n(&(consumer_pending.val), &(a.val), (newval.val), true, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED)) continue;
            result = std::move(*(a.ptr));
            alloc.destroy(a.ptr);
            while(!consumer_head.compare_exchange_weak(a.ptr, next, std::memory_order_release, std::memory_order_relaxed));
            return true;
        }
    }
};
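The one-empty-slot convention the queue relies on can be illustrated in isolation with a plain, single-threaded ring buffer (a hypothetical sketch, not part of the queue above): with capacity N the buffer holds at most N-1 elements, so head == tail always means empty and (head + 1) % N == tail always means full.

```cpp
#include <array>
#include <cassert>
#include <cstddef>

// Minimal single-threaded ring buffer demonstrating the spare-slot
// invariant: head is the next slot to write, tail the next to read.
template <typename T, std::size_t N>
struct ring {
    std::array<T, N> buf{};
    std::size_t head = 0;
    std::size_t tail = 0;

    bool empty() const { return head == tail; }
    bool full()  const { return (head + 1) % N == tail; }

    bool push(const T& v) {
        if (full()) return false;          // one slot always stays free
        buf[head] = v;
        head = (head + 1) % N;
        return true;
    }
    bool pop(T& out) {
        if (empty()) return false;
        out = buf[tail];
        tail = (tail + 1) % N;
        return true;
    }
};
```

This is why the constructor above allocates s + 1 slots for a requested capacity of s.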

However, when testing with equal numbers of separate push and pop threads, each pushing/popping the same predetermined number of elements before terminating, some pop threads sometimes (not always) get stuck on the first CAS at some point during execution and never terminate, even after all the push threads have finished. Since the pop threads try to pop exactly as many elements as the push threads push, I suspect the push threads are overwriting elements at some point.

This is my first attempt at writing a concurrent container, so I'm quite inexperienced at this… I've been staring at it for a while and haven't been able to figure out what's going wrong. Can someone more experienced spot the problem?

Also, is there a less platform-specific way to get a double-width CAS?

Edit: most of this post is actually wrong. See the comments.

        dpointer a;
        a.val = __atomic_load_n(&(producer_pending.val), __ATOMIC_RELAXED);
        std::atomic_thread_fence(std::memory_order_acquire);
        auto b = consumer_head.load(std::memory_order_relaxed);

Are you absolutely sure this does what you think it does? You seem to expect this snippet to order the load of a.val before the load of b, but it does not.

std::atomic_thread_fence(std::memory_order_acquire); guarantees that memory reads after the fence are not reordered to before it. But nothing prevents memory operations above the fence from sinking below it. The compiler is entirely free to move the acquire fence upwards, as long as it doesn't reorder it with other fences.

More abstractly:

a = load relaxed
memory fence acquire -- memory operations below this line may not float upwards
b = load relaxed

The compiler may transform this into:

memory fence acquire
b = load relaxed
a = load relaxed

but not into this:

a = load relaxed
b = load relaxed
memory fence acquire

Also, you should really avoid standalone memory fences and put the acquire/release ordering on the operations themselves. That usually generates better code for non-x86 targets. On x86 it matters less, because even a plain mov already provides acquire/release semantics in many situations.
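Applied to the snippet above, that means making the first load itself an acquire load: an acquire load orders every subsequent memory operation after it, so the second load can no longer float above the first. A minimal sketch, with two plain std::atomic variables standing in for producer_pending and consumer_head:

```cpp
#include <atomic>
#include <cassert>

// An acquire *load* (rather than a fence between two relaxed loads)
// guarantees that the load of `b` cannot be reordered before the load
// of `a`.
inline void ordered_loads(const std::atomic<long>& pending,
                          const std::atomic<long>& head,
                          long& a, long& b) {
    a = pending.load(std::memory_order_acquire); // ordered first
    b = head.load(std::memory_order_relaxed);    // cannot float above a
}
```

For the union-based dpointer in the question, the equivalent change is loading producer_pending.val with __ATOMIC_ACQUIRE and dropping the fence.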