适用于大型数组的无复制线程安全环形缓冲区

Copy-free thread-safe Ring Buffer for Big Arrays

本文关键字:安全 缓冲区 线程 复制 大型 数组 适用于      更新时间:2023-10-16

对于大数组(10^7 个元素(的信号处理,我使用与环形缓冲区连接的不同线程。可悲的是,将数据复制到缓冲区和从缓冲区复制需要太多时间。当前的实现基于boost::lockfree::spsc_queue.

所以我正在寻找一种解决方案,通过使用对向量的unique_ptr来交换线程和缓冲区之间矢量的所有权(请参阅附图:在线程和队列之间交换指针(。

移动智能指针不符合我的需求,因为因此我需要在运行时不断为新的矢量元素分配内存。这种开销比复制数据要大。

我错过了该设计中的缺陷吗?

是否有线程安全甚至无锁的环形缓冲区实现允许对推送和弹出进行交换操作?

编辑:我修改了一个锁定环缓冲区以交换unique_ptr。性能提升是巨大的。虽然这感觉不是一个优雅的解决方案。有什么建议吗?

// https://github.com/embeddedartistry/embedded-resources/blob/master/examples/cpp/circular_buffer.cpp
#include <memory>
#include <mutex>
template <typename T, int SIZE>
class RingbufferPointer {
typedef std::unique_ptr<T> TPointer;
public:
explicit RingbufferPointer() {
// create objects
for (int i=0; i<SIZE; i++) {
buf_[i] = std::make_unique<T>();
}
}
bool push(TPointer &item) {
std::lock_guard<std::mutex> lock(mutex_);
if (full())
return false;
std::swap(buf_[head_], item);
if (full_)
tail_ = (tail_ + 1) % max_size_;
head_ = (head_ + 1) % max_size_;
full_ = head_ == tail_;
return true;
}
bool pop(TPointer &item) {
std::lock_guard<std::mutex> lock(mutex_);
if (empty())
return false;
std::swap(buf_[tail_], item);
full_ = false;
tail_ = (tail_ + 1) % max_size_;
return true;
}
void reset() {
std::lock_guard<std::mutex> lock(mutex_);
head_ = tail_;
full_ = false;
}
bool empty() const {
return (!full_ && (head_ == tail_));
}
bool full() const {
return full_;
}
int capacity() const {
return max_size_;
}
int size() const {
int size = max_size_;
if(!full_) {
if(head_ >= tail_)
size = head_ - tail_;
else
size = max_size_ + head_ - tail_;
}
return size;
}
private:
TPointer buf_[SIZE];
std::mutex mutex_;
int head_ = 0;
int tail_ = 0;
const int max_size_ = SIZE;
bool full_ = 0;
};

移动智能指针不符合我的需求,因为因此我需要 在运行时为新的矢量元素不断分配内存。

如果您预先分配足够的存储并实施自己的内存管理,例如简单的隔离存储(也称为池化(,则不一定如此。

如果你这样做,没有什么能阻止你交换,你可以使用任何支持元素交换的环形缓冲区来保持你现有的架构,并保持与以前相同的线程安全性。您可以选中仅使用boost::pool而不是实现自己的选项。

如果我正确理解您的任务 - 您需要 2 个容器:

  • 用于自由元素的线程安全和无锁池 - 用于不分配/每次都释放它。推送和弹出无需等待。
  • 线程安全和无锁的单写入器/单读取器FIFO队列, 推送和弹出无需等待。

有了这个,你可以做下一步:

  • 一开始,你分配N元素并将其推送到池中。
  • 生产者从池中弹出免费项目(而不是分配内存(
  • 准备物料数据
  • 将其推送到先进先出队列
  • 如果池中没有免费项目 - 来自消费者的等待信号

  • 先进先出队列中的消费者弹出项
  • 流程物料数据
  • 将项目推送回池(而是释放 IT 内存(
  • 如果队列为空 - 来自生产者的等待信号

FIFO队列可以通过以下方式实现:

class CyclicBufer
{
struct alignas(8) Position 
{
ULONG _begin, _data_size;
};
std::atomic<Position> _pos;
void** _items;
ULONG _buf_size;
public:
// Requires: only one thread is allowed to push data to the CyclicBufer
bool push(void* item, bool* bWasEmpty = 0);
// Requires: only one thread is allowed to pop data to the CyclicBufer
bool pop(void** pitem, bool* bNotEmpty = 0);
~CyclicBufer()
{
if (_items)
{
delete [] _items;
}
}
CyclicBufer() : _items(0), _buf_size(0)
{
_pos._My_val._begin = 0, _pos._My_val._data_size = 0;
}
bool create(ULONG buf_size)
{
if (_items = new(std::nothrow) void*[buf_size])
{
_buf_size = buf_size;
return true;
}
return false;
}
bool is_empty()
{
Position current_pos = _pos.load(std::memory_order_relaxed);
return !current_pos._data_size;
}
};
bool CyclicBufer::push(void* item, bool* bWasEmpty /*= 0*/)
{
Position current_pos = _pos.load(std::memory_order_relaxed);
if (current_pos._data_size >= _buf_size) return false;
// (_pos._begin + _pos._data_size) % _buf_size never changed in pop
_items[(current_pos._begin + current_pos._data_size) % _buf_size] = item;
for (;;)
{
Position new_pos = {
current_pos._begin, current_pos._data_size + 1
};
if (_pos.compare_exchange_weak(current_pos, new_pos, std::memory_order_release))
{
if (bWasEmpty) *bWasEmpty = current_pos._data_size == 0;
return true;
}
}
}
bool CyclicBufer::pop(void** pitem, bool* bNotEmpty /*= 0*/)
{
Position current_pos = _pos.load(std::memory_order_acquire);
if (!current_pos._data_size) return false;
// current_pos._begin never changed in push
void* item = _items[current_pos._begin];
for (;;)
{
Position new_pos = {
(current_pos._begin + 1) % _buf_size, current_pos._data_size - 1
};
if (_pos.compare_exchange_weak(current_pos, new_pos, std::memory_order_relaxed))
{
if (bNotEmpty) *bNotEmpty = new_pos._data_size != 0;
*pitem = item;
return true;
}
}
}

对于 Windows 上的线程安全和无锁池实现可以InterlockedPushEntrySListInterlockedPopEntrySList使用,但当然可以实现此 API 和您自己:

struct list_entry {
list_entry *Next;
};
#if defined(_M_X64) || defined(_M_ARM64)
#define MACHINE_64
#endif
struct alignas(sizeof(PVOID)*2) list_head 
{  
union {
struct {
INT_PTR DepthAndSequence;
union {
list_entry* NextEntry;
INT_PTR iNextEntry;
};
};
__int64 value; // for 32-bit only
};
void init()
{
iNextEntry = 0, DepthAndSequence = 0;
}
bool push(list_entry* entry)
{
list_head current = { { DepthAndSequence, NextEntry } }, new_head;
for (;;)
{
entry->Next = current.NextEntry;
new_head.NextEntry = entry;
new_head.DepthAndSequence = current.DepthAndSequence + 0x10001;
#ifdef MACHINE_64
if (_INTRIN_RELEASE(_InterlockedCompareExchange128)(
&DepthAndSequence, 
new_head.iNextEntry, new_head.DepthAndSequence, 
&current.DepthAndSequence))
{
// return is list was empty before push
return !current.NextEntry;
}
#else
new_head.value = _INTRIN_RELEASE(_InterlockedCompareExchange64)(
&value, new_head.value, current.value);
if (new_head.value == current.value)
{
// return is list was empty before push
return !current.NextEntry;
}
current.value = new_head.value;
#endif
}
}
list_entry* pop()
{
list_head current = { { DepthAndSequence, NextEntry } }, new_head;
for (;;)
{
list_entry* entry = current.NextEntry;
if (!entry)
{
return 0;
}
// entry must be valid memory
new_head.NextEntry = entry->Next;
new_head.DepthAndSequence = current.DepthAndSequence - 1;
#ifdef MACHINE_64
if (_INTRIN_ACQUIRE(_InterlockedCompareExchange128)(&DepthAndSequence, 
new_head.iNextEntry, new_head.DepthAndSequence, 
&current.DepthAndSequence))
{
return entry;
}
#else
new_head.value = _INTRIN_ACQUIRE(_InterlockedCompareExchange64)(
&value, new_head.value, current.value);
if (new_head.value == current.value)
{
return entry;
}
current.value = new_head.value;
#endif
}
}
};
#pragma warning(disable : 4324)
template <class _Ty>
class FreeItems : list_head
{
void* _items;
union Chunk {
list_entry entry;
char buf[sizeof(_Ty)];
};
public:
~FreeItems()
{
if (_items)
{
delete [] _items;
}
}
FreeItems() : _items(0)
{
init();
}
bool create(ULONG count)
{
if (Chunk* items = new(std::nothrow) Chunk[count])
{
_items = items;
union {
list_entry* entry;
Chunk* item;
};
item = items;
do 
{
list_head::push(entry);
} while (item++, --count);
return true;
}
return false;
}
_Ty* pop()
{
return (_Ty*)list_head::pop();
}
bool push(_Ty* item)
{
return list_head::push((list_entry*)item);
}
};

有了这 2 个容器演示/测试代码,看起来像(Windows 的代码,但主要 - 我们如何使用池和队列(

struct BigData 
{
ULONG _id;
};
struct CPData : CyclicBufer, FreeItems<BigData>
{
HANDLE _hDataEvent, _hFreeEvent, _hConsumerStop, _hProducerStop;
ULONG _waitReadId, _writeId, _setFreeCount, _setDataCount;
std::_Atomic_integral_t _dwRefCount;
bool _bStop;
static ULONG WINAPI sProducer(void* This)
{
reinterpret_cast<CPData*>(This)->Producer();
reinterpret_cast<CPData*>(This)->Release();
return __LINE__;
}
void Producer()
{
HANDLE Handles[] = { _hProducerStop, _hFreeEvent  };
for (;;)
{
BigData* item;
while (!_bStop && (item = FreeItems::pop()))
{
// init data item
item->_id = _writeId++;
bool bWasEmpty;
if (!CyclicBufer::push(item, &bWasEmpty)) __debugbreak();
if (bWasEmpty)
{
_setDataCount++;
SetEvent(_hDataEvent);
}
}
switch (WaitForMultipleObjects(2, Handles, FALSE, INFINITE))
{
case WAIT_OBJECT_0:
SetEvent(_hConsumerStop);
return;
case WAIT_OBJECT_0 + 1:
break;
default:
__debugbreak();
}
}
}
static ULONG WINAPI sConsumer(void* This)
{
reinterpret_cast<CPData*>(This)->Consumer();
reinterpret_cast<CPData*>(This)->Release();
return __LINE__;
}
void Consumer()
{
HANDLE Handles[] = { _hDataEvent, _hConsumerStop };
for (;;)
{
switch (WaitForMultipleObjects(2, Handles, FALSE, INFINITE))
{
case WAIT_OBJECT_0:
break;
case WAIT_OBJECT_0 + 1:
return;
default:
__debugbreak();
}
bool bNotEmpty;
do 
{
BigData* item;
if (!CyclicBufer::pop((void**)&item, &bNotEmpty)) __debugbreak();
// check FIFO order
if (item->_id != _waitReadId) __debugbreak();
_waitReadId++;
// process item
// free item to the pool
if (FreeItems::push(item))
{
// stack was empty
_setFreeCount++;
SetEvent(_hFreeEvent);
}
} while (bNotEmpty);
}
}
~CPData()
{
if (_hConsumerStop) CloseHandle(_hConsumerStop);
if (_hProducerStop) CloseHandle(_hProducerStop);
if (_hFreeEvent) CloseHandle(_hFreeEvent);
if (_hDataEvent) CloseHandle(_hDataEvent);
if (_waitReadId != _writeId || !CyclicBufer::is_empty()) __debugbreak();
DbgPrint("%s(%u %u %u)n", __FUNCTION__, _writeId, _setFreeCount, _setDataCount);
}
public:
CPData()
{
_hFreeEvent = 0, _hDataEvent = 0, _hProducerStop = 0, _hConsumerStop = 0;
_waitReadId = 0, _writeId = 0, _dwRefCount = 1;
_setFreeCount = 0, _setDataCount = 0, _bStop = false;
}
void AddRef()
{
_MT_INCR(_dwRefCount);
}
void Release()
{
if (!_MT_DECR(_dwRefCount))
{
delete this;
}
}
ULONG Create(ULONG n)
{
if (!CyclicBufer::create(n) || !FreeItems::create(n))
{
return ERROR_NO_SYSTEM_RESOURCES;
}
return (_hDataEvent = CreateEvent(0, FALSE, FALSE, 0)) &&
(_hFreeEvent = CreateEvent(0, FALSE, FALSE, 0)) &&
(_hProducerStop = CreateEvent(0, TRUE, FALSE, 0)) &&
(_hConsumerStop = CreateEvent(0, TRUE, FALSE, 0)) ? 0 : GetLastError();
}
ULONG StartThread(bool bConsumer)
{
AddRef();
if (HANDLE hThread = CreateThread(0, 0, bConsumer ? sConsumer : sProducer, this, 0, 0))
{
CloseHandle(hThread);
return 0;
}
Release();
return GetLastError();
}
ULONG Stop()
{
ULONG err = SetEvent(_hProducerStop) ? 0 : GetLastError();
_bStop = true;
return err;
}
};
void BufTest()
{
if (CPData* p = new CPData)
{
if (!p->Create(16))
{
if (!p->StartThread(false))
{
p->StartThread(true);
}
MessageBoxW(0, 0, L"Wait Stop", MB_ICONINFORMATION);
p->Stop();
}
p->Release();
}
MessageBoxW(0,0,0,1);
}

尽管boost::lockfree::spsc_queue缺乏移动支持,但您仍然可以这样做。

将向量移入和移出队列的示例:

struct Element {
std::vector<int> data_;
Element(std::vector<int>& data)
: data_(std::move(data))
{}
Element(Element const&) = delete;
Element operator=(Element const&) = delete;
operator std::vector<int>&&() {
return std::move(data_);
}
};
int main() {
boost::lockfree::spsc_queue<Element, boost::lockfree::capacity<2>> q;
std::vector<int> a(1);
assert(!a.empty());
q.push(&a, &a + 1); // Move the vector into the queue.
assert(a.empty());
std::vector<int> b = q.front(); // Move the vector from queue.
assert(!b.empty());
q.pop();
}

我使用的一种技术是这个...

void next_step(std::vector<std::string> &a)
{
std::vector<std::string> v;
v.swap(a);
// process vector ...
}

无需交换或复制单个元素。 快速高效。

话筒