优化生产者-消费者线程之间的交互

Optimizing interaction between Producer - Consumer threads

本文关键字:之间 交互 线程 消费者 生产者 优化      更新时间:2023-10-16

我有一个生产者-消费者多线程交互的实现。它的工作原理。但是我觉得在执行过程中,等待状态经常发生在消费者线程和生产者线程之间。在我的例子中,Consumer以随机间隔访问队列并从中获取数据。现在,生产者线程在整个进程生命周期中运行。生产者线程充当缓存机。它在循环中检查队列的大小是否小于允许的最大缓存大小,如果是这种情况,它会继续将新数据推送到缓存中。我担心的是,当消费者试图访问队列时,最后一个仍然被生产者线程锁定,消费者应该等待。理想情况下,我希望让消费者导致生产者"冻结"并立即解锁,而消费者从队列中检索数据。

我现在是这样做的:

   //Called by consumer(main thead) to retrieve data from cache
   uint8_t* Worker::GetFrame() {
      boost::unique_lock<boost::mutex> lk(_frameCacheMutex);
      //If the cache is empty:
      while (0 == _frames_cache.size()) {
               //tell Producer to push data into cache
             _fill_cache_wait_cond.notify_one();
              //wait for the data to arrive(will be signaled by worker thread)
             _get_frame_wait_cond.wait(lk);
      }
      uint8_t * fr = _frames_cache.front();
      _frames_cache.pop();
      // notify worker thread to continue caching
      _fill_cache_wait_cond.notify_one();
      return fr;
   }

生产者线程:

    void Worker::operator () () {
      //Some init here..
        while (isRunning) {
           boost::unique_lock<boost::mutex> lk(_frameCacheMutex);
           /// Here create the data for cache...
           _frames_cache.push(data);
           /// Notify waiting main thread to take data
           _get_frame_wait_cond.notify_one();
           /// If the cache is full ,wait
           while (_frames_cache.size() == cacheMaxSize ){
                 _fill_cache_wait_cond.wait(lk);
           }
        }     
    }

此时,生产者锁定队列,直到队列满。只有在那个时候,消费者才能访问队列,但它会立即发出信号,表明队列已满,因此生产者再次锁定队列。

至少只在数据准备好被推送后才锁,并尝试限制推送动作。
如果可能的话,你还可以增加消费者的优先级。

顺便说一下,这不会立即解决你的问题,但你可以限制notify_one()调用的数量,因为你只需要在条件实际改变时发送一个(对消费者不再为零,对生产者不再为满)。

std::atomic<bool> producerShouldSleep = false;
std::atomic<bool> producerIsSleeping = false;
Item consumeItem(){ //consumer
    producerShouldSleep = true;
    while (!producerIsSleeping)
        yield();
    auto item = getItemFromQueue();
    producerShouldSleep = false;
    return item;
}
void produceData(){ //producer
    while (shouldBeRunning){
        if (producerShouldSleep){
            producerIsSleeping = true;
            while (producerShouldSleep)
                yield();
            producerIsSleeping = false;
            continue;
        }
        if (!queueIsFull())
            pushItemIntoQueue();
    }
}

这将优先考虑消费者。除非我搞砸了什么,否则它会正确同步。我看到的唯一问题是队列可能是空的,并且有人在紧循环中调用consumeItem,这可能会阻止生产者将项目推入队列,从而最终导致lifelock。