填充和保存线程之间的共享缓冲区

Filling and saving shared buffer between threads

本文关键字:共享 缓冲区 之间 线程 保存 填充      更新时间:2023-10-16

我正在使用一个检索 I/Q 数据的 API。调用函数bbGetIQ(m_handle, &pkt);填充缓冲区。这是一个线程循环,而用户尚未输入"stop"。Pkt 是一种结构,使用的缓冲区是pkt.iqData = &m_buffer[0];,它是浮点数的向量。向量的大小为 5000,每次我们循环时,缓冲区都会填充 5000 个值。

我想将数据从缓冲区保存到文件中,我在调用bbgetIQ后立即执行此操作,但这样做是一项耗时的任务,数据检索速度不够快,导致 API 丢弃数据,以便它可以继续填充其缓冲区。

这是我的代码的样子:


void Acquisition::recordIQ(){
int cpt = 0;
ofstream myfile;

while(1){
while (keep_running)
{   
cpt++;
if(cpt < 2)
myfile.open ("/media/ssd/IQ_Data.txt");

bbGetIQ(m_handle, &pkt); //Retrieve I/Q data

//Writing content of buffer into the file.
for(int i=0; i<m_buffer.size(); i++)
myfile << m_buffer[i] << endl;

}
cpt = 0;
myfile.close();
}
}

然后我尝试仅在我们离开循环时写入文件:


void Acquisition::recordIQ(){
int cpt = 0;
ofstream myfile;
int next=0;
vector<float> data;

while(1){
while ( keep_running)
{   
if(keep_running == false){
myfile.open ("/media/ssd/IQ_Data.txt");
for(int i=0; i<data.size(); i++)
myfile << data[i] << endl;
myfile.close();
break;
}
cpt++;
data.resize(next + m_buffer.size());
bbGetIQ(m_handle, &pkt); //retrieve data
std::copy(m_buffer.begin(), m_buffer.end(), data.begin() + next); //copy content of the buffer into final vector
next += m_buffer.size(); //next index
}
cpt = 0;
}
}

我不再从 API 获得数据丢失,但问题是我受到data向量大小的限制。例如,我不能让它整夜检索数据。

我的想法是制作 2 个线程。一个将检索数据,另一个将数据写入文件。这两个线程将共享一个循环缓冲区,其中第一个线程将填充缓冲区,第二个线程将读取缓冲区并将内容写入文件。由于它是一个共享缓冲区,我想我应该使用互斥体。

我是多线程和互斥锁的新手,所以这是一个好主意吗?我真的不知道从哪里开始,以及消费者线程如何在生产者填充缓冲区时读取缓冲区。读取时锁定缓冲区会导致 API 丢弃数据吗?(因为它无法将其写入循环缓冲区)。

编辑:由于我希望我的记录线程在后台运行,以便我可以在录制时执行其他操作,因此我将其分离,用户可以通过将条件keep_running设置为 true 来启动记录。


thread t1(&Acquisition::recordIQ, &acq);
t1.detach();

你需要使用这样的东西(https://en.cppreference.com/w/cpp/thread/condition_variable):

全局:

std::mutex m;
std::condition_variable cv;
std::vector<std::vector<float>> datas;
bool keep_running = true, start_running = false;

编写线程:

void writing_thread()
{
myfile.open ("/media/ssd/IQ_Data.txt");
while(1) {
// Wait until main() sends data
std::unique_lock<std::mutex> lk(m);
cv.wait(lk, []{return keep_running && !datas.empty();});
if (!keep_running) break;
auto d = std::move(datas); 
lk.unlock();
for(auto &entry : d) {
for(auto &e : entry)
myfile << e << endl;             
}
}
}

发送线程:

void sending_thread() {
while(1) {
{
std::unique_lock<std::mutex> lk(m);
cv.wait(lk, []{return keep_running && start_running;});
if (!keep_running) break;
}
bbGetIQ(m_handle, &pkt); //retrieve data
std::vector<float> d = m_buffer;
{
std::lock_guard<std::mutex> lk(m);
if (!keep_running) break;
datas.push_back(std::move(d));
}
cv.notify_one();
}
}
void start() {
{
std::unique_lock<std::mutex> lk(m);
start_running = true;
}
cv.notify_all();
}
void stop() {
{
std::unique_lock<std::mutex> lk(m);
start_running = false;
}
cv.notify_all();
}
void terminate() {
{
std::unique_lock<std::mutex> lk(m);
keep_running = false;
}
cv.notify_all();
thread1.join();
thread2.join();
}

总之: 发送线程从任何内容接收数据,锁定互斥mt并将数据移动到datas存储。然后它使用条件变量cv通知等待线程有事情要做。写入线程等待条件变量发出信号,然后锁定互斥锁mt,将数据全局变量移动到局部datas然后释放互斥锁并继续将刚刚接收的数据写入文件。关键是将互斥锁定时间尽可能短。

编辑: 要终止整个事情,您需要将keep_running设置为 false。然后打电话给cv.notify_all()然后加入所涉及的线程。顺序很重要。您需要联接线程,因为写入线程可能仍在写入数据的过程中。

编辑2: 添加了延迟启动。现在创建一个线程,在一个运行sending_thread,在另一个writing_thread。调用start()以启用处理,stop()以停止处理。