避免忙于等待,并在实时和非实时线程之间切换模式

avoid busy waiting and mode switches between realtime and non realtime threading

本文关键字:实时 线程 之间 模式 于等待 等待      更新时间:2023-10-16

我有以下问题:我们确实有一个控制器,该控制器在实时Xenomai Linux补丁系统上运行ros_control。控制循环通过迭代调用更新函数来执行。我需要传达控制器的一些内部状态,对于这项任务,我使用的是麻省理工学院开发的LCM。无论 LCM 的内部行为如何,发布方法都会破坏实时,因此我在 C++11 中实现了在分离线程上运行的发布循环。但是如果我不将辅助线程与控制器同步,它将以无限频率发布的循环。因此,我也在使用条件变量。

下面是控制器的示例:

MyClass mc;
// This is called just once
void init(){
    mc.init();
}
// Control loop function (e.g., called every 5 ms in RT)
void update(const ros::Time& time, const ros::Duration& period) {
    double value = time.toSec();
    mc.setValue(value);
}

对于尝试发布的类:

double myvalue;
std::mutex mutex;
std::condition_variable cond;
bool go = true;
void MyClass::init(){
    std::thread thread(&MyClass::body, this);
}

void MyClass::setValue(double value){
    myvalue = value;
    {
    std::lock_guard<std::mutex> lk(mutex);
    go = true;
    }
    cond.notify_one();
} 
void MyClass::body() {
    while(true) {
        std::unique_lock<std::mutex>lk(mutex);
        cond.wait(lk, [this] {return go;});    
        publish(myvalue); // the dangerous call
        go = false;
        lk.unlock();
    }
 }

此代码生成模式开关(即,正在中断实时(。可能是因为条件变量上的锁,我用它来将辅助线程与主控制器同步并与线程争用。如果我做这样的事情:

void MyClass::body() {
    while(true) {
        if(go){
        publish(myvalue);
        go = false;            
        }
    }
 }
 void MyClass::setValue(double value){
    myvalue = value;
    go = true;        
} 

我不会生产模式开关,但它也是不安全的,最重要的是我会忙于等待辅助线程。

有没有办法在主线程和辅助线程之间进行非阻塞同步(即,只有在调用setValue时才body做某事(,这也是非忙于等待的?

使用无锁数据结构。

在您的情况下,您甚至不需要数据结构,只需使用原子进行go即可。无需锁。您也可以考虑使用信号量而不是条件变量来避免现在未使用的锁。如果您需要信号量来避免使用锁,那么您最终将使用基本操作系统信号量,而不是 C++11,因为 C++11 甚至没有它们。

这并不完美,但它应该会减少您的忙碌等待频率,只是偶尔会失去响应能力。

这个想法是在通过原子传递消息时使用裸条件变量唤醒。

template<class T>
struct non_blocking_poke {
  std::atomic<T> message;
  std::atomic<bool> active;
  std::mutex m;
  std::condition_variable v;
  void poke(T t) {
    message = t;
    active = true;
    v.notify_one();
  }
  template<class Rep, class Period>
  T wait_for_poke(const std::chrono::duration<Rep, Period>& busy_time) {
    std::unique_lock<std::mutex> l(m);
    while( !v.wait_for(l, busy_time, [&]{ return active; } ))
    {}
    active = false;
    return message;
  }
};

等待线程每busy_time唤醒一次,以查看它是否错过了消息。 但是,它通常会更快地获得消息(存在它错过消息的竞争条件(。 此外,可以在没有缓解器获取的情况下发送多条消息。 但是,如果发送了消息,则在大约 1 秒内,接收方将收到该消息或稍后的消息

non_blocking_poke<double> poker;
// in realtime thread:
poker.poke(3.14);
// in non-realtime thread:
while(true) {
  using namespace std::literals::chrono_literals;
  double d = poker.wait_for_poke( 1s );
  std::cout << d << 'n';
}

在工业质量解决方案中,您还需要中止标志或消息来停止循环。