线程安全的队列关闭
C++ Thread safe queue shutdown
我在c++中使用这个类进行生产者-消费者设置:
#pragma once
#include <queue>
#include <mutex>
#include <condition_variable>
#include <memory>
#include <atomic>
template <typename T> class SafeQueue
{
public:
SafeQueue() :
_shutdown(false)
{
}
void Enqueue(T item)
{
std::unique_lock<std::mutex> lock(_queue_mutex);
bool was_empty = _queue.empty();
_queue.push(std::move(item));
lock.unlock();
if (was_empty)
_condition_variable.notify_one();
}
bool Dequeue(T& item)
{
std::unique_lock<std::mutex> lock(_queue_mutex);
while (!_shutdown && _queue.empty())
_condition_variable.wait(lock);
if(!_shutdown)
{
item = std::move(_queue.front());
_queue.pop();
return true;
}
return false;
}
bool IsEmpty()
{
std::lock_guard<std::mutex> lock(_queue_mutex);
return _queue.empty();
}
void Shutdown()
{
_shutdown = true;
_condition_variable.notify_all();
}
private:
std::mutex _queue_mutex;
std::condition_variable _condition_variable;
std::queue<T> _queue;
std::atomic<bool> _shutdown;
};
我这样使用它:
class Producer
{
public:
Producer() :
_running(true),
_t(std::bind(&Producer::ProduceThread, this))
{ }
~Producer()
{
_running = false;
_incoming_packets.Shutdown();
_t.join();
}
SafeQueue<Packet> _incoming_packets;
private:
void ProduceThread()
{
while(_running)
{
Packet p = GetNewPacket();
_incoming_packets.Enqueue(p);
}
}
std::atomic<bool> _running;
std::thread _t;
}
class Consumer
{
Consumer(Producer* producer) :
_producer(producer),
_t(std::bind(&Consumer::WorkerThread, this))
{ }
~Consumer()
{
_t.join();
}
private:
void WorkerThread()
{
Packet p;
while(producer->_incoming_packets.Dequeue(p))
ProcessPacket(p);
}
std::thread _t;
Producer* _producer;
}
这在大多数情况下都有效。但偶尔当我删除生产者(并导致它的解构器调用SafeQueue::Shutdown
)时,_t.join()
就会永远阻塞。
我的猜测是,问题在这里(在SafeQueue::Dequeue
):
while (!_shutdown && _queue.empty())
_condition_variable.wait(lock);
线程#1中的 SafeQueue::Shutdown
在线程#2完成检查_shutdown时被调用,但在执行_condition_variable.wait(lock)
之前,因此它"错过"notify_all()
。这种情况会发生吗?
如果这就是问题所在,最好的解决方法是什么?
由于SafeQueue对象归生产者所有,删除生产者将导致在被通知的消费者和被删除的SafeQueue之间产生竞争条件。
我建议共享资源既不属于生产者也不属于消费者,而是作为引用传递给各自的构造函数。
修改生产者和消费者构造函数;
Producer( SafeQueue<Packet> & queue ) :
_running(false), _incoming_packets(queue) {}
Consumer( SafeQueue<Packet> & queue ) :
_running(false), _incoming_packets(queue) {}
这样使用你的实例;
SafeQueue<Packet> queue;
Producer producer(queue);
Consumer consumer(queue);
...do stuff...
queue.shutdown();
这也解决了消费者类与生产者类紧密耦合的糟糕设计问题。
同样,在析构函数中终止并连接线程可能是个坏主意,就像在~Producer中所做的那样。最好为每个线程类添加一个Shutdown()方法,并显式调用它们;
producer.shutdown();
consumer.shutdown();
queue.shutdown();
关闭顺序并不重要,除非您担心在停止消费者时仍在队列中丢失未处理的数据包。
在您的SafeQueue::Dequeue
中,您可能以错误的方式使用std::condition_variable
…改变这个:
bool Dequeue(T& item)
{
std::unique_lock<std::mutex> lock(_queue_mutex);
while (!_shutdown && _queue.empty())
_condition_variable.wait(lock);
if(!_shutdown)
{
item = std::move(_queue.front());
_queue.pop();
return true;
}
return false;
}
bool Dequeue(T& item)
{
std::unique_lock<std::mutex> lock(_queue_mutex);
_condition_variable.wait(lock, []{ return _shutdown || !_queue.empty() });
if(!_shutdown)
{
item = std::move(_queue.front());
_queue.pop();
return true;
}
return false;
}
其次,Consumer
中数据成员的初始化顺序对其构造函数
不正确。class Consumer
{
Consumer(Producer* producer) :
_producer(producer),
_t(std::bind(&Consumer::WorkerThread, this))
{ }
......
// _t will be constructed first, regardless of your constructor initializer list
// Meaning, the thread can even start running using an unintialized _producer
std::thread _t;
Producer* _producer;
}
应该重新排序为:
class Consumer
{
Consumer(Producer* producer) :
_producer(producer),
_t(std::bind(&Consumer::WorkerThread, this))
{ }
......
Producer* _producer;
std::thread _t;
}
CAB的回答涵盖了你问题的另一部分
相关文章:
- 在c++队列中使用pop和visit实现线程安全
- 共享队列的线程安全
- 线程安全队列 c++
- 线程安全的引用计数队列C++
- C++11如何在1个线程中使用条件变量处理2个线程安全队列
- 如何做 gtkmm 线程安全队列绘制?
- 线程安全队列出现分段错误
- Qt的事件循环线程是安全的还是原子的?处理"队列连接"时如何同步?
- IPC Unix 消息队列线程安全吗?
- boost消息队列线程安全和进程安全吗?
- 使用无锁指针队列在线程之间移动数据是否安全
- 线安全的队列和虚假的唤醒
- 另一个线程安全队列实现
- 生产者/使用者,如何确保在关闭所有使用者之前耗尽线程安全队列
- C++中的线程安全队列
- 寻找对我的线程安全,无锁队列实施的批评
- 具有固定大小的线程安全队列
- 使用本机windows API实现win32线程安全队列
- stl或boost中是否存在用于线程间通信的线程安全结构,其行为类似于队列
- 如果存在仅推送线程和仅弹出线程,那么C++std::队列安全吗