C++无锁队列与多个线程一起崩溃

C++ lockless queue crashes with multiple threads

本文关键字:线程 一起 崩溃 队列 C++      更新时间:2023-10-16

在为多个线程编码时,我试图更好地理解控制内存顺序。我过去曾多次使用互斥锁来序列化变量访问,但为了提高性能,我正在尽可能避免使用互斥锁。

我有一个指针队列,它可能被许多线程填充,并被许多线程消耗。它可以很好地处理单个线程,但当我使用多个线程运行时会崩溃。看起来消费者可能会得到指针的重复,这会导致它们被释放两次。这有点难说,因为当我输入任何打印报表时,它运行良好,不会崩溃。

首先,我使用一个预先分配的向量来保存指针。我保留了3个原子索引变量来跟踪向量中需要处理的元素。可能值得注意的是,我尝试使用_queue类型,其中元素本身是原子的,这似乎没有帮助。这是一个更简单的版本:

std::atomic<uint32_t> iread;
std::atomic<uint32_t> iwrite;
std::atomic<uint32_t> iend;
std::vector<JEvent*> _queue;
// Write to _queue (may be thread 1,2,3,...)
while(!_done){
uint32_t idx = iwrite.load();
uint32_t inext = (idx+1)%_queue.size();
if( inext == iread.load() ) return kQUEUE_FULL;
if( iwrite.compare_exchange_weak(idx, inext) ){
_queue[idx] = jevent; // jevent is JEvent* passed into this method
while( !_done ){
if( iend.compare_exchange_weak(idx, inext) ) break;
}
break;
}
}

和来自同一类

// Read from _queue (may be thread 1,2,3,...)
while(!_done){
uint32_t idx = iread.load();
if(idx == iend.load()) return NULL;
JEvent *Event = _queue[idx];
uint32_t inext = (idx+1)%_queue.size();
if( iread.compare_exchange_weak(idx, inext) ){
_nevents_processed++;
return Event;
}
}

我应该强调的是,我真的很想了解为什么这不起作用。实施一些其他预先制定的包会让我克服这个问题,但不会帮助我避免以后再次犯同样的错误。

更新我将Alexandr Konovalov的回答标记为正确(请参阅下面他的回答中的我的评论)。如果有人看到这个页面,"写入"部分的更正代码是:

std::atomic<uint32_t> iread;
std::atomic<uint32_t> iwrite;
std::atomic<uint32_t> iend;
std::vector<JEvent*> _queue;
// Write to _queue (may be thread 1,2,3,...)
while(!_done){
uint32_t idx = iwrite.load();
uint32_t inext = (idx+1)%_queue.size();
if( inext == iread.load() ) return kQUEUE_FULL;
if( iwrite.compare_exchange_weak(idx, inext) ){
_queue[idx] = jevent; // jevent is JEvent* passed into this method
uint32_t save_idx = idx;
while( !_done ){
if( iend.compare_exchange_weak(idx, inext) ) break;
idx = save_idx;
}
break;
}
}

对我来说,当有2个编写器和1个读取器时,可能会出现一个问题。假设第一个写入程序在之前停止

_queue[0] = jevent;

并且第二写入器通过iend发信号表示其_ queue[1]准备好被读取。然后,reader通过iend看到_queue[0]已经准备好被读取,所以我们有数据竞赛。

我建议你试试Relacy Race Detector,它非常适合这种分析。