C++ 阻塞队列隔离错误带提升
C++ Blocking Queue Segfault w/ Boost
我需要C++具有超时功能的阻塞队列offer()
。该队列适用于多个生产者,一个使用者。当我实现时,我没有找到任何适合此需求的现有队列,所以我自己编写了代码。
我看到队列上的take()
方法出现段错误,但它们是间歇性的。我一直在查看代码中的问题,但我没有看到任何看起来有问题的东西。
我想知道是否:
- 有一个现有的库可以可靠地做到这一点,我应该使用(首选升压或仅标头)。
- 任何人都会在我的代码中看到我需要修复的任何明显缺陷。
这是标题:
class BlockingQueue
{
public:
BlockingQueue(unsigned int capacity) : capacity(capacity) { };
bool offer(const MyType & myType, unsigned int timeoutMillis);
MyType take();
void put(const MyType & myType);
unsigned int getCapacity();
unsigned int getCount();
private:
std::deque<MyType> queue;
unsigned int capacity;
};
以及相关的实现:
boost::condition_variable cond;
boost::mutex mut;
bool BlockingQueue::offer(const MyType & myType, unsigned int timeoutMillis)
{
Timer timer;
// boost::unique_lock is a scoped lock - its destructor will call unlock().
// So no need for us to make that call here.
boost::unique_lock<boost::mutex> lock(mut);
// We use a while loop here because the monitor may have woken up because
// another producer did a PulseAll. In that case, the queue may not have
// room, so we need to re-check and re-wait if that is the case.
// We use an external stopwatch to stop the madness if we have taken too long.
while (queue.size() >= this->capacity)
{
int monitorTimeout = timeoutMillis - ((unsigned int) timer.getElapsedMilliSeconds());
if (monitorTimeout <= 0)
{
return false;
}
if (!cond.timed_wait(lock, boost::posix_time::milliseconds(timeoutMillis)))
{
return false;
}
}
cond.notify_all();
queue.push_back(myType);
return true;
}
void BlockingQueue::put(const MyType & myType)
{
// boost::unique_lock is a scoped lock - its destructor will call unlock().
// So no need for us to make that call here.
boost::unique_lock<boost::mutex> lock(mut);
// We use a while loop here because the monitor may have woken up because
// another producer did a PulseAll. In that case, the queue may not have
// room, so we need to re-check and re-wait if that is the case.
// We use an external stopwatch to stop the madness if we have taken too long.
while (queue.size() >= this->capacity)
{
cond.wait(lock);
}
cond.notify_all();
queue.push_back(myType);
}
MyType BlockingQueue::take()
{
// boost::unique_lock is a scoped lock - its destructor will call unlock().
// So no need for us to make that call here.
boost::unique_lock<boost::mutex> lock(mut);
while (queue.size() == 0)
{
cond.wait(lock);
}
cond.notify_one();
MyType myType = this->queue.front();
this->queue.pop_front();
return myType;
}
unsigned int BlockingQueue::getCapacity()
{
return this->capacity;
}
unsigned int BlockingQueue::getCount()
{
return this->queue.size();
}
是的,我没有使用模板实现该类 - 这是列表中的下一个:)
任何帮助将不胜感激。线程问题可能真的很难确定。
-本
为什么 cond 和 mut 是全局变量?我希望他们是你的BlockingQueue对象的成员。我不知道还有什么东西在触及这些东西,但那里可能存在问题。
我也实现了ThreadSafeQueue作为更大项目的一部分:
https://github.com/cdesjardins/QueuePtr/blob/master/include/ThreadSafeQueue.h
它的概念与您的概念类似,除了排队(又名报价)功能是非阻塞的,因为基本上没有最大容量。为了强制实施容量,我通常会在系统初始化时添加一个带有 N 个缓冲区的池,并在运行时添加一个用于消息传递的队列,这也消除了在运行时分配内存的需要,我认为这是一件好事(我通常在嵌入式应用程序上工作)。
池和队列之间的唯一区别是,池在系统初始化时获得一堆排队的缓冲区。所以你有这样的东西:
ThreadSafeQueue<BufferDataType*> pool;
ThreadSafeQueue<BufferDataType*> queue;
void init()
{
for (int i = 0; i < NUM_BUFS; i++)
{
pool.enqueue(new BufferDataType);
}
}
然后,当您想要发送消息时,您可以执行以下操作:
void producerA()
{
BufferDataType *buf;
if (pool.waitDequeue(buf, timeout) == true)
{
initBufWithMyData(buf);
queue.enqueue(buf);
}
}
这样,排队功能既快速又简单,但如果池为空,那么您将阻塞,直到有人将缓冲区放回池中。这个想法是,其他一些线程将在队列上阻塞,并在按如下方式处理缓冲区时将缓冲区返回到池中:
void consumer()
{
BufferDataType *buf;
if (queue.waitDequeue(buf, timeout) == true)
{
processBufferData(buf);
pool.enqueue(buf);
}
}
无论如何,看看它,也许它会有所帮助。
我想你的代码中的问题是通过几个线程修改双端面。看:
- 您正在等待来自另一个线程的编码;
- 然后立即向其他线程发送信号,在您想要修改之前解锁 deque;
- 然后,当其他线程认为Deque已解锁并开始执行相同的操作时,您修改了Deque。
因此,尝试在修改双端面后放置所有cond.notify_*()
。 即:
void BlockingQueue::put(const MyType & myType)
{
boost::unique_lock<boost::mutex> lock(mut);
while (queue.size() >= this->capacity)
{
cond.wait(lock);
}
queue.push_back(myType); // <- modify first
cond.notify_all(); // <- then say to others that deque is free
}
为了更好地理解,我建议阅读有关pthread_cond_wait()
的信息。
- 从矢量中删除元素后出现隔离错误
- 在 gtest 中初始化堆栈上的引用变量的隔离错误
- 线程时访问静态映射时出现隔离错误
- 并行快速排序分区中的隔离错误
- C++多线程程序:变量定义为类成员的隔离错误
- TFLite 隔离错误,通过获取C++输入和输出
- 我只是在寻找模板,在我的书中找到了这段代码,这显示了隔离错误?
- pthread_create中错误 4 的隔离错误
- 递归树遍历/分支删除的隔离错误
- 在类模板上使用 arm gcc 编译期间的隔离错误
- 从大量文件读取时出现隔离错误
- 在PHP扩展中使用emalloc从线程时出现隔离错误
- 通过 Boost Python 在C++对象之间传递共享指针的隔离错误
- 在QT中单击菜单时出现隔离错误
- 尝试访问标头声明成员时出现隔离错误
- 搜索链表时出现隔离错误
- 模板化子类析构函数中的隔离错误
- 插件中节点.js/Nan 回调C++不频繁的隔离错误
- 将行添加到 GTKTreeView 时的隔离错误
- 找不到命令时打开的隔离错误