无锁原子状态类 - 是否正确

Lock free atomic state class - is it correct?

本文关键字:是否 状态      更新时间:2023-10-16

我只是在寻找关于我尝试在结构上实现原子读/写的反馈(明显的缺陷/改进方法)。

将有一个写入线程和多个读取线程。目的是防止读者对结构的看法不一致,同时又不会过多地阻碍作者。

我正在使用获取和添加原子原语,在这种情况下由Qt框架提供。

例如:

/* global */
OneWriterAtomicState<Point> atomicState;
/* Writer */
while(true) {  
  MyStruct s = atomicState.getState()
  s.x += 2; s.y += 2;
  atomicState.setState(s);
}
/* Reader */
while(true) {  
  MyStruct s = atomicState.getState()
  drawBox(s.x,s.y);
}

OneWriterAtomicState Implementation:

template <class T>
class OneWriterAtomicState
{
public:
    OneWriterAtomicState()
        : seqNumber(0)
    {
    }
    void setState(T& state) {
        this->seqNumber.fetchAndAddOrdered(1);
        this->state = state;
        this->seqNumber.fetchAndAddOrdered(1);
    }
    T getState(){
        T result;
        int seq;
        bool seq_changed = true;
        /* repeat while seq is ODD or if seq changes during read operation */
        while( (seq=this->seqNumber.fetchAndAddOrdered(0)) & 0x01 || seq_changed ) {
            result = this->state;
            seq_changed = (this->seqNumber.fetchAndAddOrdered(0)!=seq);
        }
        return result;
    }

private:
    QAtomicInt seqNumber;
    T state;
} 

这是版本二(memcpy,读者产生,希望修复getState():

template <class T>
class OneWriterAtomicState
{
public:
    OneWriterAtomicState()
        : seqNumber(0)
    {
        /* Force a compile-time error if T is NOT a type we can copy with memcpy */
        Q_STATIC_ASSERT(!QTypeInfo<T>::isStatic);
    }
    void setState(T* state) {
        this->seqNumber.fetchAndAddOrdered(1);
        memcpy(&this->state,state,sizeof(T));
        this->seqNumber.fetchAndAddOrdered(1);
    }
    void getState(T* result){
        int seq_before;
        int seq_after  = this->seqNumber.fetchAndAddOrdered(0);
        bool seq_changed = true;
        bool firstIteration = true;
        /* repeat while seq_before is ODD or if seq changes during read operation */
        while( ((seq_before=seq_after) & 0x01) || seq_changed ) {
            /* Dont want to yield on first attempt */
            if(!firstIteration) {
                /* Give the writer a chance to finish */
                QThread::yieldCurrentThread();
            } else firstIteration = false;
            memcpy(result,&this->state,sizeof(T));
            seq_after = this->seqNumber.fetchAndAddOrdered(0);
            seq_changed = (seq_before!=seq_after);
        }
    }
    bool isInitialized() {  return (seqNumber>0); }
private:
    QAtomicInt seqNumber;
    T state;
} ;
#endif // ONEWRITERATOMICSTATE_H

算法不太正确。下面是读取器获取不一致数据的一种可能的线程交错:

state initialized to {0,0} and seqNumber to 0
Writer:
seqNumber = 1;
state.x = 1;
Reader:
seq = seqNumber; //1
result = state; //{1,0}
seq_changed = (seqNumber != seq); //false
Writer:
state.y = 1;
seqNumber = 2;
Reader:
jumps back to the start of the loop
seq = seqNumber; //2
steps out of the loop because seq == 2 and seq_changed == false

因此,问题在于seqNumber是在两个位置读取的,并且编写器可以在读取之间更新值。

while( (seq=this->seqNumber.fetchAndAddOrdered(0)) & 0x01 || seq_changed ) {
    result = this->state;
    seq_changed = (this->seqNumber.fetchAndAddOrdered(0)!=seq);
    //If writer updates seqNumber here to even number bad things may happen
}

每次迭代只应读取一次:

T getState(){
    T result;
    int seq;
    int newseq = seqNumber.fetchAndAddOrdered(0);
    bool seq_changed = true;
    while( (seq = newseq) & 0x01 || seq_changed ) {
        result = state;
        newseq = seqNumber.fetchAndAddOrdered(0);
        seq_changed = (newseq != seq);
    }
    return result;
}

我相信这应该可以正常工作,但我不会保证任何事情。 :)至少你应该编写一个测试程序,就像你的例子一样,但在阅读器中添加一个不一致值的检查。

值得考虑的一件事是,使用原子增量(fetchAndAdd)是一种矫枉过正。只有一个线程写入seqNumber,所以你可以做简单的原子存储释放和加载获取操作,它们可以在许多处理器上更快地实现。但是我不知道这些操作是否可以使用 QAtomicInt ;文档对此非常不清楚。

编辑:Wilx是对的,T需要是一个微不足道的可复制类型

我认为这只有在T的复制赋值运算符是原始的并且基本上只执行按位复制时才有效。对于任何更复杂的T,您最终可能会在执行result = this->state;期间获得不一致的状态。

因此,我建议使用某种具有作家偏好的 rwlocks。

如果您有基于优先级的线程调度,并且读取器的优先级高于编写器,则可能会遇到活锁。想象一下,作者开始写入值,然后读者进来做积极的等待。由于读者的优先级较高,作者将永远没有机会完成写作。

一个

解决方案是在等待循环中添加一个微小的延迟。