C Simple RingBuffer语言 - 多线程 - 查找关键部分
C Simple RingBuffer - Multithreading - Finding Critical Sections
所以我写了一个简单的 C 环形缓冲区,我现在正在使用多个线程进行测试,我很难尝试让代码失败,以便我可以识别关键部分。
注意:代码是 C 语言,但我正在C++文件中对其进行测试,因为它更容易创建线程互斥锁等。
头文件:
#ifndef _C_TEST_H_
#define _C_TEST_H_
#include <stdio.h>
#include <mutex>
///////////////////////////////////////////////////////////////////////////////
// Defines and macros
///////////////////////////////////////////////////////////////////////////////
#ifndef __cplusplus
typedef enum { false, true } bool;
#endif
#define RING_BUFFER_SIZE 2000
///////////////////////////////////////////////////////////////////////////////
// Structures, Enumerations, Typedefs
///////////////////////////////////////////////////////////////////////////////
typedef struct Node
{
int val;
struct Node *next;
struct Node *previous;
} Node_T;
typedef enum RB_ERC
{
RB_ERC_NO_ERROR,
RB_ERC_NULL_PTR,
RB_ERC_UNDERFLOW,
RB_ERC_OVERFLOW
} RB_ERC_T;
typedef enum RB_HANDLE_OVERFLOW
{
RB_DECIMATE,
RB_IGNORE_AND_RETURN_ERROR
} RB_HANDLE_OVERFLOW_T;
typedef enum RB_READ_MODE
{
RB_FIFO,
RB_LIFO
} RB_READ_MODE_T;
typedef struct RingBuffer
{
int curSize;
RB_HANDLE_OVERFLOW_T handleOverflow;
struct Node *Write;
struct Node *Read;
Node_T buffer[RING_BUFFER_SIZE];
} RING_BUFFER_T;
///////////////////////////////////////////////////////////////////////////////
// Prototypes
///////////////////////////////////////////////////////////////////////////////
#ifdef __cplusplus
extern "C" {
#endif
RB_ERC_T RB_InitRingBuffer(RING_BUFFER_T *rb_, RB_HANDLE_OVERFLOW_T ifOverflow_);
//Return true if the queue has no elements; false if there are elements on the queue
bool RB_IsEmpty(RING_BUFFER_T *rb_);
//Return true if the queue is full; false if there are seats available
bool RB_IsFull(RING_BUFFER_T *rb_);
//Write N elements (length of the array) to the queue
//Note: array values will be read from element 0 to array length
RB_ERC_T RB_WriteArray(RING_BUFFER_T *rb_, int values_[], int length_);
//Write 1 element
RB_ERC_T RB_Write(RING_BUFFER_T *rb_, int val_);
//Dequeue and read N elements (length of the array) into an array
RB_ERC_T RB_ReadArray(RING_BUFFER_T *rb_, int values_[], int length_, RB_READ_MODE_T readMode_);
//Dequeue and read 1 element
RB_ERC_T RB_Read(RING_BUFFER_T *rb_, int *readVal_, RB_READ_MODE_T readMode_);
#ifdef __cplusplus
}
#endif
#endif //_C_TEST_H_
源:
#include "CTest.h"
static std::mutex m;
RB_ERC_T RB_InitRingBuffer(RING_BUFFER_T *rb_, RB_HANDLE_OVERFLOW_T handleOverflow_)
{
//m.lock();
RB_ERC_T erc = RB_ERC_NO_ERROR;
int i;
if(rb_ == 0)
{
return RB_ERC_NULL_PTR;
}
//Initialize this instance of the ring buffer
//Both the read/write pointers should start at the same location
rb_->curSize = 0;
rb_->Read = &rb_->buffer[0];
rb_->Write = &rb_->buffer[0];
rb_->handleOverflow = handleOverflow_;
//Build the circular doubly-linked list
for(i = 0; i < RING_BUFFER_SIZE; i++)
{
rb_->buffer[i].val = 0;
if(i == 0)
{
//Sentinal Node found. Point the first node to the last element of the array
rb_->buffer[i].previous = &rb_->buffer[(RING_BUFFER_SIZE - 1)];
rb_->buffer[i].next = &rb_->buffer[i + 1];
}
else if(i < (RING_BUFFER_SIZE - 1) )
{
rb_->buffer[i].next = &rb_->buffer[i + 1];
rb_->buffer[i].previous = &rb_->buffer[i - 1];
}
else
{
//Sentinal node found. Reached the last element in the array; Point the sentinal
//node to the first element in the array to create a circular linked list.
rb_->buffer[i].next = &rb_->buffer[0];
rb_->buffer[i].previous = &rb_->buffer[i - 1];
}
}
//m.unlock();
return erc;
}
bool RB_IsEmpty(RING_BUFFER_T *rb_)
{
//m.lock();
//Note: assume rb is valid.
if(rb_->curSize == 0)
{
return true;
}
else
{
return false;
}
//m.unlock();
}
bool RB_IsFull(RING_BUFFER_T *rb_)
{
//m.lock();
//Note: assume rb is valid.
if(rb_->curSize == RING_BUFFER_SIZE)
{
return true;
}
else
{
return false;
}
//m.unlock();
}
RB_ERC_T RB_WriteArray(RING_BUFFER_T *rb_, int values_[], int length_)
{
//m.lock();
RB_ERC_T erc = RB_ERC_NO_ERROR;
int i;
if(rb_ == 0 || values_ == 0 || length_ == 0)
{
return RB_ERC_NULL_PTR;
}
switch(rb_->handleOverflow)
{
//Increment through the array and enqueue
//If attempting to write more elements than are available on the queue
//Decimate - overwrite old data
//Ignore and return error - Don't write any data and throw an error
case RB_DECIMATE:
for(i = 0; i < length_; i++)
{
RB_Write(rb_, values_[i] );
}
break;
default:
case RB_IGNORE_AND_RETURN_ERROR:
{
int numSeatsAvailable = (RING_BUFFER_SIZE - rb_->curSize);
if( length_ <= numSeatsAvailable )
{
//Increment through the array and enqueue
for(i = 0; i < length_; i++)
{
RB_Write(rb_, values_[i] );
}
}
else
{
//Attempted to write more elements than are avaialable on the queue
erc = RB_ERC_OVERFLOW;
}
}
break;
}
//m.unlock();
return erc;
}
RB_ERC_T RB_Write(RING_BUFFER_T *rb_, int val_)
{
//m.lock();
RB_ERC_T erc = RB_ERC_NO_ERROR;
if(rb_ == 0)
{
return RB_ERC_NULL_PTR;
}
if( !RB_IsFull(rb_) )
{
//Write the value to the current location, then increment the write pointer
//so that the write pointer is always pointing 1 element ahead of the queue
rb_->Write->val = val_;
rb_->Write = rb_->Write->next;
rb_->curSize++;
}
else
{
//Overflow
switch(rb_->handleOverflow)
{
case RB_DECIMATE:
//Set the value and increment both the read/write pointers
rb_->Write->val = val_;
rb_->Write = rb_->Write->next;
rb_->Read = rb_->Read->next;
break;
default:
case RB_IGNORE_AND_RETURN_ERROR:
erc = RB_ERC_OVERFLOW;
break;
}
}
//m.unlock();
return erc;
}
RB_ERC_T RB_ReadArray(RING_BUFFER_T *rb_, int values_[], int length_, RB_READ_MODE_T readMode_)
{
//m.lock();
RB_ERC_T erc = RB_ERC_NO_ERROR;
if(values_ == 0)
{
return RB_ERC_NULL_PTR;
}
//Verify that the amount of data to be read is actually available on the queue
if( length_ <= rb_->curSize )
{
//Increment through the array and dequeue
int i;
for(i = 0; i < length_; i++)
{
//Note: Error conditions have already been checked. Skip the ERC check
(void) RB_Read(rb_, &values_[i], readMode_);
}
}
else
{
//Attempted to read more data than is available on the queue
erc = RB_ERC_UNDERFLOW;
}
//m.unlock();
return erc;
}
RB_ERC_T RB_Read(RING_BUFFER_T *rb_, int *readVal_, RB_READ_MODE_T readMode_)
{
//m.lock();
RB_ERC_T erc = RB_ERC_NO_ERROR;
if(rb_ == 0 || readVal_ == 0)
{
return RB_ERC_NULL_PTR;
}
if( !RB_IsEmpty(rb_) )
{
switch(readMode_)
{
case RB_LIFO:
//Use the head (Write) to read the most recently written value (newest data)
//Note: The write pointer is always pointing 1 position ahead of the current queue.
rb_->Write = rb_->Write->previous; //Decrement write pointer
//Read the data
*readVal_ = rb_->Write->val;
rb_->Write->val = 0; //Reset read values to 0
break;
default:
case RB_FIFO:
*readVal_ = rb_->Read->val;
rb_->Read->val = 0; //Reset read values to 0
rb_->Read = rb_->Read->next; //Increment read pointer
break;
}
rb_->curSize--;
}
else
{
//Attempted to read more data but there is no data available on the queue
erc = RB_ERC_UNDERFLOW;
}
//m.unlock();
return erc;
}
用于测试的主要CPP:
#include "CTest.h"
#include <iostream>
#include "windows.h"
#include <thread>
using namespace std;
static RING_BUFFER_T test1;
const int dataSize = 300;
const int dataSizeout = 1000;
int sharedValue = 0;
static std::mutex m;
void function1()
{
int data[dataSize];
RB_ERC_T erc = RB_ERC_NO_ERROR;
for (int i = 0; i < dataSizeout; i++)
{
erc = RB_Write(&test1, i);
if (erc != RB_ERC_NO_ERROR)
{
printf("Count down errrror %dn", erc);
}
}
//RB_WriteArray(&test1, data, dataSize);
}
void function2()
{
RB_ERC_T erc = RB_ERC_NO_ERROR;
for (int i = 0; i > -dataSizeout; i--)
{
erc = RB_Write(&test1, i);
if (erc != RB_ERC_NO_ERROR)
{
printf("Count down errrror %dn", erc);
}
}
}
int main()
{
RB_InitRingBuffer(&test1, RB_DECIMATE);
thread p1(function1);
//Sleep(1000);
thread p2(function2);
p1.join();
p2.join();
//Read out 5 at a time
int out;
int cnt = 0;
while(cnt < (2 * dataSizeout) )
{
if (RB_Read(&test1, &out, RB_LIFO) == RB_ERC_NO_ERROR)
{
printf("out[%d] = %dn", cnt, out);
cnt += 1;
}
}
system("Pause");
return 0;
}
我认为主RING_BUFFER_T实例中的所有内容都将是共享变量,因此无论在哪里使用它们,几乎无处不在,它们都必须包含在互斥体中。
typedef struct RingBuffer
{
int curSize;
RB_HANDLE_OVERFLOW_T handleOverflow;
struct Node *Write;
struct Node *Read;
Node_T buffer[RING_BUFFER_SIZE];
} RING_BUFFER_T;
我想NODE_T也会,但仅用于初始化。是我错了,还是不应该将填充在环形缓冲区中的元素按顺序放置,因为现在没有使用互斥锁?
有关无锁环形缓冲区的最新 C 实现,请查看 Linux 内核源代码。这应该让你对专家是如何做到的有所了解,而且它是经过实战验证的代码。请参阅 linux/kfifo.h 和相应的 C 文件。
Linux 环形缓冲区的设计说明,不知道它是如何最新的
有关如何在C++中执行此操作的想法,您可以查看
Linux 期刊关于无锁队列C++文章
或者看看boost::lockfree::queue。当然,使用 C++ 可以让您使用泛型类型(模板),例如将函数指针替换为编译时绑定调用,从而实现比 C 更好的性能。你可以避免那些讨厌的空白*指针。
不要公开函数RB_IsEmpty
和RB_IsFull
,因为返回值可能会立即无效。如果仅从读/写中调用它们,则无需在该函数中进行保护。
通常,您必须在外部公开的读取和写入函数中保护结构,从第一次访问到最后一次访问。无需保护参数检查。
您不得双重锁定。不要从RB_ReadArray
呼叫RB_Read
。提供双方使用的内部读取功能。写入函数也是如此。
相关文章:
- 查找自动生成键并具有线性内存消耗的小型关联数组
- 查找不存在的键时,unordered_map返回什么
- 在多映射中查找键、价值链的长度
- 查找配对的匹配键范围
- 在 c++ 的unordered_map中查找最大键
- 在矢量中查找映射键的最快方法
- 查找函数在unordered_map中的工作方式是搜索键值
- std::在地图上查找无法正常工作并循环访问地图的键和值
- 如何在给定任意数量的整数的情况下创建一个唯一键?并使用该键存储,然后从地图中查找
- 在地图键的矢量中查找多个字符串
- 键值映射中的部分查找,其中键本身是键值映射
- 如何在具有非常量指针键的地图中通过常量指针键查找
- 如何创建一个以键为输入并在C++std:map中查找值的模板函数
- 如何通过 std::unordered_map<std::wstring, T> 中的 std::wstring_view 类型的键查找?
- 为什么当代码尝试在 STL 映射中查找键时出现分段错误
- MongoDB的性能在一系列值中以键查找文档
- C++ 映射查找值和关联的键
- 查找有关当前键盘布局的字符的可能键组合
- 映射与对象键,查找和比较功能
- C++中具有字符串键的查找表的内存管理