Cause of race condition in producer/consumer stack

Keywords: race condition, stack, consumer, producer      Updated: 2023-10-16

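I am trying to create a producer-consumer stack based on notification events, which would allow a single thread to push data and another thread to pop data.

When the buffer is full or empty, one thread waits for the other until it is able to continue.

I am detecting a race condition (the program breaks at the places I have marked ***ERROR HERE***), but I do not understand why it happens.

How can the size (`count`) get higher than `capacity` in this program?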

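#include <process.h>
#include <cstdlib>
#include <memory>   // std::allocator
#include <vector>
#include <windows.h>
// Declared up front because rwstack calls these helpers before their definitions.
static size_t InterlockedIncrementSizeT(size_t volatile *p);
static size_t InterlockedDecrementSizeT(size_t volatile *p);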
template<typename T, typename Ax = std::allocator<T> >
class rwstack
{
    // It is assumed that only ONE thread will push data
    //   and only ONE thread will pop data.
public:
    typedef T value_type;
    typedef Ax allocator_type;
    typedef rwstack<value_type, allocator_type> this_type;
    typedef std::vector<value_type, allocator_type> container_type;
private:
    allocator_type allocator;
    value_type *items;
    size_t volatile count;
    size_t const capacity;
    HANDLE hEventNotEmpty, hEventNotFull;
    rwstack(const this_type &other) { __debugbreak(); /*Don't allow*/ }
public:
    rwstack(const size_t capacity = 4096)
        : allocator(allocator_type()),
        items(allocator.allocate(capacity, NULL)),
        count(0), capacity(capacity),
        hEventNotEmpty(CreateEvent(NULL, TRUE, FALSE, NULL)),
        hEventNotFull(CreateEvent(NULL, TRUE, TRUE, NULL)) { }
    virtual ~rwstack()  // Not actually used in the example
    {
        CloseHandle(hEventNotEmpty);
        CloseHandle(hEventNotFull);
        for (size_t i = 0; i < count; i++)
        { allocator.destroy(&items[InterlockedDecrementSizeT(&count) - i]); }
        allocator.deallocate(items, capacity);
    }
    value_type &push(const value_type &value)
    {
        const ULONG waitResult = WaitForSingleObject(hEventNotFull, INFINITE);
        if (waitResult != WAIT_OBJECT_0) { __debugbreak(); }
        const size_t newSize = InterlockedIncrementSizeT(&count);
        try
        {
            if (newSize > capacity) { __debugbreak(); }  // ****ERROR HERE****
            if (newSize >= capacity) { ResetEvent(hEventNotFull); }
            allocator.construct(&items[newSize - 1], value);
            SetEvent(hEventNotEmpty);
            return items[newSize - 1];
        }
        catch (...) { InterlockedDecrementSizeT(&count); throw; }
    }
    void pop(value_type *pValue = NULL)
    {
        const ULONG waitResult = WaitForSingleObject(hEventNotEmpty, INFINITE);
        if (waitResult != WAIT_OBJECT_0) { __debugbreak(); }
        const size_t newSize = InterlockedDecrementSizeT(&count);
        try
        {
            if (newSize > capacity) { __debugbreak(); }  // ****ERROR HERE****
            if (newSize <= 0) { ResetEvent(hEventNotEmpty); }
            if (pValue != NULL) { *pValue = items[newSize]; }
            allocator.destroy(&items[newSize]);
            SetEvent(hEventNotFull);
        }
        catch (...) { InterlockedIncrementSizeT(&count); throw; }
    }
};
static size_t InterlockedIncrementSizeT(size_t volatile *p)
{
#if _M_X64
    return InterlockedIncrement64(reinterpret_cast<long long volatile *>(p));
#elif _M_IX86
    return InterlockedIncrement(reinterpret_cast<long volatile *>(p));
#endif
}
static size_t InterlockedDecrementSizeT(size_t volatile *p)
{
#if _M_X64
    return InterlockedDecrement64(reinterpret_cast<long long volatile *>(p));
#elif _M_IX86
    return InterlockedDecrement(reinterpret_cast<long volatile *>(p));
#endif
}

Test code:

typedef rwstack<int> TTestStack;
void __cdecl testPop(void *context)   // consumer thread
{
    TTestStack::value_type v;
    for (;;)
        static_cast<TTestStack *>(context)->pop(&v);
}
void __cdecl testPush(void *context)  // producer thread
{
    for (TTestStack::value_type v = 0; ; v++)
        static_cast<TTestStack *>(context)->push(v);
}
int main()
{
    TTestStack rw;
    HANDLE hThreads[2] = {
        reinterpret_cast<HANDLE>(_beginthread(&testPush, 0, &rw)),
        reinterpret_cast<HANDLE>(_beginthread(&testPop,  0, &rw)),
    };
    const ULONG nThreads = sizeof(hThreads) / sizeof(*hThreads);
    WaitForMultipleObjects(nThreads, hThreads, TRUE, INFINITE);
    return 0;
}

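You are not locking the right operations.

The key here is that while you are disabling the hEventNotFull event in thread A, you are also enabling it in thread B.

Threads run concurrently.

Here is what happens:

  1. The queue is full, with 4096 items.

  2. Thread B acquires the lock (in this code, the atomic Interlocked operation) and decrements count to 4095. You need to hold this lock until you have decided whether to enable hEventNotFull, but you release it immediately. The OS pauses thread B.

  3. Thread A acquires the lock and increments count to 4096. You need to hold this lock until you have decided whether to reset hEventNotFull, but you release it immediately.

  4. The OS decides that thread B is more important than thread A.

  5. So you end up calling ResetEvent in thread A and then SetEvent in thread B. The net result is that execution returns to thread A with count == 4096 while hEventNotFull is still signaled. On its next push, thread A passes the wait and increments count to 4097, which trips the newSize > capacity check.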
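The steps above can be replayed deterministically in a single thread. The following is only a minimal sketch, not the original program: `Model`, `popDecrement`, `popSetEvent`, `push`, and `replay` are hypothetical names, a plain `bool` stands in for the manual-reset event hEventNotFull, and the replay starts one item below capacity (an assumption) so that every wait in the sequence is actually satisfiable.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical single-threaded model of the state shared by push() and pop().
struct Model {
    std::size_t count;
    std::size_t capacity;
    bool notFull;  // stands in for the manual-reset event hEventNotFull
};

// First half of pop(): only the interlocked decrement.
void popDecrement(Model &m) { --m.count; }

// Second half of pop(): the SetEvent(hEventNotFull) call, made later.
void popSetEvent(Model &m) { m.notFull = true; }

// A whole push(): returns newSize, or 0 if the wait would have blocked.
std::size_t push(Model &m) {
    if (!m.notFull) return 0;                       // WaitForSingleObject would block
    const std::size_t newSize = ++m.count;          // InterlockedIncrementSizeT
    if (newSize >= m.capacity) m.notFull = false;   // ResetEvent(hEventNotFull)
    return newSize;
}

// Replays one bad interleaving and returns the newSize seen by the last push.
std::size_t replay(std::size_t capacity) {
    assert(capacity >= 2);              // needed so the decrement cannot underflow
    Model m{capacity - 1, capacity, true};
    popDecrement(m);   // thread B: count = capacity - 2, then B is paused
    push(m);           // thread A: count = capacity - 1
    push(m);           // thread A: count = capacity, event reset
    popSetEvent(m);    // thread B resumes: SetEvent based on stale information
    return push(m);    // thread A: wait passes, count = capacity + 1
}
```

With a capacity of 4, `replay(4)` returns 5: the last push sees a size one larger than the capacity, which is the condition the `__debugbreak()` check catches.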

Execution flow:

Thread B: Get count and decrement it to 4095.  # Queue not full
Thread A: Get count and increment it to 4096.  # Queue full
Thread A: ResetEvent on `hEventNotFull`        # A thinks it will block since queue is full
Thread B: SetEvent on `hEventNotFull`          # B is using stale info and unblocks A
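One way to avoid acting on stale information is to make the count update and the full/empty decision a single atomic step. The sketch below is an illustration under stated assumptions rather than a drop-in fix for the code in the question: it swaps the Win32 events for the standard `std::mutex` and `std::condition_variable`, and `BoundedStack`, `DemoResult`, and `runDemo` are hypothetical names introduced here. With the Win32 API, a comparable approach would presumably be to hold a CRITICAL_SECTION around both the count change and the SetEvent/ResetEvent call.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical bounded stack: one mutex guards both the contents and the
// full/empty decision, so the two can never disagree.
template <typename T>
class BoundedStack {
public:
    explicit BoundedStack(std::size_t capacity) : capacity_(capacity), maxSeen_(0) {
        items_.reserve(capacity);
    }

    void push(const T &value) {
        std::unique_lock<std::mutex> lock(mutex_);
        // The predicate is re-checked under the lock after every wakeup.
        notFull_.wait(lock, [this] { return items_.size() < capacity_; });
        items_.push_back(value);
        assert(items_.size() <= capacity_);  // mirrors the check in the question
        if (items_.size() > maxSeen_) maxSeen_ = items_.size();
        notEmpty_.notify_one();
    }

    T pop() {
        std::unique_lock<std::mutex> lock(mutex_);
        notEmpty_.wait(lock, [this] { return !items_.empty(); });
        T value = items_.back();
        items_.pop_back();
        notFull_.notify_one();
        return value;
    }

    // Largest size the stack has reached so far.
    std::size_t maxSeen() {
        std::lock_guard<std::mutex> lock(mutex_);
        return maxSeen_;
    }

private:
    std::mutex mutex_;
    std::condition_variable notFull_, notEmpty_;
    std::vector<T> items_;
    const std::size_t capacity_;
    std::size_t maxSeen_;
};

struct DemoResult {
    std::size_t maxSize;  // largest size observed during the run
    long long sum;        // sum of all popped values
};

// One producer pushes 0..n-1 while one consumer pops n values.
DemoResult runDemo(std::size_t capacity, int n) {
    BoundedStack<int> stack(capacity);
    long long sum = 0;
    std::thread consumer([&] { for (int i = 0; i < n; ++i) sum += stack.pop(); });
    std::thread producer([&] { for (int i = 0; i < n; ++i) stack.push(i); });
    producer.join();
    consumer.join();
    return DemoResult{stack.maxSeen(), sum};
}
```

Because the wait predicate is evaluated while the mutex is held, a wakeup based on an outdated count simply causes the waiter to check again and go back to sleep, so the size should never exceed the capacity in this design.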