单个生产者多个消费者:如何告诉消费者生产已经完成
Single producer-multiple consumers: How to tell consumers that production is complete
在我的程序中,一个生产者线程从文本文件中读取文本行(大约有8000行文本(,并将这些行加载到并发队列。
三个使用者线程读取队列中的行,每个线程写入一个单独的文件。
当我运行程序时,只有生产者线程和其中一个消费者线程完成。其他两个线程似乎挂断了。
如何可靠地告诉所有使用者线程已经到达文件末尾,以便它们返回但要确保队列完全为空。
我的平台是Windows7 64位
VC11.
编译为64位和32位的代码得到了相同的行为。
这是代码。(它是独立的和可编译的(
#include <queue>
#include<iostream>
#include<fstream>
#include <atomic>
#include <thread>
#include <condition_variable>
#include <mutex>
#include<string>
#include<memory>
template<typename Data>
class concurrent_queue
{
private:
std::queue<Data> the_queue;
mutable std::mutex the_mutex;
std::condition_variable the_condition_variable;
public:
void push(Data const& data){
{
std::lock_guard<std::mutex> lock(the_mutex);
the_queue.push(data);
}
the_condition_variable.notify_one();
}
bool empty() const{
std::unique_lock<std::mutex> lock(the_mutex);
return the_queue.empty();
}
const size_t size() const{
std::lock_guard<std::mutex> lock(the_mutex);
return the_queue.size();
}
bool try_pop(Data& popped_value){
std::unique_lock<std::mutex> lock(the_mutex);
if(the_queue.empty()){
return false;
}
popped_value=the_queue.front();
the_queue.pop();
return true;
}
void wait_and_pop(Data& popped_value){
std::unique_lock<std::mutex> lock(the_mutex);
while(the_queue.empty()){
the_condition_variable.wait(lock);
}
popped_value=the_queue.front();
the_queue.pop();
}
};
std::atomic<bool> done(true);
typedef std::vector<std::string> segment;
concurrent_queue<segment> data;
const int one_block = 15;
void producer()
{
done.store(false);
std::ifstream inFile("c:/sample.txt");
if(!inFile.is_open()){
std::cout << "Can't read from filen";
return;
}
std::string line;
segment seg;
int cnt = 0;
while(std::getline(inFile,line)){
seg.push_back(line);
++cnt;
if( cnt == one_block ){
data.push( seg );
seg.clear();
cnt = 0;
}
}
inFile.close();
done.store(true);
std::cout << "all donen";
}
void consumer( std::string fname)
{
std::ofstream outFile(fname.c_str());
if(!outFile.is_open()){
std::cout << "Can't write to filen";
return;
}
do{
while(!data.empty()){
segment seg;
data.wait_and_pop( seg );
for(size_t i = 0; i < seg.size(); ++i)
{
outFile << seg[i] << std::endl;
}
outFile.flush();
}
} while(!done.load());
outFile.close();
std::cout << fname << " done.n";
}
int main()
{
std::thread th0(producer);
std::thread th1(consumer, "Worker1.txt");
std::thread th2(consumer, "Worker2.txt");
std::thread th3(consumer, "Worker3.txt");
th0.join();
th1.join();
th2.join();
th3.join();
return 0;
}
我用来终止所有在队列上等待的线程的方法是在队列上设置一个标志,说明是否已完成,在检查pop()
函数中是否有元素之前进行测试。如果标志指示程序应该停止,那么如果队列中没有元素,则任何调用pop()
的线程都会抛出异常。当标志被更改时,更改线程只对相应的条件变量调用notify_all()
。
查看以下代码:
while(!data.empty()){
segment seg;
data.wait_and_pop( seg );
...
考虑一种情况,即要读取最后一段数据。消费者th1
&CCD_ 5正在等待数据被读取。
消费者th1
检查!data.empty()
,发现有要读取的数据。然后,在th1
调用data.wait_and_pop()
之前,消费者th2
检查!data.empty()
并发现它为真。假设消费者th1
消费了最后一个片段。现在,由于没有要读取的段,th2
在data.wait_and_pop()
中的the_queue.empty()
上无限期地等待。
试试这个代码而不是上面的代码片段:
segment seg;
while(data.try_pop(seg)){
...
应该让它发挥作用。
您可能想要在concurrent_queue
中添加一个布尔标志。在读取文件后设置它(在互斥对象下(。读取和文件后,队列为空,使用notify_all
从清空队列的使用者广播条件变量。
这将唤醒所有其他消费者,他们需要发现最终条件(标志集和队列为空(并退出循环。为了避免比赛条件,这意味着他们在等待之前需要检查相同的组合条件。
现有标志的问题是,永远不会从等待condvar中醒来的线程永远不会检查它。"finished"标志需要是他们正在等待的状态的一部分。
[编辑:Dietmar对标志的含义略有不同,这可能会导致代码更简单,但我并没有把它们都写出来进行比较。]
- 消费者和生产者问题的双重缓冲
- 如何降低生产者获得锁的可能性,而消费者在使用std::condition_variable时无法获得锁?
- C++ deque 消费者总是从生产者那里得到空队列
- C++多线程生产者-消费者问题
- 生产者和消费者优化
- 特定的生产者-消费者方案
- 为什么condition_variable在等待生产者-消费者的锁定?C++
- 将生产者/消费者与障碍同步
- 单个生产者/多个消费者死锁
- 带有共享缓冲区的两个等待线程(生产者/消费者)
- C++简单的消费者生产者问题
- 生产者/消费者,消费者线程从未执行过
- 在使用 pthread 和信号量实现生产者-消费者问题时需要帮助
- 如何在 c++ 中使用 winapi 事件解决生产者-消费者
- C++生产者消费者陷入僵局
- 简单生产者和消费者的unique_lock困难
- 生产者消费者实现中的条件变量应如何初始化
- 如何实现消费者生产者,消费者可以请求新的数据
- c++中的消费者/生产者
- 如何同步线程(消费者/生产者)