同步两个具有不同帧速率的传感器

Synchronize two sensors with different frame rate

本文关键字:帧速率 传感器 两个 同步      更新时间:2023-10-16

我想使用线程同步两个以不同帧速率(~80ms与~40ms(工作C++传感器的输出。这个想法就像生产者 - 消费者问题,但有2个生产者和1个消费者,并且没有缓冲,因为只有最后的新产品才重要。

这些是涵盖问题的要点:

  1. 每个传感器读数将由一个线程单独管理。
  2. 将有一个主线程,该线程必须始终获取从传感器读取的最后两个新数据并对其进行处理。
  3. 一个传感器的读数
  4. 不应阻挡另一个传感器的读数。我的意思是,线程读取不应该具有相同的互斥锁。
  5. 主/进程线程在工作时不应阻塞读取线程。我建议锁定数据,制作本地副本(它比直接处理更快(,解锁并处理副本。
  6. 如果没有新数据,主线程应等待它。

这是所请求功能的时间图。

这是伪代码:

void getSensor1(Data& data)
{
while (true)
{
mutex1.lock();
//Read data from sensor 1
mutex1.unlock();
std::this_thread::sleep_for(std::chrono::milliseconds(80 + (rand() % 5)));
}
}
void getSensor2(Data& data)
{
while (true)
{
mutex2.lock();
//Read data from sensor 2
mutex2.unlock();
std::this_thread::sleep_for(std::chrono::milliseconds(40 + (rand() % 5)));
}
}
int main()
{
Data sensor1;
Data sensor2;
std::thread threadGetScan(getSensor1, std::ref(sensor1));
std::thread threadGetFrame(getSensor2, std::ref(sensor2));
while(true)
{
// Wait for new data, lock, copy, unlock and process it
std::this_thread::sleep_for(std::chrono::milliseconds(100 + (rand() % 25)))
}
return 0;
}

提前谢谢。

由于每个传感器仅从一个线程读取,因此传感器访问周围的互斥锁没有任何用处。 你可以摆脱它。 需要线程安全的地方是从传感器读取数据的线程将数据传递给使用数据的线程的方法。

让从传感器读取的线程仅使用局部变量或仅由该线程访问的变量来读取传感器。 一旦它完全拥有数据,然后将该数据(或者更好的是,指向数据的指针(放入共享队列中,使用线程将从中获取它。

由于您只需要保存最新数据,因此队列的最大大小可以为 1。 这可以只是一个指针。

对此共享数据结构的访问应使用互斥锁进行保护。 但由于它只是一个指针,你可以使用 std::atomic。

读取线程可能如下所示:

void getData(std::atomic<Data*>& dataptr) {
while (true) {
Data* mydata = new Data;  // local variable!
// stuff to put data into mydata
std::this_thread::sleep_for(80ms);
// Important! this line is only once that uses dataptr.  It is atomic.
Data* olddata = std::atomic_exchange(&dataptr, mydata);
// In case the old data was never consumed, don't leak it.
if (olddata) delete olddata;
}
}

主线程可能如下所示:

void main_thread(void) {
std::atomic<Data*> sensorData1;
std::atomic<Data*> sensorData2;
std::thread sensorThread1(getData, std::ref(sensorData1));
std::thread sensorThread2(getData, std::ref(sensorData2)); 
while (true) {
std::this_thread::sleep_for(100ms);
Data* data1 = std::atomic_exchange(&sensorData1, (Data*)nullptr);
Data* data2 = std::atomic_exchange(&sensorData2, (Data*)nullptr);
// Use data1 and data2
delete data1;
delete data2;
}
}

经过一些研究工作,我找到了一个解决方案,可以使用互斥体和条件变量来做我想做的事。我让你在我建议的代码下面。改进和其他合适的解决方案仍然被接受。

#include <iostream> 
#include <thread>
#include <mutex>
#include <condition_variable>
#include <chrono>
#include <cstdlib>
#define SIZE_LOOP 1000
// Struct where the data sensors is synchronized
struct Data
{
int data1; // Data of sensor 1
int data2; // Data of sensor 2
};
std::mutex mtx1;              // Mutex to access sensor1 shared data
std::mutex mtx2;              // Mutex to access sensor2 shared data
std::condition_variable cv1;  // Condition variable to wait for sensor1 data availability
std::condition_variable cv2;  // Condition variable to wait for sensor2 data availability
bool ready1;                  // Flag to indicate sensor1 data is available
bool ready2;                  // Flag to indicate sensor2 is available
// Function that continuously reads data from sensor 1
void getSensor1(int& data1)
{
// Initialize flag to data not ready
ready1 = false;
// Initial delay
std::this_thread::sleep_for(std::chrono::milliseconds(2000));
// Reading loop (i represents an incoming new data)
for(int i = 0; i < SIZE_LOOP; i++)
{
// Lock data access
std::unique_lock<std::mutex> lck1(mtx1);
// Read data
data1 = i;
std::cout << "Sensor1 (" << data1 << ")"<< std::endl;
// Set data to ready
ready1 = true;
// Notify if processing thread is waiting
cv1.notify_one();
// Unlock data access
lck1.unlock();
// Sleep to simulate frame rate
std::this_thread::sleep_for(std::chrono::milliseconds(2000 + (rand() % 500)));
}
}
// Function that continuously reads data from sensor 2
void getSensor2(int& data2)
{
// Initialize flag to data not ready
ready2 = false;
// Initial delay
std::this_thread::sleep_for(std::chrono::milliseconds(3000));
// Reading loop (i represents an incoming new data)
for(int i = 0; i < SIZE_LOOP; i++)
{
// Lock data access
std::unique_lock<std::mutex> lck2(mtx2);
// Read data
data2 = i;
std::cout << "Sensor2 (" << data2 << ")"<< std::endl;
// Set data to ready
ready2 = true;
// Notify if processing thread is waiting
cv2.notify_one();
// Unlock data access
lck2.unlock();
// Sleep to simulate frame rate
std::this_thread::sleep_for(std::chrono::milliseconds(1000 + (rand() % 500)));
}
}
// Function that waits until sensor 1 data is ready
void waitSensor1(const int& dataRead1, int& dataProc1)
{
// Lock data access
std::unique_lock<std::mutex> lck1(mtx1);
// Wait for new data
while(!ready1)
{
//std::cout << "Waiting sensor1" << std::endl;
cv1.wait(lck1);
}
//std::cout << "No Waiting sensor1" << std::endl;
// Make a local copy of the data (allows uncoupling read and processing tasks what means them can be done parallely)
dataProc1 = dataRead1;
std::cout << "Copying sensor1 (" << dataProc1 << ")"<< std::endl;
// Sleep to simulate copying load
std::this_thread::sleep_for(std::chrono::milliseconds(200));
// Set data flag to not ready
ready1 = false;
// Unlock data access
lck1.unlock();
}
// Function that waits until sensor 2 data is ready
void waitSensor2(const int& dataRead2, int& dataProc2)
{
// Lock data access
std::unique_lock<std::mutex> lck2(mtx2);
// Wait for new data
while(!ready2)
{
//std::cout << "Waiting sensor2" << std::endl;
cv2.wait(lck2);
}
//std::cout << "No Waiting sensor2" << std::endl;
// Make a local copy of the data (allows uncoupling read and processing tasks what means them can be done parallely)
dataProc2 = dataRead2;
std::cout << "Copying sensor2 (" << dataProc2 << ")"<< std::endl;
// Sleep to simulate copying load
std::this_thread::sleep_for(std::chrono::milliseconds(400));
// Set data flag to not ready
ready2 = false;
// Unlock data access
lck2.unlock();
}
// Main function
int main()
{
Data dataRead;  // Data read
Data dataProc;  // Data to process
// Threads that reads at some frame rate data from sensor 1 and 2
std::thread threadGetSensor1(getSensor1, std::ref(dataRead.data1));
std::thread threadGetSensor2(getSensor2, std::ref(dataRead.data2));
// Processing loop
for(int i = 0; i < SIZE_LOOP; i++)
{
// Wait until data from sensor 1 and 2 is ready
std::thread threadWaitSensor1(waitSensor1, std::ref(dataRead.data1), std::ref(dataProc.data1));
std::thread threadWaitSensor2(waitSensor2, std::ref(dataRead.data2), std::ref(dataProc.data2));
// Shyncronize data/threads
threadWaitSensor1.join();
threadWaitSensor2.join();
// Process synchronized data while sensors are throwing new data
std::cout << "Init processing (" << dataProc.data1 << "," << dataProc.data2 << ")"<< std::endl;
// Sleep to simulate processing load
std::this_thread::sleep_for(std::chrono::milliseconds(10000 + (rand() % 1000)));
std::cout << "End processing" << std::endl;
}
return 0;
}