资源有限的生产者-工人-消费者

Producer-worker-consumer with limited resource

本文关键字:工人 消费者 生产者 资源      更新时间:2023-10-16

我想对通常的生产者-消费者问题进行轻微修改,使用有限资源的中间'worker'线程。一个示例应用程序可以是:

    生产者线程从文件中读取记录并将它们放入队列。一旦到达文件结束,应该向工作线程发送一个通知。一个或多个worker线程从生产者队列中提取记录,进行某种处理,并将处理后的记录推送到另一个队列。一旦所有记录都被处理完,就会向消费者线程发送通知。
  • 单个消费者线程将处理过的记录写入文件。

我并不是说这是解决这种问题的好方法,但它突出了我试图解决的问题,即如何正确地通知工作线程和消费者线程。

我有一个线程安全队列,具有以下接口:

template<class T>
class threadsafe_queue
{
public:
    threadsafe_queue() {}
    threadsafe_queue(const threadsafe_queue& other);
    void push(T new_value);
    void wait_and_pop(T& value);
    std::shared_ptr<T> wait_and_pop();
    bool try_pop(T& value);
    std::shared_ptr<T> try_pop();
    bool empty() const;
};
我解决单个工作线程问题的第一个想法是使用两个原子池,如下所示:
#include <chrono>
#include <thread>
void queue_producer(threadsafe_queue<unsigned>& queue, std::atomic<bool>& producer_finished)
{
    for (unsigned i = 0; i < 10; ++i) {
        queue.push(i);
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }
    producer_finished.store(true);
    std::cout << "Producer finished." << std::endl;
}
void queue_processor(threadsafe_queue<unsigned>& in_queue, threadsafe_queue<unsigned>& out_queue,
                 std::atomic<bool>& producer_finished, std::atomic<bool>& processor_finished)
{
    unsigned value;
    while (!producer_finished.load()) {
        in_queue.wait_and_pop(value);
        value *= 10;
        out_queue.push(value);
    }
    processor_finished.store(true);
    std::cout << "Processor finished." << std::endl;
}
void queue_consumer(threadsafe_queue<unsigned>& queue, std::atomic<bool>& processor_finished)
{
    unsigned value;
    while (!processor_finished.load()) {
        queue.wait_and_pop(value);
        std::cout << "Received value " << value << "." << std::endl; // Or write to file etc.
    }
    std::cout << "Consumer finished." << std::endl;
}
int main(int argc, const char * argv[])
{
    std::atomic<bool> producer_finished(false);
    std::atomic<bool> processor_finished(false);
    threadsafe_queue<unsigned> in_queue;
    threadsafe_queue<unsigned> out_queue;
    std::thread producer_thread(queue_producer, std::ref(in_queue), std::ref(producer_finished));
    std::thread processor_thread(queue_processor, std::ref(in_queue), std::ref(out_queue), std::ref(producer_finished), std::ref(processor_finished));
    std::thread consumer_thread(queue_consumer, std::ref(out_queue), std::ref(processor_finished));
    producer_thread.join();
    processor_thread.join();
    consumer_thread.join();
    return 0;
}

这样做的问题是,处理器(和消费者)可以在原子bool值设置之前重新进入while循环,从而无限期地等待永远不会出现的记录。

我还认为一种解决方案可能是将某种哨兵值推到队列上以表示结束(可以使用包装器类),但这似乎不是一种特别好的做事方式,而且它不适用于多工作者版本。我实际上认为多工作者版本是一个更困难的问题,所以任何帮助与单工作者版本将是一个很好的开始。

邮箱-解决生产者/消费者问题

你可以使用一个叫做Message Box(又名Mailbox)的线程同步原语将生产者和消费者线程连接在一起。邮箱可以使用c++ 11互斥锁和条件变量来实现。

邮箱为两个线程提供了交换信息的方法。通常,一个线程将生成消息并将其发送到另一个线程进行处理。消息包含在邮箱中,因此是线程安全的。

mbox.Put(message);   

尝试将邮件放入指定的邮箱中。如果邮箱已满,则呼叫可能会阻塞或不阻塞,这取决于设计。

消费者

message = mbox.Get();   // Blocking

当指定邮箱可用时,从该邮箱中删除邮件,并返回该邮件的地址或移动副本。

Put和Get方法的代码片段:

/// <summary>Push an item to the back of the queue. Move.</summary>
void Put(T && itemToAddByMoving)
{
    {
        std::lock_guard<std::mutex> lg(m_queueMutex);
        m_deque.push_back(std::forward<T>(itemToAddByMoving));   // Perfect forwarding: rvalue if argument is rvalue
    }   // release lock
    m_queueCondVar.notify_one();    // Notify consumer thread that there is data available in the queue.
}
/// <summary>Waiting for the queue to have data ready (not empty). Pop the first element when ready.</summary>
/// <returns>First element in front of the queue.</returns>
T Get()
{
    T poppedValue;      // T should support move.
    {
        std::unique_lock<std::mutex> ul(m_queueMutex);
        m_queueCondVar.wait(ul, [&]{ return !m_deque.empty(); });   // Wait here. Blocking.
        poppedValue = m_deque.front();
        m_deque.pop_front();
    }   // release lock
    return poppedValue;
}

您可以将通知原则与<mutex>结合起来考虑<condition_variable>

你需要一个地方:

std::mutex mtx;                     // for locking  
std::condition_variable ready;      // for waiting conditions 

在生产者端,处理输入并保护队列的更新:

{
        std::lock_guard<std::mutex> guard(mtx);     // lock
        // ... updated the queue
        ready.notify_all();                 // notify consumers
}                                           // the lock is removed here 

在消费者端,你有一个循环:

{
    std::unique_lock<std::mutex> guard(mtx);        // lock 
                                                    // wait condition
    ready.wait(guard, [&]() {return /* queue not empty */; });  // lock is released as long as condition is false 
    //... here lock is set and condition is true: read element from queue
    //... if processing is long release the lock when queue is read
}    // otherwhise the lock is removed here

你可以在这里找到一个带有条件变量生产者/消费者示例的教程。注意notify_one()notify_all();我目前正在进行试验,据我所知,第一种方法最适合制作人/非专业消费者。如果每个消费者都是专门的,即必须找出他是否能够处理输入,则后者将是合适的。

其他方法

使用当前代码。我得到"Received"行显示从10到90,显示所有线程都有贡献,但随后程序挂起。

如果我用this_thread::yield()代替等待,以避免不必要的等待,所有三个线程立即声明它们已经完成了它们的工作。为什么?因为生产者运行得更快,当处理器有机会工作时,producer_finished为真。处理器甚至不运行循环。

所以你的代码很大程度上依赖于执行的顺序。请记住,您的队列是线程安全的,您可以通过修改处理器和消费者的while条件来改善这种情况,如下所示:
while (!producer_finished.load() || !in_queue.empty()) {
...
while (!processor_finished.load() || !queue.empty()) {
...

除了上面的conditional_variable方法之外,另一种方法可以使用std::promise<T>(例如:处理器发送通知)和std::shared_future<T>(例如:消费者发现他有东西要处理)