C++11:除非发布者休眠,否则发布者/使用者模式不会完成
C++11: Publisher/Consumer pattern does not finish unless publisher sleeps
在C++中,我试图使用condition_variable
获取发布者/消费者模式的句柄。这是我在网上看到的模板:
#include <iostream>
#include <thread>
#include <string>
#include <mutex>
#include <queue>
#include <condition_variable>
#include <chrono>
using namespace std;
mutex m;
queue<string> que;
condition_variable cond;
void write(string &&msg) {
unique_lock<mutex> locker(m);
que.push(msg);
locker.unlock();
cond.notify_one();
this_thread::sleep_for(chrono::milliseconds(1));
}
void read() {
while (true) {
unique_lock<mutex> locker(m);
cond.wait(locker);
if (!que.empty()) {
cout << que.front() << endl;
que.pop();
}
locker.unlock();
}
}
void publisher(string &msg) {
for (int i = 0; i < 100; ++i)
write("Publisher: " + msg + ", " + to_string(i));
}
int main() {
string msg = "Hello";
thread pub_thread(publisher, std::ref(msg));
/* The main thread will print the publisher's messages to cout. */
read();
/* Make the main thread wait for the publisher to finish. */
pub_thread.join(); //
return 0;
}
我不明白的是发布者线程上的sleep_for
调用。我知道这只是为了模拟一个"现实生活"场景,在这种情况下,出版商不会这么快吐出消息。但是,奇怪的是,如果我注释掉该行,代码将无法完成。为什么?
此外,我尝试将sleep_for时间设置为 0 并具有相同的效果。似乎出版的根本需要睡觉,但我不明白为什么。为了更具体,代码应该打印出 100 条消息。如果我让代码休眠 1 毫秒,则打印所有 100 条消息。如果我不这样做,那么在代码冻结之前我只看到大约 10 条消息。似乎出现了僵局。
如果有更好的模式可以避免让发布者入睡,则加分......
我知道在实践中你需要有一个停止主线程的策略,就像毒丸一样。我故意省略这一点,以便集中讨论本讨论。
编辑
嗯。如果我放入一个块来处理虚假唤醒,那么就可以解决问题。但这仍然不能解释为什么原始代码失败。
下面是一个改进的读取功能:
void read() {
while (true) {
unique_lock<mutex> locker(m);
cond.wait(locker, [&](){ return !que.empty(); });
cout << que.front() << endl;
que.pop();
locker.unlock();
}
}
您需要处理发布者发布速度快于使用者使用速度的情况。
发生这种情况时,消费者将错过condition_variable
触发器。请记住,notify
呼叫不会累积。
将使用者更改为在唤醒后使用所有可用消息:
if (!que.empty())
→while (!que.empty())
喜欢这个:
void read() {
while (true) {
unique_lock<mutex> locker(m);
cond.wait(locker);
while (!que.empty()) {
cout << que.front() << endl;
que.pop();
}
locker.unlock();
}
}
你有两个问题:一个是发布者没有义务屈服于读者 - 或者暂停足够长的时间让读者成功锁定互斥锁 - 除非你做到了。
第二个是你的读者无论如何都是不正确的:
void read() {
while (true) {
unique_lock<mutex> locker(m);
cond.wait(locker);
if (!que.empty()) {
cout << que.front() << endl;
que.pop();
}
}
}
这假设每个推送到队列的元素都有一个"唤醒事件",因为它每次唤醒只消耗一个元素。但这不是条件变量的工作方式。
这个序列完全有可能发生:
- 发布者添加项目 1
- 发布者信号 condvar
- 发布者添加项目 2
- 发布者信号 condvar
- 读者从康德瓦尔等待中唤醒
- 读者消费项目 1
在这种情况下,发布者将添加 100 个项目,并发出 100 次信号,但读取器只会唤醒 99 次,因此最多使用 99 个项目。
正确的代码应该是这样的:
void read() {
unique_lock<mutex> locker(m);
while (true) {
// don't wait if we don't have to
while (que.empty()) {
cond.wait(locker);
}
// consume everything we can
while (!que.empty()) {
cout << que.front() << endl;
que.pop();
}
}
}
使用谓词可以实现大致相同的目标(为了清楚起见,我只是明确地写出了所有逻辑( - 第二个while
在您的编辑中不存在,但循环和跳过第一个是获得相同行为的稍微昂贵的方法。
此外,无需在每次迭代时都捶打互斥锁 - 条件变量已经根据需要(取消(锁定了它。
- 生产者使用者在 cpp 中使用互斥锁的问题
- 用于免等待生产者和阻塞使用者的环形缓冲区
- C++ 多个使用者线程卡在条件变量上
- 在使用者循环中重用unique_lock
- C++11:除非发布者休眠,否则发布者/使用者模式不会完成
- ActiveMQ使用者的内存使用量在onMessage调用后不断增加
- 您将如何在现有的QT Creator GUI项目中实现ROS发布者和订阅者?
- 具有单个生产者单一使用者的无锁循环缓冲区
- 多线程使用者未产生与使用升压环形缓冲区的输入相同的输出
- 在何处查看发布者发送的输出数据
- 在 ZeroMQ 中绑定订阅者套接字并连接发布者套接字会在代码运行时给出错误.为什么
- 如何使用两个发布者的同一端口来使用 ZeroMQ 向同一订阅者发送数据?
- C++生产者使用者中,同一使用者线程会抓取所有任务
- ZeroMQ 中的多个发布者 C++ 这是一个不错的选择吗?
- 如何将第二个使用者添加到基于 pthread 的生产者-使用者设置中?
- 生产者/使用者,如何确保在关闭所有使用者之前耗尽线程安全队列
- 交换缓冲区而不锁定单个生产者和多个使用者
- C++11 使用 std::atomic(多写入器,单使用者)的无锁队列
- C++ 静态数据字段导致的使用者文件中的链接错误
- 订阅者和发布者在 ROS 中的一个文件中