std::conditional_variable::notify_all 不会唤醒所有线程

std::conditional_variable::notify_all does not wake up all the threads

本文关键字:唤醒 线程 all conditional variable notify std      更新时间:2023-10-16

我这里有一个简单的例子:

这个项目可以称为学术,因为我尝试学习 c++11 线程。以下是对正在发生的事情的描述。

想象一下,一个非常大的std::string里面有很多汇编源代码,比如

mov ebx,ecx;\r\

nmov eax,ecx;\r....

Parse()函数采用此字符串并通过标记行的开始和结束并将其保存为作业队列中的string::const_iterators来查找所有行位置。

之后,2 个工作线程从队列中弹出此信息,并将子字符串解析为 Intstuction 类对象。他们将指令类的结果实例push_back到std::vector<Instruction> result

这是一个结构声明,用于保存要解析的子字符串的行号和迭代器

struct JobItem {
    int lineNumber;
    string::const_iterator itStart;
    string::const_iterator itEnd;
};

那是一个小伐木工...

void ThreadLog(const char* log) {
    writeMutex.lock();
    cout << "Thr:" << this_thread::get_id() << " " << log << endl;
    writeMutex.unlock();
}

这是共享数据:

queue<JobItem> que;
vector<Instruction> result;

以下是同步的所有原语

condition_variable condVar;
mutex condMutex;
bool signaled = false;
mutex writeMutex;
bool done=false;
mutex resultMutex;
mutex queMutex;

每线程函数

void Func() {
    unique_lock<mutex> condLock(condMutex);
    ThreadLog("Waiting...");
    while (!signaled) {
        condVar.wait(condLock);
    }
    ThreadLog("Started");
    while (!done) {
        JobItem item;
        queMutex.lock();
        if (!que.empty()) {
            item = que.front(); que.pop();
            queMutex.unlock();
        }
        else {
            queMutex.unlock();
            break;
        }
        //if i comment the line below both threads wake up
        auto instr = ParseInstruction(item.itStart, item.itEnd);
        resultMutex.lock();
        result.push_back(Instruction());
        resultMutex.unlock();
    }

管理线程的管理器函数...

vector<Instruction> Parser::Parse(const string& instructionStream){
    thread thread1(Func);
    thread thread2(Func);
    auto it0 = instructionStream.cbegin();
    auto it1 = it0;
    int currentIndex = instructionStream.find("rn");
    int oldIndex = 0;
    this_thread::sleep_for(chrono::milliseconds(1000)); //experimental 

    int x = 0;
    while (currentIndex != string::npos){
        auto it0  = instructionStream.cbegin() + oldIndex;
        auto it1  = instructionStream.cbegin() + currentIndex;
        queMutex.lock();
        que.push({ x,it0,it1 });
        queMutex.unlock();
        if (x == 20) {//fill the buffer a little bit before signal
            signaled = true;
            condVar.notify_all();
        }
        oldIndex = currentIndex + 2;
        currentIndex = instructionStream.find("rn", oldIndex);
        ++x;
    }
    thread1.join();
    thread2.join();
    done = true;
    return result;
}

问题出现在Func()函数中。如您所见,我在里面使用了一些日志记录。日志说:

Output:
Thr:9928 Waiting...
Thr:8532 Waiting...
Thr:8532 Started

这意味着在主线程将notify_all()发送到等待线程后,实际上只有一个线程被唤醒。如果我注释掉对Func()内部ParseInstruction()的调用,那么两个线程都会唤醒,否则只有一个线程在这样做。得到一些建议会很棒。

假设Func读取signaled并看到它是假的。

然后Parse设置为 signaled true 并执行notify_all;此时Func没有等待,所以看不到通知。

然后Func等待条件变量并阻止。

您可以通过在分配给signaled的分配周围放置condMutex锁来避免这种情况。

这是正确使用条件变量的正常模式 - 您需要在同一互斥锁中测试和修改要等待的条件。