获取std::atomic以正确保护数据时出现问题

Trouble getting std::atomic to protect data correctly

本文关键字:数据 问题 保护 确保 std atomic 获取      更新时间:2023-10-16

为什么std::原子版本的代码仍然失败?(当refCount为非零,doStop(曾)为false时,回调将更改。

我有一段多线程的代码,它的行为不正确,并试图修复它

然而,我的修复仍然不可靠,但我不明白为什么。

原始代码线程A(使用回调):-

 if( !IsUpdating ) {
     IncrementReference();
     if( !IsUpdating && GetCallBackPointer() ) {
         cb = GetCallBackPointer();
         cb();
     }
     DecrementReference();
 }

原始代码线程B-(修改回调)

 IsUpdating = true;
 while( ReferencesUsingCallback ) {
     Sleep( 10 );
 }
 callback = newValue;
IsUpdating = false;

其想法是,如果ReferencesUsingCallback不为0,那么修改回调线程将不允许更改回调的值。

通过测试、AddRef和测试,可以对比赛条件进行"保护"。希望测试不会再次失败。

不幸的是,代码没有工作,我认为这是由于一些缓存一致性问题。

此后,我使用std::atomic再次尝试交付测试用例,但它仍然可能失败。std::atomic版本是"AtomicLockedData"。该平台是英特尔i7 上的Windows

完整代码:-

#include <thread>
#include <mutex>
#include <atomic>
#include <chrono>
#define FAILED_LIMIT 5
#define LOOP_SIZE 1000000000LL
void Function()
{
}
typedef void (*CallbackFunction)(void);

int FailedCount;
__int64 counter = 0;
class lockedData {
public:
    lockedData() : value(nullptr), value2(nullptr) 
    { 
        doStop = 0;
        usageCount = 0;
    }
    long usageCount;
    long doStop;
    volatile CallbackFunction value;
    void * value2;
    int Use()
    {
        return usageCount++;
    }
    int UnUse()
    {
        return usageCount--;
    }
    int Usage() const
    {
        return usageCount;
    }
    void SetStop()
    {
        doStop = 1;
    }
    void UnStop()
    {
        doStop = 0;
    }
    bool IsStopped()
    {
        return doStop != 0;
    }
    void StoreData(CallbackFunction pData )
    {
        value = pData;
    }
    CallbackFunction ReadData()
    {
        return value;
    }
};

class AtomicLockedData {
public:
    AtomicLockedData() : value(nullptr), value2(nullptr)
    {
        doStop = false;
        usageCount = 0;
    }
    std::atomic<int> usageCount;
    std::atomic<bool> doStop;
    std::atomic<CallbackFunction> value;
    void * value2;
    int Use()
    {
        return usageCount++;
    }
    int UnUse()
    {
        return usageCount--;
    }
    int Usage() const
    {
        return usageCount.load();
    }
    void SetStop()
    {
        doStop.store( true);
    }
    void UnStop()
    {
        doStop.store( false );
    }
    bool IsStopped()
    {
        return doStop.load() == true;
    }
    void StoreData(CallbackFunction pData)
    {
        value.store( pData );
    }
    CallbackFunction ReadData()
    {
        return value.load();
    }
};

template < class lockData >
int UpdateState( lockData & aLock, CallbackFunction pData, void * pData2 )
{
    aLock.SetStop();
    while(aLock.Usage() > 0 )
       std::this_thread::sleep_for( std::chrono::milliseconds(10) );
    aLock.value = pData;
    aLock.UnStop();
    return 0;
}
template <class lockData >
int ReadState( lockData * aLock, int fib)
{
    if (!aLock->IsStopped()) {
        aLock->Use();
        CallbackFunction val = aLock->ReadData();
        if (!aLock->IsStopped() && val) {
            fibonacci(fib);
            CallbackFunction pTest = const_cast<CallbackFunction>( aLock->ReadData());
            if (pTest == 0) {
                FailedCount++; // shouldn't be able to change value if use count is non-zero
                printf("Failedn");
            }
            else {
                pTest();
            }
        }
        aLock->UnUse();
    }
    return 0;
}
unsigned __int64 fibonacci(size_t n)
{
    if (n < 3) return 1;
    return fibonacci(n - 1) + fibonacci(n - 2);
}
template< class lockData > void ThreadA( lockData * lkData , int fib )
{
    void * pData2 = new char[200];
    while (FailedCount < FAILED_LIMIT) {
        UpdateState< lockData>(*lkData,  Function, pData2);
        fibonacci(fib);
        UpdateState< lockData>(*lkData, NULL, NULL);
        fibonacci(fib);
    }
}
template< class lockData > void ThreadB(lockData & lkData, int fib )
{
    while (FailedCount < FAILED_LIMIT && counter < LOOP_SIZE) {
        ReadState(&lkData, fib);
        ReadState(&lkData, fib);
        ReadState(&lkData, fib);
        ReadState(&lkData, fib);
        ReadState(&lkData, fib);
        ReadState(&lkData, fib);
        ReadState(&lkData, fib);
        ReadState(&lkData, fib);
        ReadState(&lkData, fib);
        ReadState(&lkData, fib);
        counter++;
    }
}
template <class lockType >
void TestLock()
{
    counter = 0;
    FailedCount = 0;
    lockType lk;
    std::thread thr(ThreadA<lockType>, &lk, 3);
    ThreadB(lk, 3);
    thr.join();
    printf("Failed %d times for %I64d iterations", FailedCount, counter);
}
int main(int argc, char ** argv)
{
    TestLock< lockedData >();
    TestLock< AtomicLockedData >();
    return 0;
}

if (!aLock->IsStopped()) {
    aLock->Use();

看起来很奇怪。

IsStopped()返回false之后,在调用Use()之前,状态可能会移动到已停止(因此,您可能会将Use()作为已停止的锁)。

解决方案是让返回值Use在禁止操作的情况下通信失败,而不是在Use()之后进行检查。

希望测试不会再次失败。

它肯定可以,如果你有足够的测试,。众所周知,双重检查锁定是不安全的。

原子论的用法本身并不是原子的。因此,您的操作不是原子操作。

或者换句话说,原子性是而不是像const那样,它不会隐式传播。你不能简单地通过使用原子变量来编写一个安全的操作。您必须编写一个完全原子化的操作,也必须简单地在后台使用原子变量。

如果您无法编写基于原子基元的原子算法,则必须使用互斥使其成为原子。

此外,您的非原子代码不仅在并发性方面不安全,而且在未定义的行为方面也不安全,因为存在数据竞争。这也是未定义的行为,因为你使用的变量是非易失性的,所以编译器可以假设它们不会在外部发生变化,编译器可以根据这一事实进行优化。立即抛出此代码;它无法使用。

感谢您的其他回答,但他们似乎没有回答问题。

发布的代码有缺陷,因为它在第二次检查之前读取了回调函数的值。

已更正ReadState

template <class lockData >
int ReadState( lockData * aLock, int fib)
{
    if (!aLock->IsStopped()) {
        aLock->Use();
        CallbackFunction val;
        if (!aLock->IsStopped() && (val = aLock->ReadData() ) ) {
            fibonacci(fib);
            CallbackFunction pTest = const_cast<CallbackFunction>( aLock->ReadData());
            if (pTest == 0) {
                FailedCount++; // shouldn't be able to change value if use count is non-zero
                printf("Failedn");
            }
            else {
                pTest();
            }
        }
        aLock->UnUse();
    }
    return 0;
}

此代码的原子版本不会失败,但非原子版本会失败。将volatile添加到非原子版本(修复数据竞赛?)没有帮助。

该代码旨在确保读取在当前正在更新时不会使用回调值。人们希望这样做是为了去掉锁,支持ReadState解决方案,因此添加锁本来是有效的,但毫无意义。

在执行ReadState之前,我们会检查更新是否未运行。我不确定这是否有帮助。

在递增使用计数之后,检查IsStopped。这确保了UpdateState将被阻止进行进一步操作,直到使用率为0。

因此,剩下的竞争是在UpdateState测试了使用计数之后,ReadState调用increment。

修复方法是确保在检查IsStopped之后读取val。正确的方法是,如果UpdateState线程错过了增量,并且仍在执行,那么ReadState将挂起并稍后重试。否则,我们知道IsStopped是false,并且UpdateState在用法为0之前不会进行更改,然后我们可以读取该值,它不会被更改。

IsStopped之前读取val会产生问题,在读取和第二次测试(pTest)之间可能会更改值,并且IsStopped设置为0,从而导致失败。