C 语言简单环形缓冲区(RingBuffer)- 多线程 - 查找临界区

C Simple RingBuffer - Multithreading - Finding Critical Sections

本文关键字:临界区 查找 多线程 RingBuffer C 语言 Simple      更新时间:2023-10-16

我写了一个简单的 C 语言环形缓冲区,现在正在用多个线程对它进行测试。为了找出临界区(critical section),我想让代码出错,但很难做到。

注意:代码是 C 语言,但我在 C++ 文件中对其进行测试,因为这样更容易创建线程、互斥锁等。

头文件:

#ifndef _C_TEST_H_
#define _C_TEST_H_
#include <stdio.h>
#include <mutex>
///////////////////////////////////////////////////////////////////////////////
// Defines and macros
///////////////////////////////////////////////////////////////////////////////
//When this header is not compiled as C++, supply a minimal bool type
//(false == 0, true == 1) for the bool-returning functions declared below.
#ifndef __cplusplus
typedef enum { false, true } bool;
#endif
//Number of Node_T slots held by every RING_BUFFER_T (length of its buffer array).
#define RING_BUFFER_SIZE 2000
///////////////////////////////////////////////////////////////////////////////
// Structures, Enumerations, Typedefs
///////////////////////////////////////////////////////////////////////////////
//One slot of the ring: the stored value plus links to the neighbouring slots.
typedef struct Node Node_T;
struct Node
{
    int val;            //Payload stored in this slot
    Node_T *next;       //Following slot in the ring
    Node_T *previous;   //Preceding slot in the ring
};
//Result codes returned by the RB_* functions.
typedef enum RB_ERC
{
    RB_ERC_NO_ERROR  = 0,   //Operation completed
    RB_ERC_NULL_PTR  = 1,   //A required argument was rejected (null pointer)
    RB_ERC_UNDERFLOW = 2,   //Fewer elements stored than were requested
    RB_ERC_OVERFLOW  = 3    //Not enough free slots for the requested write
} RB_ERC_T;
//Policy applied when a write arrives while the buffer is full.
typedef enum RB_HANDLE_OVERFLOW
{
    RB_DECIMATE                = 0, //Overwrite the oldest stored data
    RB_IGNORE_AND_RETURN_ERROR = 1  //Drop the write and report RB_ERC_OVERFLOW
} RB_HANDLE_OVERFLOW_T;
//Selects which end of the stored data a read consumes.
typedef enum RB_READ_MODE
{
    RB_FIFO = 0,    //Oldest element first (taken at the Read pointer)
    RB_LIFO = 1     //Newest element first (taken just behind the Write pointer)
} RB_READ_MODE_T;

typedef struct RingBuffer
{
    int curSize;
    RB_HANDLE_OVERFLOW_T handleOverflow;
    struct Node *Write;
    struct Node *Read;
    Node_T buffer[RING_BUFFER_SIZE];
} RING_BUFFER_T;

///////////////////////////////////////////////////////////////////////////////
// Prototypes
///////////////////////////////////////////////////////////////////////////////
//The functions are given C linkage so the same header can be used from C++.
#ifdef __cplusplus
extern "C" {
#endif
//Prepare a ring buffer for use: size is reset to 0, Read and Write both point at
//the first slot, the overflow policy is stored and all slots are linked into a
//circular doubly-linked list.
//Returns RB_ERC_NULL_PTR when rb_ is null, RB_ERC_NO_ERROR otherwise.
RB_ERC_T RB_InitRingBuffer(RING_BUFFER_T *rb_, RB_HANDLE_OVERFLOW_T ifOverflow_);
//Return true if the queue has no elements; false if there are elements on the queue
//Note: rb_ is not checked for null.
bool RB_IsEmpty(RING_BUFFER_T *rb_);
//Return true if the queue is full; false if there are seats available
//Note: rb_ is not checked for null.
bool RB_IsFull(RING_BUFFER_T *rb_);
//Write N elements (length of the array) to the queue
//Note: array values will be read from element 0 to array length
//Returns RB_ERC_NULL_PTR when rb_ or values_ is null or length_ is 0.
//With RB_DECIMATE every value is written; with any other policy the values are
//written only if all of them fit, otherwise nothing is written and
//RB_ERC_OVERFLOW is returned.
RB_ERC_T RB_WriteArray(RING_BUFFER_T *rb_, int values_[], int length_);
//Write 1 element
//When the buffer is full: RB_DECIMATE overwrites the oldest element, any other
//policy leaves the buffer untouched and returns RB_ERC_OVERFLOW.
//Returns RB_ERC_NULL_PTR when rb_ is null.
RB_ERC_T RB_Write(RING_BUFFER_T *rb_, int val_);
//Dequeue and read N elements (length of the array) into an array
//Returns RB_ERC_NULL_PTR when values_ is null and RB_ERC_UNDERFLOW (nothing
//read) when fewer than length_ elements are stored.
RB_ERC_T RB_ReadArray(RING_BUFFER_T *rb_, int values_[], int length_, RB_READ_MODE_T readMode_);
//Dequeue and read 1 element
//RB_LIFO returns the most recently written value; any other mode returns the
//oldest one. The slot that was read is reset to 0.
//Returns RB_ERC_NULL_PTR when rb_ or readVal_ is null and RB_ERC_UNDERFLOW
//when the buffer is empty.
RB_ERC_T RB_Read(RING_BUFFER_T *rb_, int *readVal_, RB_READ_MODE_T readMode_);
#ifdef __cplusplus
}
#endif

#endif //_C_TEST_H_

源:

#include "CTest.h"
//File-local mutex meant to guard the ring buffer. Every m.lock()/m.unlock()
//call in the functions below is commented out, so it is currently unused.
//NOTE(review): std::mutex is not recursive and RB_Write/RB_Read call
//RB_IsFull/RB_IsEmpty, so enabling all of the commented-out calls as written
//would lock twice on one thread; the early returns would also skip the unlock.
static std::mutex m;
//Reset a ring buffer to the empty state and link its slots into a ring.
//  rb_             - buffer to initialise
//  handleOverflow_ - policy applied later when writing to a full buffer
//Returns RB_ERC_NULL_PTR if rb_ is null, otherwise RB_ERC_NO_ERROR.
//No locking is performed here.
RB_ERC_T RB_InitRingBuffer(RING_BUFFER_T *rb_, RB_HANDLE_OVERFLOW_T handleOverflow_)
{
    Node_T *slot;
    int idx;

    if(rb_ == 0)
    {
        return RB_ERC_NULL_PTR;
    }

    //Empty buffer: read and write positions coincide on the first slot
    rb_->curSize        = 0;
    rb_->Read           = &rb_->buffer[0];
    rb_->Write          = &rb_->buffer[0];
    rb_->handleOverflow = handleOverflow_;

    //Link every slot to its neighbours. The modulo wraps the last slot forward
    //to slot 0 and slot 0 backward to the last slot, closing the ring.
    for(idx = 0; idx < RING_BUFFER_SIZE; idx++)
    {
        slot           = &rb_->buffer[idx];
        slot->val      = 0;
        slot->next     = &rb_->buffer[(idx + 1) % RING_BUFFER_SIZE];
        slot->previous = &rb_->buffer[(idx + RING_BUFFER_SIZE - 1) % RING_BUFFER_SIZE];
    }

    return RB_ERC_NO_ERROR;
}
//Report whether the buffer currently stores no elements.
//rb_ is assumed to be valid; it is not checked for null.
//No locking is performed, so the answer is only a snapshot of curSize.
bool RB_IsEmpty(RING_BUFFER_T *rb_)
{
    return (rb_->curSize == 0) ? true : false;
}
//Report whether every slot of the buffer is occupied.
//rb_ is assumed to be valid; it is not checked for null.
//No locking is performed, so the answer is only a snapshot of curSize.
bool RB_IsFull(RING_BUFFER_T *rb_)
{
    return (rb_->curSize == RING_BUFFER_SIZE) ? true : false;
}

//Enqueue values_[0] .. values_[length_ - 1] in that order.
//  RB_DECIMATE policy     - every value is written; old data is overwritten
//                           once the buffer is full.
//  any other policy       - the values are written only if all of them fit,
//                           otherwise nothing is written.
//Returns RB_ERC_NULL_PTR if rb_ or values_ is null or length_ is 0,
//RB_ERC_OVERFLOW if the values do not fit under a non-decimating policy,
//otherwise RB_ERC_NO_ERROR.
RB_ERC_T RB_WriteArray(RING_BUFFER_T *rb_, int values_[], int length_)
{
    int idx;

    if(rb_ == 0 || values_ == 0 || length_ == 0)
    {
        return RB_ERC_NULL_PTR;
    }

    if(rb_->handleOverflow != RB_DECIMATE)
    {
        //All-or-nothing: refuse the whole array when it exceeds the free space
        int freeSlots = RING_BUFFER_SIZE - rb_->curSize;
        if(length_ > freeSlots)
        {
            return RB_ERC_OVERFLOW;
        }
    }

    for(idx = 0; idx < length_; idx++)
    {
        RB_Write(rb_, values_[idx]);
    }

    return RB_ERC_NO_ERROR;
}
//Enqueue a single value.
//The value is stored at the Write position, which then advances so that it
//always points one slot past the newest element.
//When the buffer is already full:
//  RB_DECIMATE      - the value is stored anyway and the Read position advances
//                     too, discarding the oldest element (curSize is unchanged)
//  any other policy - nothing is stored and RB_ERC_OVERFLOW is returned
//Returns RB_ERC_NULL_PTR if rb_ is null.
//No locking is performed here.
RB_ERC_T RB_Write(RING_BUFFER_T *rb_, int val_)
{
    bool wasFull;

    if(rb_ == 0)
    {
        return RB_ERC_NULL_PTR;
    }

    wasFull = RB_IsFull(rb_);
    if(wasFull && rb_->handleOverflow != RB_DECIMATE)
    {
        return RB_ERC_OVERFLOW;
    }

    rb_->Write->val = val_;
    rb_->Write      = rb_->Write->next;

    if(wasFull)
    {
        //Oldest element was just overwritten; move the read position past it
        rb_->Read = rb_->Read->next;
    }
    else
    {
        rb_->curSize++;
    }

    return RB_ERC_NO_ERROR;
}

//Dequeue length_ elements into values_[0] .. values_[length_ - 1].
//  rb_       - buffer to read from
//  values_   - destination array with room for at least length_ ints
//  length_   - number of elements to dequeue
//  readMode_ - RB_FIFO (oldest first) or RB_LIFO (newest first)
//The read is all-or-nothing: if fewer than length_ elements are stored,
//nothing is dequeued and RB_ERC_UNDERFLOW is returned.
//Returns RB_ERC_NULL_PTR if rb_ or values_ is null, otherwise RB_ERC_NO_ERROR.
//No locking is performed here.
RB_ERC_T RB_ReadArray(RING_BUFFER_T *rb_, int values_[], int length_, RB_READ_MODE_T readMode_)
{
    //m.lock();
    RB_ERC_T erc = RB_ERC_NO_ERROR;
    //rb_ is dereferenced below, so it must be validated like in the other functions
    if(rb_ == 0 || values_ == 0)
    {
        return RB_ERC_NULL_PTR;
    }
    //Verify that the amount of data to be read is actually available on the queue
    if( length_ <= rb_->curSize )
    {
        //Increment through the array and dequeue
        int i;
        for(i = 0; i < length_; i++)
        {
            //Note: Error conditions have already been checked. Skip the ERC check
            (void) RB_Read(rb_, &values_[i], readMode_);
        }
    }
    else
    {
        //Attempted to read more data than is available on the queue
        erc = RB_ERC_UNDERFLOW;
    }
    //m.unlock();
    return erc;
}

//Dequeue a single element into *readVal_.
//  RB_LIFO        - steps the Write position back one slot and returns the
//                   value found there (the most recently written one)
//  any other mode - returns the value at the Read position (the oldest one)
//                   and advances Read
//The slot that was read is reset to 0 and curSize is decremented.
//Returns RB_ERC_NULL_PTR if rb_ or readVal_ is null, RB_ERC_UNDERFLOW if the
//buffer is empty, otherwise RB_ERC_NO_ERROR.
//No locking is performed here.
RB_ERC_T RB_Read(RING_BUFFER_T *rb_, int *readVal_, RB_READ_MODE_T readMode_)
{
    if(rb_ == 0 || readVal_ == 0)
    {
        return RB_ERC_NULL_PTR;
    }

    if(RB_IsEmpty(rb_))
    {
        //Nothing stored, so there is nothing to hand out
        return RB_ERC_UNDERFLOW;
    }

    if(readMode_ == RB_LIFO)
    {
        //Write points one slot past the newest element: step back onto it first
        rb_->Write      = rb_->Write->previous;
        *readVal_       = rb_->Write->val;
        rb_->Write->val = 0;
    }
    else
    {
        *readVal_       = rb_->Read->val;
        rb_->Read->val  = 0;
        rb_->Read       = rb_->Read->next;
    }

    rb_->curSize--;
    return RB_ERC_NO_ERROR;
}

用于测试的主要CPP:

#include "CTest.h"
#include <iostream>
#include "windows.h"
#include <thread>
using namespace std;
//Ring buffer instance shared by both writer threads and by main().
static RING_BUFFER_T test1;
//Size of the local array declared in function1.
const int dataSize = 300;
//Number of values each writer thread pushes into test1.
const int dataSizeout = 1000;
//Not referenced by any code shown in this file.
int sharedValue = 0;
//Not referenced by any code shown in this file; the writers call RB_Write without holding it.
static std::mutex m;

//Writer thread body: pushes the values 0, 1, ..., dataSizeout - 1 into the
//shared buffer test1, one RB_Write call per value, and reports every call
//that does not return RB_ERC_NO_ERROR.
//No lock is taken around the writes.
void function1()
{
    RB_ERC_T erc = RB_ERC_NO_ERROR;
    for (int i = 0; i < dataSizeout; i++)
    {
        erc = RB_Write(&test1, i);
        if (erc != RB_ERC_NO_ERROR)
        {
            //"\n" restored: the previous "%dn" printed a literal 'n' and no line break
            printf("Count up errrror %d\n", erc);
        }
    }
}

//Writer thread body: pushes the values 0, -1, ..., -(dataSizeout - 1) into
//the shared buffer test1, one RB_Write call per value, and reports every call
//that does not return RB_ERC_NO_ERROR.
//No lock is taken around the writes.
void function2()
{
    RB_ERC_T erc = RB_ERC_NO_ERROR;
    for (int i = 0; i > -dataSizeout; i--)
    {
        erc = RB_Write(&test1, i);
        if (erc != RB_ERC_NO_ERROR)
        {
            //"\n" restored: the previous "%dn" printed a literal 'n' and no line break
            printf("Count down errrror %d\n", erc);
        }
    }
}
int main()
{
RB_InitRingBuffer(&test1, RB_DECIMATE);
    thread p1(function1);
    //Sleep(1000);
    thread p2(function2);
    p1.join();
    p2.join();
    //Read out 5 at a time
    int out;
    int cnt = 0;
    while(cnt < (2 * dataSizeout) )
    {
        if (RB_Read(&test1, &out, RB_LIFO) == RB_ERC_NO_ERROR)
        {
            printf("out[%d] = %dn", cnt, out);
            cnt += 1;
        }
    }

    system("Pause");
    return 0;
}

我认为主 RING_BUFFER_T 实例中的所有成员都是共享变量,因此凡是用到它们的地方(几乎到处都是)都必须用互斥锁保护起来。

//Same definition as in the header, repeated here for discussion: these are the
//fields that the ring buffer functions read and modify.
typedef struct RingBuffer
    {
        int curSize;                            //Number of elements currently stored
        RB_HANDLE_OVERFLOW_T handleOverflow;    //Policy used when writing to a full buffer
        struct Node *Write;                     //Slot that receives the next written value
        struct Node *Read;                      //Slot holding the oldest stored value
        Node_T buffer[RING_BUFFER_SIZE];        //Backing storage for the slots
    } RING_BUFFER_T;

我想 Node_T 也是如此,但仅限于初始化阶段。是我理解错了吗?既然现在没有使用互斥锁,填入环形缓冲区的元素难道不应该出现乱序吗?

有关无锁环形缓冲区的最新 C 实现,请查看 Linux 内核源代码。这应该让你对专家是如何做到的有所了解,而且它是经过实战验证的代码。请参阅 linux/kfifo.h 和相应的 C 文件。

Linux 环形缓冲区的设计说明(不确定其内容是否仍是最新的)

有关如何在C++中执行此操作的想法,您可以查看

Linux Journal 上关于 C++ 无锁队列的文章

或者看看 boost::lockfree::queue。当然,使用 C++ 可以让您使用泛型类型(模板),例如将函数指针替换为编译时绑定的调用,从而获得比 C 更好的性能。您还可以避免那些讨厌的 void* 指针。

不要对外公开函数 RB_IsEmpty 和 RB_IsFull,因为它们的返回值可能在返回后立即失效。如果它们仅在读/写函数内部被调用,则无需在这两个函数中加锁保护。

通常,您必须在外部公开的读取和写入函数中保护结构,从第一次访问到最后一次访问。无需保护参数检查。

您不得重复加锁(同一线程对同一个互斥锁加锁两次)。不要在 RB_ReadArray 中调用 RB_Read,而应提供一个供两者共用的内部读取函数。写入函数也是如此。