正确使用 std::condition_variable 触发定时执行

Correct usage of std::condition_variable to trigger timed execution

本文关键字:variable 执行 定时 condition std      更新时间:2023-10-16

我正在尝试以固定的时间间隔执行一段代码。我有一些基于裸pthread的东西,现在我想使用std::thread做同样的事情。

#include <thread>
#include <mutex>
#include <condition_variable>
#include <iostream>
bool running;
std::mutex mutex;
std::condition_variable cond;
void timer(){
while(running) {
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
std::lock_guard<std::mutex> guard(mutex);
cond.notify_one();
}
cond.notify_one();
}
void worker(){
while(running){
std::unique_lock<std::mutex> mlock(mutex);
cond.wait(mlock);
std::cout << "Hello World" << std::endl;
//... do something that takes a variable amount of time ...//
}
}
int main(){
running = true;
auto t_work = std::thread(worker);
auto t_time = std::thread(timer);
std::this_thread::sleep_for(std::chrono::milliseconds(10000));
running = false;
t_time.join();
t_work.join();
}

实际上,worker会执行一些需要可变时间的事情,但应该以固定的间隔进行安排。它似乎有效,但我对此很陌生,所以有些事情我不清楚......

  • 为什么我需要mutex?我并没有真正使用条件,但是每当timer发出信号时,工人都应该完成它的工作。

  • 循环后timer真的需要再次调用cond.notify_one()吗?这是从旧代码中获取的,iirc 的原因是防止worker永远等待,以防timerworker仍在等待时完成。

  • 我是否需要running标志,或者是否有更好的方法来break出循环?

PS:我知道还有其他方法可以确保固定的时间间隔,并且我知道我目前的方法存在一些问题(例如,如果worker需要的时间超过timer使用的时间间隔)。但是,我想先理解这段代码,然后再对其进行过多更改。

为什么我需要互斥锁?我并没有真正使用条件,但是每当计时器发送信号时,工人都应该完成其工作。

您需要互斥锁的原因是,等待满足条件的线程可能会受到虚假唤醒的影响。为了确保您的线程实际收到条件正确满足的通知,您需要检查这一点,并且应该在等待调用中使用 lambda 进行检查。为了保证变量在虚假唤醒之后但在检查变量之前不会被修改,您需要获取互斥锁,以便您的线程是唯一可以修改条件的线程。在您的情况下,这意味着您需要为工作线程添加一种方法来实际验证计时器是否确实用完了。

计时器真的需要在循环后再次调用 cond.notify_one() 吗?这是从旧代码中获取的,iirc的原因是防止工作线程永远等待,以防计时器在工作线程仍在等待时完成。

如果在循环后不调用通知,工作线程将无限期等待。因此,要干净地退出程序,您实际上应该调用 notify_all() 以确保等待条件变量的每个线程都唤醒并且可以干净地终止。

我是否需要运行标志,或者是否有更好的方法来打破循环?

运行标志是实现您想要的最干净的方式。

让我们首先检查一下背景概念。

关键部分

首先,需要互斥体来相互排除对关键部分的访问。通常,关键部分被视为共享资源。例如队列、一些 I/O(例如套接字)等。简而言之,互斥用于保护共享资源免受争用条件的影响,这可能会使资源进入未定义状态。

示例:生产者/消费者问题

队列应包含一些要完成的工作项。可能有多个线程将一些工作项放入队列(即生成项 => 生产者线程),以及多个线程使用这些项并执行 smth。对它们有用(=> 使用者线程)。

放置和使用操作修改队列(尤其是其存储和内部表示形式)。因此,当运行放置或消费操作时,我们希望排除其他操作。这就是互斥体发挥作用的地方。在一个非常基本的星座中,只有一个线程(无论是生产者还是消费者)可以访问互斥体,即锁定它。存在一些其他更高级别锁定原语,以根据使用场景(例如 ReaderWriter 锁定)增加吞吐量

条件变量的概念

condition_variable::notify_one唤醒一个当前等待的线程。至少有一个线程必须等待此变量:

  • 如果没有线程正在等待此变量,则发布的事件将丢失。
  • 如果有一个等待线程,它将唤醒并开始运行,只要它可以锁定与条件变量关联的互斥锁。因此,如果发起notify_onenotify_all调用的线程没有放弃互斥锁(例如mutex::unlock()condition_variable::wait()) 唤醒的线程将不会运行。

timer()线程互斥锁在调用后解锁notify_one()因为作用域结束并且guard对象被销毁(析构函数调用隐式mutex::unlock())

这种方法的问题

取消和变量缓存

允许编译器缓存变量的值。因此,将running设置为true可能不起作用,因为可能会缓存变量的值。为避免这种情况,您需要将running声明为volatilestd::atomic<bool>.

worker线程

您指出worker需要在某些时间间隔内运行,并且可能会运行不同的时间量。timer线程只能在线程完成后运行worker。为什么在这一点上需要另一个线程来测量时间?这两个线程始终作为一个线性块运行,并且没有关键部分!为什么不直接在任务执行后进行所需的sleep调用,并在时间流逝后立即开始运行?事实证明,只有std::cout是共享资源。但目前它是从一个线程使用的。否则,您需要一个互斥锁(没有条件变量)来保护仅对cout的写入。

#include <thread>
#include <atomic>
#include <iostream>
#include <chrono>
std::atomic_bool running = false;
void worker(){
while(running){
auto start_point = std::chrono::system_clock::now();
std::cout << "Hello World" << std::endl;
//... do something that takes a variable amount of time ...//
std::this_thread::sleep_until(start_point+std::chrono::milliseconds(1000));
}
}
int main(){
running = true;
auto t_work = std::thread(worker);
std::this_thread::sleep_for(std::chrono::milliseconds(10000));
running = false;
t_work.join();
}

注意:在工作线程中调用sleep_until,如果任务从start_point阻塞超过 1000 毫秒,则会阻止执行。