如何重置标准::condition_variable

how to reset std::condition_variable

本文关键字:variable condition 标准 何重置      更新时间:2023-10-16

我正在尝试使用条件变量来触发第二个线程在一定数量的缓冲区处于双端后执行一些工作。

  • main() 在紧密且时间敏感的循环中读取 ADC 数据
  • main() 将缓冲区添加到 deque(由线程池处理)经理)
  • deque 深度是可变的,因此在 main() 中测试深度>= 4 和然后无法调用notify_one(即:在繁重处理期间次,双面的深度可能多达 200 或 300 个缓冲区或在光处理过程中少则 4 个)
  • thrd() 永远不需要发出 main() 信号

我的问题是,在 thrd() 中,在 4 <获得信号和缓冲区后,它会循环回>

deque<int> vbufs;
std::mutex thrdLock;
void thrd()
{
    while (true) {
        cout << "thrd() waiting for lockn";
        std::unique_lock < std::mutex > lock(thrdLock);
        cout << "thrd() waiting for conditionn";
        cv.wait(lock, []{ return (vbufs.size() > 0); });
        thrdLock.unlock();
        cout << "thrd() condition setn";
        if (vbufs.size() >= 4) {    // pretend to do something with 4 buffers and remove them
            std::lock_guard < std::mutex > lock(thrdLock);
            vbufs.pop_front();
            vbufs.pop_front();
            vbufs.pop_front();
            vbufs.pop_front();
            cout << "thrd() reducing buffers:" << vbufs.size() << endl;
        }
    }
}
int main(int argc, char* argv[]) {
    std::thread t1(thrd);
    int tctr = 0;
    while (true) {
        usleep(1000);
        {
            cout << "main() waiting for lockn";
            std::lock_guard < std::mutex > lock(thrdLock);
            vbufs.push_back(++tctr);
            cout << "main() incremented buffers:" << vbufs.size() << endl;
        }
        cv.notify_one();
    }
    return 0;
}

您无法重置条件变量; 这毫无意义 - 通知它不会更改其状态,因此没有要重置的内容。收到通知时,只能唤醒已在等待的线程。如果该线程随后再次等待,则在重新通知条件变量之前,它不会继续。

如果您只想在有四个或更多缓冲区时工作,您不应该像这样更改等待吗?

cv.wait(lock, []{ return (vbufs.size() >= 4); });

更重要的是,您可能同时从两个不同的线程读取和写入vbufs - 您的if (vbufs.size() >= 4)发生在锁外部,因此可以与对push_back的调用同时发生。这会导致未定义的行为,这可能会解释您所看到的内容。

我在程序中发现了几个错误 - 在这里更正:

注意:您直接解锁互斥锁而不是通过std::unique_lock解锁互斥锁的方式导致了数据竞争。

#include <iostream>
#include <deque>
#include <mutex>
#include <thread>
#include <chrono>
#include <condition_variable>
std::deque<int> vbufs;
std::mutex thrdLock;
std::condition_variable cv;
template<class...Ts>
void emit(Ts&&...ts)
{
    static std::mutex m;
    std::lock_guard<std::mutex> lg(m);
    using expand = int[];
    void(expand { 0, ((std::cout << std::forward<Ts>(ts)), 0)... });
}
void thrd()
{
    while (true) {
        emit("thrd() waiting for lockn");
        std::unique_lock < std::mutex > lock(thrdLock);
        emit("thrd() waiting for conditionn");
        cv.wait(lock, []{ return vbufs.size() >= 4; });
        emit("thrd() condition setn");
        auto a = vbufs.front(); vbufs.pop_front();
        auto b = vbufs.front(); vbufs.pop_front();
        auto c = vbufs.front(); vbufs.pop_front();
        auto d = vbufs.front(); vbufs.pop_front();
        emit("thrd() reducing buffers:", vbufs.size(), 'n');
        lock.unlock();
        emit(a, ' ', b, ' ', c, ' ', d, 'n');
    }
}
int main(int argc, char* argv[]) {
    std::thread t1(thrd);
    int tctr = 0;
    while (true) {
        std::this_thread::sleep_for(std::chrono::microseconds(10));
        {
            emit("main() waiting for lockn");
            std::lock_guard < std::mutex > lock(thrdLock);
            vbufs.push_back(++tctr);
            emit("main() incremented buffers:", vbufs.size(), 'n');
        }
        cv.notify_one();
    }
    t1.join(); // un-necessary in this case, but...
    return 0;
}