C++ 阻塞队列隔离错误带提升

C++ Blocking Queue Segfault w/ Boost

本文关键字:错误 隔离 队列 C++      更新时间:2023-10-16

我需要C++具有超时功能的阻塞队列offer()。该队列适用于多个生产者,一个使用者。当我实现时,我没有找到任何适合此需求的现有队列,所以我自己编写了代码。

我看到队列上的take()方法出现段错误,但它们是间歇性的。我一直在查看代码中的问题,但我没有看到任何看起来有问题的东西。

我想知道是否:

  • 有一个现有的库可以可靠地做到这一点,我应该使用(首选升压或仅标头)。
  • 任何人都会在我的代码中看到我需要修复的任何明显缺陷。

这是标题:

class BlockingQueue
{
    public:
        BlockingQueue(unsigned int capacity) : capacity(capacity) { };
        bool offer(const MyType & myType, unsigned int timeoutMillis);
        MyType take();
        void put(const MyType & myType);
        unsigned int getCapacity();
        unsigned int getCount();
    private:
         std::deque<MyType> queue;
         unsigned int capacity;
};

以及相关的实现:

boost::condition_variable cond;
boost::mutex mut;
bool BlockingQueue::offer(const MyType & myType, unsigned int timeoutMillis)
{
    Timer timer;
    // boost::unique_lock is a scoped lock - its destructor will call unlock().
    // So no need for us to make that call here.
    boost::unique_lock<boost::mutex> lock(mut);
    // We use a while loop here because the monitor may have woken up because
    // another producer did a PulseAll. In that case, the queue may not have
    // room, so we need to re-check and re-wait if that is the case.
    // We use an external stopwatch to stop the madness if we have taken too long.
    while (queue.size() >= this->capacity)
    {
        int monitorTimeout = timeoutMillis - ((unsigned int) timer.getElapsedMilliSeconds());
        if (monitorTimeout <= 0)
        {
            return false;
        }
        if (!cond.timed_wait(lock, boost::posix_time::milliseconds(timeoutMillis)))
        {
            return false;
        }
    }
    cond.notify_all();
    queue.push_back(myType);
    return true;
}
void BlockingQueue::put(const MyType & myType)
{
    // boost::unique_lock is a scoped lock - its destructor will call unlock().
    // So no need for us to make that call here.
    boost::unique_lock<boost::mutex> lock(mut);
    // We use a while loop here because the monitor may have woken up because
    // another producer did a PulseAll. In that case, the queue may not have
    // room, so we need to re-check and re-wait if that is the case.
    // We use an external stopwatch to stop the madness if we have taken too long.
    while (queue.size() >= this->capacity)
    {
        cond.wait(lock);
    }
    cond.notify_all();
    queue.push_back(myType);
}
MyType BlockingQueue::take()
{
    // boost::unique_lock is a scoped lock - its destructor will call unlock().
    // So no need for us to make that call here.
    boost::unique_lock<boost::mutex> lock(mut);
    while (queue.size() == 0)
    {
        cond.wait(lock);
    }
    cond.notify_one();
    MyType myType = this->queue.front();
    this->queue.pop_front();
    return myType;
}
unsigned int BlockingQueue::getCapacity()
{
    return this->capacity;
}
unsigned int BlockingQueue::getCount()
{
    return this->queue.size();
}

是的,我没有使用模板实现该类 - 这是列表中的下一个:)

任何帮助将不胜感激。线程问题可能真的很难确定。

-本

为什么 cond 和 mut 是全局变量?我希望他们是你的BlockingQueue对象的成员。我不知道还有什么东西在触及这些东西,但那里可能存在问题。

我也实现了ThreadSafeQueue作为更大项目的一部分:

https://github.com/cdesjardins/QueuePtr/blob/master/include/ThreadSafeQueue.h

它的概念与您的概念类似,除了排队(又名报价)功能是非阻塞的,因为基本上没有最大容量。为了强制实施容量,我通常会在系统初始化时添加一个带有 N 个缓冲区的池,并在运行时添加一个用于消息传递的队列,这也消除了在运行时分配内存的需要,我认为这是一件好事(我通常在嵌入式应用程序上工作)。

和队列之间的唯一区别是,池在系统初始化时获得一堆排队的缓冲区。所以你有这样的东西:

ThreadSafeQueue<BufferDataType*> pool;
ThreadSafeQueue<BufferDataType*> queue;
void init()
{
    for (int i = 0; i < NUM_BUFS; i++)
    {
        pool.enqueue(new BufferDataType);
    }
}

然后,当您想要发送消息时,您可以执行以下操作:

void producerA()
{
    BufferDataType *buf;
    if (pool.waitDequeue(buf, timeout) == true)
    {
        initBufWithMyData(buf);
        queue.enqueue(buf);
    }
}

这样,排队功能既快速又简单,但如果池为空,那么您将阻塞,直到有人将缓冲区放回池中。这个想法是,其他一些线程将在队列上阻塞,并在按如下方式处理缓冲区时将缓冲区返回到池中:

void consumer()
{
    BufferDataType *buf;
    if (queue.waitDequeue(buf, timeout) == true)
    {
        processBufferData(buf);
        pool.enqueue(buf);
    }
}
无论如何,看看

它,也许它会有所帮助。

我想你的代码中的问题是通过几个线程修改双端面。看:

  1. 您正在等待来自另一个线程的编码;
  2. 然后立即向其他线程发送信号,在您想要修改之前解锁 deque;
  3. 然后,当其他线程认为Deque已解锁并开始执行相同的操作时,您修改了Deque。

因此,尝试在修改双端面后放置所有cond.notify_*()。 即:

void BlockingQueue::put(const MyType & myType)
{
    boost::unique_lock<boost::mutex> lock(mut);
    while (queue.size() >= this->capacity)
    {
        cond.wait(lock);
    }
    queue.push_back(myType);  // <- modify first
    cond.notify_all();        // <- then say to others that deque is free
}

为了更好地理解,我建议阅读有关pthread_cond_wait()的信息。