C++ 多个使用者线程卡在条件变量上
C++ Multiple consumer threads stuck on condition variable
我正在C++中制作一个生产者,多个消费者程序。我首先调用使用者线程,然后将元素添加到数组中。 一切正常,但最终使用者线程没有加入,因为它们被困在等待条件变量并且程序冻结。
我认为问题是线程在循环中不断被调用,因为 currentSize 不受保护,它们无法退出条件变量,但我不知道如何解决它。
struct Item {
public:
string name;
int time;
double height;
};
struct Monitor {
private:
Item items[12];
int currentSize;
bool finished;
mutex lock;
condition_variable cv;
public:
Monitor() {
finished = false;
currentSize = 0;
}
void put(Item item) {
unique_lock<mutex> guard(lock);
cv.wait(guard, [&] { return (currentSize < 12); });
items[currentSize] = item;
currentSize++;
cv.notify_all();
}
Item get() {
unique_lock<mutex> guard(lock);
cv.wait(guard, [&] { return (currentSize > 0); });
Item item = items[currentSize - 1];
currentSize--;
return item;
}
bool get_finished() {
return finished;
}
void set_finished() {
finished = true;
}
int get_size() {
return currentSize;
}
};
int main() {
vector<Item> items = read_file(file);
Monitor monitor;
vector<thread> threads;
vector<Item> results;
for (int i = 0; i < 4; i++) {
threads.emplace_back([&] {
while (!monitor.get_finished()) {
if (monitor.get_size() > 0) {
Item item = monitor.get();
results.push_back(item);
}
}
});
}
for (int i = 0; i < items.size(); i++) {
monitor.put(items[i]);
}
monitor.set_finished();
for_each(threads.begin(), threads.end(), mem_fn(&thread::join));
return 0;
}
为什么使用者线程阻塞?
我已经测试了您的代码,结果证明它是put()
方法上的生产者线程阻塞。为什么?
想象一下以下场景:向量items
中有 13 个项目。
主线程(生产者(愉快地加载前 12 个项目,并等待cv
currentSize
低于 12。
使用者线程收到通知,并愉快地使用前 12 个项目,然后等待cv
currentSize
大于 0。
但是等等!现在每个人都在等待一些事情,没有人通知。因此,所有线程都会阻塞。您需要在currentSize
低于 12 时通知生产者。
我注意到了一些问题。 使成员变量原子化,notify_all获取 API 中。但是,也存在逻辑错误。假设您当前有 4 个线程正在运行,5 个项目在队列中。在这一点上,假设每个线程都能够从队列中取出一个线程,现在队列中有 4 个线程,只有一个项目。其中一个线程取出最后一个,现在那里有 0 个项目,但其他三个线程仍在等待条件变量。因此,一个解决方案是,如果最后一项已出,则应通知每个线程,如果没有其他elemnet,则应从API返回。
#include <iostream>
#include <vector>
#include <condition_variable>
#include <thread>
#include <algorithm>
#include <atomic>
using namespace std;
using Item = int;
struct Monitor {
private:
Item items[12];
std::atomic<int> currentSize;
std::atomic<bool> finished;
mutex lock;
condition_variable cv;
public:
Monitor() {
finished = false;
currentSize = 0;
}
void put(Item item) {
unique_lock<mutex> guard(lock);
cv.wait(guard, [&] { return (currentSize < 12); });
items[currentSize] = item;
currentSize++;
cv.notify_all();
std::cerr << "+ " << currentSize << std::endl ;
}
Item get() {
unique_lock<mutex> guard(lock);
cv.wait(guard, [&] { return (currentSize >= 0 ); });
Item item;
if (currentSize > 0 ){
currentSize--;
item = items[currentSize];
cv.notify_all();
std::cerr << "- " << currentSize << std::endl ;
}
return item;
}
bool get_finished() {
return finished;
}
void set_finished() {
finished = true;
}
int get_size() {
return currentSize;
}
};
int main() {
vector<Item> items(200);
std::fill ( items.begin() , items.end(), 100);
Monitor monitor;
vector<thread> threads;
vector<Item> results;
for (int i = 0; i < 10; i++) {
threads.emplace_back([&] {
while ( !monitor.get_finished() ) {
if (monitor.get_size() > 0) {
Item item = monitor.get();
results.push_back(item);
}
}
});
}
for (int i = 0; i < items.size(); i++) {
monitor.put(items[i]);
}
monitor.set_finished();
for_each(threads.begin(), threads.end(), mem_fn(&thread::join));
return 0;
}
相关文章:
- 基于模板值的条件变量
- 没有超时的C++条件变量
- 在条件变量中触发错误信号的频率是多少
- 在通知提升间处理条件变量时未按住锁会导致问题
- 通知条件变量后使用互斥锁
- 滥用条件变量
- 升压插值条件变量可以虚假唤醒吗?
- 子线程中的条件变量等待停止主线程中的执行
- 条件变量基本示例
- 正在连接的等待条件变量的线程会发生什么情况?
- C++11如何在1个线程中使用条件变量处理2个线程安全队列
- 当线程处理不同的类时,应该在哪里声明条件变量、互斥对象
- 为什么在同一条件变量上使用多个互斥锁会使此代码崩溃?
- 条件变量:wait_for.gcc错误
- 如何"stop"正在等待条件变量的分离线程?
- 如何杀死被条件变量锁定的线程?
- 使用互斥锁和条件变量作为成员时如何修复"use of deleted function"?
- C++ 多个使用者线程卡在条件变量上
- POSIX 条件变量和互斥体"竞争"
- 将cpp_redis pub/sub与条件变量一起使用时出现问题