资源有限的生产者-工人-消费者
Producer-worker-consumer with limited resource
我想对通常的生产者-消费者问题进行轻微修改,使用有限资源的中间'worker'线程。一个示例应用程序可以是:
- 生产者线程从文件中读取记录并将它们放入队列。一旦到达文件结束,应该向工作线程发送一个通知。一个或多个worker线程从生产者队列中提取记录,进行某种处理,并将处理后的记录推送到另一个队列。一旦所有记录都被处理完,就会向消费者线程发送通知。
- 单个消费者线程将处理过的记录写入文件。
我并不是说这是解决这种问题的好方法,但它突出了我试图解决的问题,即如何正确地通知工作线程和消费者线程。
我有一个线程安全队列,具有以下接口:
template<class T>
class threadsafe_queue
{
public:
threadsafe_queue() {}
threadsafe_queue(const threadsafe_queue& other);
void push(T new_value);
void wait_and_pop(T& value);
std::shared_ptr<T> wait_and_pop();
bool try_pop(T& value);
std::shared_ptr<T> try_pop();
bool empty() const;
};
我解决单个工作线程问题的第一个想法是使用两个原子池,如下所示:
#include <chrono>
#include <thread>
void queue_producer(threadsafe_queue<unsigned>& queue, std::atomic<bool>& producer_finished)
{
for (unsigned i = 0; i < 10; ++i) {
queue.push(i);
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
producer_finished.store(true);
std::cout << "Producer finished." << std::endl;
}
void queue_processor(threadsafe_queue<unsigned>& in_queue, threadsafe_queue<unsigned>& out_queue,
std::atomic<bool>& producer_finished, std::atomic<bool>& processor_finished)
{
unsigned value;
while (!producer_finished.load()) {
in_queue.wait_and_pop(value);
value *= 10;
out_queue.push(value);
}
processor_finished.store(true);
std::cout << "Processor finished." << std::endl;
}
void queue_consumer(threadsafe_queue<unsigned>& queue, std::atomic<bool>& processor_finished)
{
unsigned value;
while (!processor_finished.load()) {
queue.wait_and_pop(value);
std::cout << "Received value " << value << "." << std::endl; // Or write to file etc.
}
std::cout << "Consumer finished." << std::endl;
}
int main(int argc, const char * argv[])
{
std::atomic<bool> producer_finished(false);
std::atomic<bool> processor_finished(false);
threadsafe_queue<unsigned> in_queue;
threadsafe_queue<unsigned> out_queue;
std::thread producer_thread(queue_producer, std::ref(in_queue), std::ref(producer_finished));
std::thread processor_thread(queue_processor, std::ref(in_queue), std::ref(out_queue), std::ref(producer_finished), std::ref(processor_finished));
std::thread consumer_thread(queue_consumer, std::ref(out_queue), std::ref(processor_finished));
producer_thread.join();
processor_thread.join();
consumer_thread.join();
return 0;
}
这样做的问题是,处理器(和消费者)可以在原子bool值设置之前重新进入while循环,从而无限期地等待永远不会出现的记录。
我还认为一种解决方案可能是将某种哨兵值推到队列上以表示结束(可以使用包装器类),但这似乎不是一种特别好的做事方式,而且它不适用于多工作者版本。我实际上认为多工作者版本是一个更困难的问题,所以任何帮助与单工作者版本将是一个很好的开始。
邮箱-解决生产者/消费者问题
你可以使用一个叫做Message Box(又名Mailbox)的线程同步原语将生产者和消费者线程连接在一起。邮箱可以使用c++ 11互斥锁和条件变量来实现。
邮箱为两个线程提供了交换信息的方法。通常,一个线程将生成消息并将其发送到另一个线程进行处理。消息包含在邮箱中,因此是线程安全的。
mbox.Put(message);
尝试将邮件放入指定的邮箱中。如果邮箱已满,则呼叫可能会阻塞或不阻塞,这取决于设计。
消费者message = mbox.Get(); // Blocking
当指定邮箱可用时,从该邮箱中删除邮件,并返回该邮件的地址或移动副本。
Put和Get方法的代码片段:
/// <summary>Push an item to the back of the queue. Move.</summary>
void Put(T && itemToAddByMoving)
{
{
std::lock_guard<std::mutex> lg(m_queueMutex);
m_deque.push_back(std::forward<T>(itemToAddByMoving)); // Perfect forwarding: rvalue if argument is rvalue
} // release lock
m_queueCondVar.notify_one(); // Notify consumer thread that there is data available in the queue.
}
/// <summary>Waiting for the queue to have data ready (not empty). Pop the first element when ready.</summary>
/// <returns>First element in front of the queue.</returns>
T Get()
{
T poppedValue; // T should support move.
{
std::unique_lock<std::mutex> ul(m_queueMutex);
m_queueCondVar.wait(ul, [&]{ return !m_deque.empty(); }); // Wait here. Blocking.
poppedValue = m_deque.front();
m_deque.pop_front();
} // release lock
return poppedValue;
}
您可以将通知原则与<mutex>
结合起来考虑<condition_variable>
。
你需要一个地方:
std::mutex mtx; // for locking
std::condition_variable ready; // for waiting conditions
在生产者端,处理输入并保护队列的更新:
{
std::lock_guard<std::mutex> guard(mtx); // lock
// ... updated the queue
ready.notify_all(); // notify consumers
} // the lock is removed here
在消费者端,你有一个循环:
{
std::unique_lock<std::mutex> guard(mtx); // lock
// wait condition
ready.wait(guard, [&]() {return /* queue not empty */; }); // lock is released as long as condition is false
//... here lock is set and condition is true: read element from queue
//... if processing is long release the lock when queue is read
} // otherwhise the lock is removed here
你可以在这里找到一个带有条件变量生产者/消费者示例的教程。注意notify_one()
和notify_all()
;我目前正在进行试验,据我所知,第一种方法最适合制作人/非专业消费者。如果每个消费者都是专门的,即必须找出他是否能够处理输入,则后者将是合适的。
其他方法
使用当前代码。我得到"Received"行显示从10到90,显示所有线程都有贡献,但随后程序挂起。
如果我用this_thread::yield()
代替等待,以避免不必要的等待,所有三个线程立即声明它们已经完成了它们的工作。为什么?因为生产者运行得更快,当处理器有机会工作时,producer_finished
为真。处理器甚至不运行循环。
while (!producer_finished.load() || !in_queue.empty()) {
...
while (!processor_finished.load() || !queue.empty()) {
...
除了上面的conditional_variable方法之外,另一种方法可以使用std::promise<T>
(例如:处理器发送通知)和std::shared_future<T>
(例如:消费者发现他有东西要处理)
- 消费者和生产者问题的双重缓冲
- 如何降低生产者获得锁的可能性,而消费者在使用std::condition_variable时无法获得锁?
- C++ deque 消费者总是从生产者那里得到空队列
- 在工人类中使用不同类的静态函数进行实验
- C++多线程生产者-消费者问题
- 生产者和消费者优化
- 如何查看Tibco EMS独享队列是否有活跃消费者?
- 特定的生产者-消费者方案
- 为什么condition_variable在等待生产者-消费者的锁定?C++
- 卡夫卡消费者投票最新消息
- 将生产者/消费者与障碍同步
- 任何用于PostgreSQL的OLE DB提供商/消费者
- 单个生产者/多个消费者死锁
- 带有共享缓冲区的两个等待线程(生产者/消费者)
- C 中Google Pub/sub的消费者示例
- C++简单的消费者生产者问题
- 生产者/消费者,消费者线程从未执行过
- 针对 C++ API 的消费者驱动的合同测试
- 在使用 pthread 和信号量实现生产者-消费者问题时需要帮助
- 资源有限的生产者-工人-消费者