等待std::线程中的条件A或条件B

Wait for either condition A or condition B in a std::thread

本文关键字:条件 std 线程 等待      更新时间:2023-10-16

我有一个似乎很简单的需求,但我对使用std::thread还不熟悉,不确定我是否理解正确。

我的线程的工作是运行一个循环:等待对象需要处理,然后处理它,然后等待。。。

在我意识到当线程空闲等待新对象时,它不会注意到已经设置了stopThread标志之前,我正要使用condition_variable来实现这一点。

我实际上想要一种方法来实现wait_for_either(new_data,exit_thread),但不确定如何优雅地实现它。类似队列函数的旧代码使用Windows API WaitForMultipleObjects,但我想借此机会学习C++11的方法。

当您等待数据处理时,实际上是在等待条件变量发出信号。因此,当您想要退出线程时,只需发出条件变量的信号,就像stopThread标志是要处理的特殊数据一样。

代码可能看起来像:

void thread_func()
{
    std::unique_lock<std::mutex> lock(mutex);
    for (;;)
    {
        cond.wait(lock, [] () { return stopThread || !dataContainer.empty(); });
        if (stopThread)
            return; //exit thread
        //process data from dataContainer
    }
}

插入数据:

{
    std::unique_lock<std::mutex> lock(mutex);
    dataContainer.push_back(new_data);
    cond.notify_all();
}

然后,当你想停止线程时:

{
    std::unique_lock<std::mutex> lock(mutex);
    stopThread = true;
    cond.notify_all();
}
thread.join(); //not necessary but probably a good idea

这里有一个强大的数据消耗循环,带有中止选项:

while(true) {
  decltype(dataContainer) data;
  {
    std::unique_lock<std::mutex> lock(mutex);
    cond.wait(lock, [] () { return stopThread || !dataContainer.empty(); });
    if (stopThread)
      return; //exit thread
    data = std::move(dataContainer);
  }
  for (auto&& d:data) {
    if (stopThread) return; // abort
    //process data from d
  }
}

stopThread应该是atomic,或者底部的for(:)环路中的访问需要用mutex来保护。

for(:)循环中对stopThread的访问是可选的;如果没有它,它就不会中止,直到它完成它所拾取的工作包。

CCD_ 12是以CCD_。线程唤醒,获取所有待完成的工作,然后进行处理。

您也可以从dataContainer中弹出一个任务,而不是全部接受。生成的代码稍微简单一点。

要将数据排入dataContainer,您必须锁定mutex,将数据放入,然后通知:

{
  std::unique_lock<std::mutex> lock(mutex);
  dataContainer.push_back(new_data);
}
cond.notify_one();

关闭:

{
  std::unique_lock<std::mutex> lock(mutex);
  stopThread = true;
}
cond.notify_all();

请注意,即使stopThread是原子的,也需要获取互斥。除此之外,还有一个竞赛条件。

半伪代码。

std::atomic_bool programRunning;
std::condition_variable cv;
std::mutex mtx;
std::thread loopThread([&]{
   while(programRunning.load()){
       std::unique_lock<std::mutex> lock(mtx);
        if (newDataAvailable){
            //process new data
        } else{
            cv.wait(lock,[&]{ return dataAvailable || !progamRunning.load(); });
        }
   }
});
{
   std::lock_guard<std::mutex> lock(mtx);
   queueMoreData();
   cv.notify_one();
}
//on exit:
programRunning.store(false);
cv.notify_one();