关于标准::condition_variables的两个问题

Two questions on std::condition_variables

本文关键字:问题 两个 variables condition 于标准 标准      更新时间:2023-10-16

我一直在试图弄清楚std::condition_variables,我对wait()以及是使用notify_all还是notify_one感到特别困惑。

首先,我编写了一些代码并将其附加到下面。下面是一个简短的解释:Collection是一个保存一堆Counter对象的类。这些Counter对象有一个Counter::increment()方法,需要一遍又一遍地在所有对象上调用该方法。为了加快速度,Collection还维护了一个线程池来分配工作,并使用其Collection::increment_all()方法发送所有工作。

这些线程不需要相互通信,并且通常Counter对象比线程多得多。如果一个线程比其他线程处理超过Counter秒,只要完成所有工作即可。向队列添加工作很容易,只需要在"主"线程中完成。据我所知,唯一可能发生的坏事是如果其他方法(例如Collection::printCounts(允许在完成工作的过程中在柜台上被调用。

#include <iostream>
#include <thread>
#include <vector>
#include <mutex>
#include <condition_variable>
#include <queue>

class Counter{
private:
int m_count;
public:
Counter() : m_count(0) {}
void increment() { 
m_count ++; 
}
int getCount() const { return m_count; }
};

class Collection{
public:
Collection(unsigned num_threads, unsigned num_counters) 
: m_shutdown(false)
{
// start workers
for(size_t i = 0; i < num_threads; ++i){
m_threads.push_back(std::thread(&Collection::work, this)); 
}
// intsntiate counters
for(size_t j = 0; j < num_counters; ++j){
m_counters.emplace_back();
}
}
~Collection() 
{ 
m_shutdown = true;
for(auto& t : m_threads){
if(t.joinable()){
t.join();
}
}
}
void printCounts() {
// wait for work to be done
std::unique_lock<std::mutex> lk(m_mtx);
m_work_complete.wait(lk); // q2: do I need a while lop?  
// print all current counters
for(const auto& cntr : m_counters){
std::cout << cntr.getCount() << ", ";
}
std::cout << "n";
}
void increment_all() 
{
std::unique_lock<std::mutex> lock(m_mtx);
m_work_complete.wait(lock);
for(size_t i = 0; i < m_counters.size(); ++i){
m_which_counters_have_work.push(i);
}
}

private:    
void work()
{
while(!m_shutdown){
bool action = false;
unsigned which_counter;
{
std::unique_lock<std::mutex> lock(m_mtx);
if(m_which_counters_have_work.size()){
which_counter = m_which_counters_have_work.front();
m_which_counters_have_work.pop();
action = true;
}else{
m_work_complete.notify_one(); // q1: notify_all
}
}
if(action){
m_counters[which_counter].increment();
}
}   
}

std::vector<Counter> m_counters;
std::vector<std::thread> m_threads;
std::condition_variable m_work_complete;
std::mutex m_mtx;
std::queue<unsigned> m_which_counters_have_work;
bool m_shutdown;
};
int main() {
int num_threads = std::thread::hardware_concurrency()-1;
int num_counters = 10;
Collection myCollection(num_threads, num_counters);
myCollection.printCounts();
myCollection.increment_all();
myCollection.printCounts();
myCollection.increment_all();
myCollection.printCounts();
return 0;
}

我在 Ubuntu 18.04 上编译了这个g++ -std=c++17 -pthread thread_pool.cpp -o tp && ./tp我认为代码实现了所有这些目标,但仍然存在一些问题:

  1. 我正在使用m_work_complete.wait(lk)来确保在开始打印所有新计数之前完成工作。为什么我有时会看到它写在while循环中,或者将第二个参数作为 lambda 谓词函数?这些文档提到了虚假唤醒。如果发生虚假唤醒,这是否意味着printCounts可以过早打印?如果是这样,我不希望这样。我只想在开始使用应该存在的数字之前确保工作队列是空的。

  2. 我正在使用m_work_complete.notify_all而不是m_work_complete.notify_one.我已经阅读了这个线程,我认为这并不重要 - 只有主线程会被这个阻塞。使用notify_one是否更快,以便其他线程不必担心它?

std::condition_variable并不是
  1. 一个真正的条件变量,它更像是一个用于达到某个条件的同步工具。该条件是什么取决于程序员,并且仍然应该在每次唤醒后检查condition_variable,因为当尚未达到所需条件时,它可能会虚假或"过早"唤醒。

    在 POSIX 系统上,condition_variable::wait()委托给pthread_cond_wait,这容易受到虚假唤醒的影响(请参阅基本原理部分中的"条件等待语义"(。在Linux上,pthread_cond_wait又通过futex来实现,这同样容易受到杂散唤醒的影响。

    所以是的,你仍然需要一个标志(受相同的互斥锁保护(或其他方法来检查工作是否真正完成。执行此操作的一种便捷方法是将检查包装在谓词中并将其传递给wait()函数,该函数将为您循环,直到满足谓词。

  2. notify_all取消阻止所有等待条件变量的线程;notify_one只解锁一个(或者至少一个,准确地说(。如果有多个等待线程,并且它们是等效的,即任何一个线程都可以完全处理条件,并且如果条件足以让一个线程继续(如将工作单元提交到线程池(,那么notify_one会更有效,因为它不会不必要地解锁其他线程,因为它们只会注意到没有工作要做并返回等待。如果你只有一个服务员,那么notify_onenotify_all之间就没有区别了。

这很简单:何时使用notify();

  1. 没有理由需要多个线程了解该事件。(例如,使用notify()宣布工作线程将"使用"的项目的可用性,从而使该项目对其他工作线程不可用(
    *和*
  2. 没有可以唤醒的错误线程。(例如,如果所有线程都wait()同一行的相同函数中,则可能是安全的。

在所有其他情况下使用notify_all()