是否有任何对象可以用作FIFO和弹出数据的事件

Is there any object that I can use as FIFO with event for poping data out?

本文关键字:数据 事件 FIFO 任何 对象 是否      更新时间:2023-10-16

我需要一个FIFO对象,当它有一个元素时,它会生成一个事件或进行回调以通知元素可用。据我所见,std:queue不支持这一点。

在我的例子中,我有两个线程,一个线程生成数据,另一个线程需要使用它们。

第一个线程生成数据的速度不是固定的,因此我需要有一个缓冲区来存储数据,这样其他线程就可以以相对恒定的方式读取和处理数据。

我知道如何实现编写器,但在读者端,如果我轮询队列,那么我将失去一些检查队列状态的处理能力,我想知道是否有更好的方法可以做到这一点?

编辑1

这与队列的线程安全无关,但std::queue是基于轮询的,但我需要一些基于事件的东西。std::queue不是基于事件的,当有新数据可用时,它不会进行回调。

如果我理解你的问题,你可以使用C++std::condition_variables在线程之间通知队列中项目的可用性,然后调用回调。

代码看起来像这样。这里,主线程充当生成器线程,接收线程充当使用者线程

std::condition_variable Cv_;
std::mutex Mutex_;
std::queue<int> qVal;
void callback() {
cout << "Callback Called with queue Value =>" << qVal.front() << endl;
qVal.pop();
}
void ReceiveThread() {
while (true) {
std::unique_lock<mutex> Lock(Mutex_);
Cv_.wait(Lock);
callback();
}
}
int main() {
thread thrd(ReceiveThread);
int pushVal = 1;
while (1) {
this_thread::sleep_for(std::chrono::seconds(1));
qVal.push(pushVal);
cout << "Signalling for callback with value = " << pushVal<< endl;
pushVal++;
Cv_.notify_all();
}
}

我还没有在while循环中添加任何您可能想要的退出条件。

希望这能有所帮助。

std::queue::push()函数内部没有任何占位符,因此我们可以简单地将call_back function引用放在其中,以便在queue中成功插入元素后,调用call_back function

std::queue::push(data)
{
//add data to internal container
//placeholder section--------
//invoke call_back function or event.
//placeholder section--------
}

因此,在没有此类占位符的情况下,我们可以尝试使用RAII自动调用call_back function或某些事件。

假设我们将实际数据封装在struct中,这将有助于我们进行通知。然后,我们将被要求通过该struct的对象间接访问实际数据。

struct data_notifier
{
//the true data.
int actual_data;
data_notifier(int data) : actual_data(data)
{
//signal event queue_full       
//or
//call a call_back function.
}
}
int actual_data = 90;
std::queue<data_notifier*> q;
q.push(new data_notifier(actual_data));

现在,唯一的问题是:在data_notifier的实例作为引用/指针正确插入queue之前,我们的call_back或事件将被调用。因此,在调用事件后,读取器将尝试读取数据,但不会从队列中获取数据,因为数据尚未在队列中持久化。因此,只有在函数std::queue::push()返回后,才能保证数据在队列中正确持久化,而这可能发生在Writer函数内部。

//event_full is a manual event which needs to be signalled and non-signalled manually.
void Writer()
{
while(1)
{
//[1] wait for mutex_queue
//[2] myqueue.push(data);
//[3] data is now persisted so signal event_full
//[4] release mutex_queue
}
}
void Reader()
{   
while(1) 
{
//[1] wait for event_full (so no polling)
//[2] wait for mutex_queue
//[3] --- access queue ---
if(myqueue.size() != 0)
{
//process myqueue.front()
//myqueue.pop();
}
if(myqueue.size() == 0)
{
//reset event_full      
//so as long as queue has data, Reader can process it else needs to wait. 
}
//[3] --- access queue ---
//[4] release mutex_queue   
}
}