并发队列中的覆盖
Overwriting in a concurrent queue
我正在尝试编写一个无互斥(但不是无锁)队列,该队列使用连续的内存范围作为循环缓冲区和四个指针:两个用于消费者,两个用于生产者。它在最近推入的元素后面保留一个空格,以消除已满队列和空队列之间的歧义。下面是实现:
template <typename T, typename Allocator = std::allocator<T>>
class concurrent_queue
{
protected:
T *storage;
std::size_t s;
std::atomic<T*> consumer_head, producer_head;
union alignas(16) dpointer
{
struct
{
T *ptr;
std::size_t cnt;
};
__int128 val;
};
dpointer consumer_pending, producer_pending;
Allocator alloc;
public:
concurrent_queue(std::size_t s): storage(nullptr), consumer_head(nullptr), producer_head(nullptr)
{
storage = alloc.allocate(s+1);
consumer_head = storage;
__atomic_store_n(&(consumer_pending.val), (dpointer{storage, 0}).val, __ATOMIC_SEQ_CST);
producer_head = storage;
__atomic_store_n(&(producer_pending.val), (dpointer{storage, 0}).val, __ATOMIC_SEQ_CST);
this->s = s + 1;
}
~concurrent_queue()
{
while(consumer_head != producer_head)
{
alloc.destroy(consumer_head.load());
++consumer_head;
if(consumer_head == storage + s)
consumer_head = storage;
}
alloc.deallocate(storage, s);
}
template <typename U>
bool push(U&& e)
{
while(true)
{
dpointer a;
a.val = __atomic_load_n(&(producer_pending.val), __ATOMIC_RELAXED);
std::atomic_thread_fence(std::memory_order_acquire);
auto b = consumer_head.load(std::memory_order_relaxed);
auto next = a.ptr + 1;
if(next == storage + s) next = storage;
if(next == b) continue;
dpointer newval{next, a.cnt+1};
if(!__atomic_compare_exchange_n(&(producer_pending.val), &(a.val), (newval.val), true, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED)) continue;
alloc.construct(a.ptr, std::forward<U>(e));
while(!producer_head.compare_exchange_weak(a.ptr, next, std::memory_order_release, std::memory_order_relaxed));
return true;
}
}
template <typename U>
bool pop(U& result)
{
while(true)
{
dpointer a;
a.val = __atomic_load_n(&(consumer_pending.val), __ATOMIC_RELAXED);
std::atomic_thread_fence(std::memory_order_acquire);
auto b = producer_head.load(std::memory_order_relaxed);
auto next = a.ptr + 1;
if(next == storage + s) next = storage;
if(a.ptr == b) continue;
dpointer newval{next, a.cnt+1};
if(!__atomic_compare_exchange_n(&(consumer_pending.val), &(a.val), (newval.val), true, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED)) continue;
result = std::move(*(a.ptr));
alloc.destroy(a.ptr);
while(!consumer_head.compare_exchange_weak(a.ptr, next, std::memory_order_release, std::memory_order_relaxed));
return true;
}
}
};
然而,当使用相同数量的单独推入和弹出线程进行测试时,每个推入/弹出在终止之前都有相同的预定数量的元素,一些弹出线程有时(并非总是)在执行的某个时刻卡在第一个CAS上,并且永远不会终止,即使在所有推入线程终止之后。由于它们尝试弹出与推送线程推送相同数量的元素,我怀疑推送线程在某个时刻发生了覆盖。
这是我第一次尝试编写并发容器,所以我对此非常缺乏经验…我盯着这个已经有一段时间了,还没能弄清楚是哪里出了问题。有更有经验的人能解决这个问题吗?
此外,是否有更少的平台特定的方式来获得双宽度CAS?
编辑:这篇文章的大部分内容实际上是假的。请看注释
dpointer a;
a.val = __atomic_load_n(&(producer_pending.val), __ATOMIC_RELAXED);
std::atomic_thread_fence(std::memory_order_acquire);
auto b = consumer_head.load(std::memory_order_relaxed);
你绝对确定这是你想的那样吗?此代码段不在b之前排序a.val。
std:: atomic_thread_fence (std:: memory_order_acquire);保证在 fence之后的内存读操作不会在 fence之前的重新排序。但是没有什么能阻止栅栏上的内存操作流向底层。编译器可以完全自由地向上移动获取栅栏,只要它不与其他栅栏重新排序。
更抽象:
a = load relaxed
memory fence acquire -- memory operations below this line may not float upwards
b = load relaxed
编译器可以将其转换为:
memory fence acquire
b = load relaxed
a = load relaxed
但不是这个:
a = load relaxed
b = load relaxed
memory fence acquire
此外,您应该真正避免内存围栏,并在操作本身上添加获取/释放。这通常会为非x86目标生成更好的代码。对于x86来说,这并不重要,因为即使是普通的mov
也足以在各种情况下提供顺序一致性。
相关文章:
- boost::进程间消息队列引发错误
- 如果我只是不访问queue_front节点的子节点,而是将它们推到队列中呢?还是BFS吗
- Android NDK传感器向事件队列报告奇怪的间隔
- C++优先级队列,按对象的唯一指针的特定方法升序排列
- 使用ios:ate写入到流会覆盖现有文件
- 按对象的特定方法按升序排列的C++优先级队列
- 使用2个键的cpp-stl::优先级队列排序不正确
- 我是否需要在下一次转移时将所有权*转移回转移队列
- 我可以重新分配/覆盖std::字符串吗
- 在一个读写器队列中,我可以用volatile替换原子吗
- 为什么我的多线程作业队列崩溃
- 尝试将lambda函数放在队列中时出现一般分配器错误(可能是与unique_ptr有关的错误)
- 叮叮当当在修复时插入多个"覆盖"说明符
- 谷歌模拟和覆盖关键字
- 是否有一种有效的方法来搜索队列中的关键字并覆盖其值
- 为什么我不能简单地覆盖<对的优先级队列?
- 优先级队列中复制构造函数覆盖的值
- 我如何覆盖队列STL对象中的一些函数
- 并发队列中的覆盖
- 一旦满足指定容量,覆盖优先级队列末尾的元素