线程安全的队列关闭

C++ Thread safe queue shutdown

本文关键字:队列 安全 线程      更新时间:2023-10-16

我在c++中使用这个类进行生产者-消费者设置:

#pragma once
#include <queue>
#include <mutex>
#include <condition_variable>
#include <memory>
#include <atomic>
template <typename T> class SafeQueue
{
public:
    SafeQueue() :
    _shutdown(false)
    {
    }
    void Enqueue(T item)
    {
        std::unique_lock<std::mutex> lock(_queue_mutex);
        bool was_empty = _queue.empty();
        _queue.push(std::move(item));
        lock.unlock();
        if (was_empty)
            _condition_variable.notify_one();
    }
    bool Dequeue(T& item)
    {
        std::unique_lock<std::mutex> lock(_queue_mutex);
        while (!_shutdown && _queue.empty())
            _condition_variable.wait(lock);
        if(!_shutdown)
        {
            item = std::move(_queue.front());
            _queue.pop();
            return true;
        }
        return false;
    }
    bool IsEmpty()
    {
        std::lock_guard<std::mutex> lock(_queue_mutex);
        return _queue.empty();
    }
    void Shutdown()
    {
        _shutdown = true;
        _condition_variable.notify_all();
    }
private:
    std::mutex _queue_mutex;
    std::condition_variable _condition_variable;
    std::queue<T> _queue;
    std::atomic<bool> _shutdown;
};

我这样使用它:

class Producer
{
public:
    Producer() :
        _running(true),
        _t(std::bind(&Producer::ProduceThread, this))
    { }
    ~Producer()
    {
        _running = false;
        _incoming_packets.Shutdown();
        _t.join();
    }
    SafeQueue<Packet> _incoming_packets;
private:
    void ProduceThread()
    {
        while(_running)
        {
            Packet p = GetNewPacket();
            _incoming_packets.Enqueue(p);
        }
    }
    std::atomic<bool> _running;
    std::thread _t;
}
class Consumer
{
    Consumer(Producer* producer) :
        _producer(producer),
        _t(std::bind(&Consumer::WorkerThread, this))
    { }
    ~Consumer()
    {
        _t.join();
    }
private:
    void WorkerThread()
    {
        Packet p;
        while(producer->_incoming_packets.Dequeue(p))
            ProcessPacket(p);
    }
    std::thread _t;
    Producer* _producer;
}

这在大多数情况下都有效。但偶尔当我删除生产者(并导致它的解构器调用SafeQueue::Shutdown)时,_t.join()就会永远阻塞。

我的猜测是,问题在这里(在SafeQueue::Dequeue):

while (!_shutdown && _queue.empty())
        _condition_variable.wait(lock);
线程#1中的

SafeQueue::Shutdown在线程#2完成检查_shutdown时被调用,但在执行_condition_variable.wait(lock)之前,因此它"错过"notify_all()。这种情况会发生吗?

如果这就是问题所在,最好的解决方法是什么?

由于SafeQueue对象归生产者所有,删除生产者将导致在被通知的消费者和被删除的SafeQueue之间产生竞争条件。

我建议共享资源既不属于生产者也不属于消费者,而是作为引用传递给各自的构造函数。

修改生产者和消费者构造函数;

Producer( SafeQueue<Packet> & queue ) :
    _running(false), _incoming_packets(queue) {}

Consumer( SafeQueue<Packet> & queue ) :
    _running(false), _incoming_packets(queue) {}

这样使用你的实例;

SafeQueue<Packet> queue;
Producer producer(queue);  
Consumer consumer(queue);
...do stuff...
queue.shutdown();

这也解决了消费者类与生产者类紧密耦合的糟糕设计问题。

同样,在析构函数中终止并连接线程可能是个坏主意,就像在~Producer中所做的那样。最好为每个线程类添加一个Shutdown()方法,并显式调用它们;

producer.shutdown();
consumer.shutdown();
queue.shutdown();

关闭顺序并不重要,除非您担心在停止消费者时仍在队列中丢失未处理的数据包。

在您的SafeQueue::Dequeue中,您可能以错误的方式使用std::condition_variable…改变这个:

bool Dequeue(T& item)
{
    std::unique_lock<std::mutex> lock(_queue_mutex);
    while (!_shutdown && _queue.empty())
        _condition_variable.wait(lock);
    if(!_shutdown)
    {
        item = std::move(_queue.front());
        _queue.pop();
        return true;
    }
    return false;
}

bool Dequeue(T& item)
{
    std::unique_lock<std::mutex> lock(_queue_mutex);
   _condition_variable.wait(lock, []{ return _shutdown || !_queue.empty() });
    if(!_shutdown)
    {
        item = std::move(_queue.front());
        _queue.pop();
        return true;
    }
    return false;
}
其次,Consumer中数据成员的初始化顺序对其构造函数 不正确。
class Consumer
{
    Consumer(Producer* producer) :
        _producer(producer),
        _t(std::bind(&Consumer::WorkerThread, this))
    { }
    ......
    // _t will be constructed first, regardless of your constructor initializer list
    // Meaning, the thread can even start running using an unintialized _producer
    std::thread _t;      
    Producer* _producer;
}

应该重新排序为:

class Consumer
{
    Consumer(Producer* producer) :
        _producer(producer),
        _t(std::bind(&Consumer::WorkerThread, this))
    { }
    ......
    Producer* _producer;
    std::thread _t;      
}

CAB的回答涵盖了你问题的另一部分