公平队列丢失通知

Fair queue loses notifications

本文关键字:通知 队列      更新时间:2023-10-16

考虑以下代码

#include <thread>
#include <iostream>
#include <queue>
#include <mutex>
#include <condition_variable>
template <typename T>
class Tqueue 
{
public:
   Tqueue() : m_next_ticket(0),
              m_counter(0) {}
   void push(const T& e){
       std::unique_lock<std::mutex> lock(m_mutex);
       m_queue.push(e);
       lock.unlock();
       m_cond.notify_all();
    };
   T wait_and_pop() {
       std::unique_lock<std::mutex> lock(m_mutex);
       int ticket = m_next_ticket++;
       m_cond.wait(lock,[=]{return (!m_queue.empty())
                  && (ticket == m_counter);});
       m_counter++;
       T data = m_queue.front();
       m_queue.pop();
       return data;
   }
private:
   int m_next_ticket;
   int m_counter;
   std::queue<T> m_queue;
   std::mutex m_mutex;
   std::condition_variable m_cond;   
};

这应该是我提出的公平队列的模板。在此上下文中,公平意味着wait_and_pop()调用以不同线程调用的顺序返回。

例如

:线程1在空队列上调用wait_and_pop()并阻塞。然后,线程2在空队列上调用wait_and_pop()并阻塞。之后,线程3用push()推两个事件。现在线程1应该先于线程2返回。

使用下面的代码,它不时地工作。但大多数时候代码永远阻塞:

Tqueue<int> queue;
std::mutex mutex;
void test(int i) 
{
    auto bla = queue.wait_and_pop();
    std::cout << "Thread : "<<bla << std::endl;
}
const int SIZE = 200;
int main(int argc, char *argv[])
{
    std::vector<std::thread> threads;
    for(int i = 0; i < SIZE; ++i)
       threads.push_back(std::thread(test,i));
    for(int i = 0; i < SIZE; ++i)
        queue.push(i);
    for(int i = 0; i < SIZE; ++i)
       threads[i].join();
    return 0;
}

这个想法是为每个线程创建一个唯一的票证。使用条件变量,然后在wait_and_pop()函数中等待,直到插入一个新事件。在push()函数中,新的事件被插入到队列中,所有等待的线程都被通知。每个线程检查队列不再为空,并且唯一票证是否等于当前计数器。如果是,特定线程离开条件循环,从队列中弹出当前事件并增加计数器。

我怀疑,一些通知丢失了,但我不能得到我的头周围的事实,为什么会发生这种情况。有什么想法如何解决这个问题,或者如何以正确的方式实现这个问题?

编辑我将队列中的代码更改如下。现在它似乎起作用了。重要的部分是,它发出通知,同时仍然持有锁(在push()和wait_and_pop()中)。此外,我将票据系统更改为线程id队列,但这只是为了方便,这使源代码保持紧凑。但我不确定,如果我想使用队列生产代码,因为我不明白为什么它现在可以工作,我不知道它是否在所有情况下都可以工作。也许有人可以评论一下?

template <typename T>
class Tqueue 
{
public:
   void push(const T& e){
       std::unique_lock<std::mutex> lock(m_mutex);
       m_queue.push(e);
       m_cond.notify_all();
   };
   T wait_and_pop() {
       std::unique_lock<std::mutex> lock(m_mutex);
       m_ids.push(std::this_thread::get_id());
       m_cond.wait(lock,[=]{return (!m_queue.empty())
                  && (m_ids.front() == std::this_thread::get_id());});
       T data = m_queue.front();
       m_queue.pop();
       m_ids.pop();
       m_cond.notify_all();
       return data;
   }
private:
   std::queue<T> m_queue;
   std::queue<std::thread::id> m_ids;
   std::mutex m_mutex;
   std::condition_variable m_cond;
 };

通知确实丢失了。push的数量可能会产生更少的线程被唤醒,因为当m_cond.notify_all();被执行时,它只是使等待的线程可运行,即准备运行。这些线程仍然需要等待轮到它们并获得m_cond.wait内部的锁。

也可能是主线程在单个等待线程最终执行之前多次获取互斥锁。导致通知不足。

为了使机制工作,需要在条件受到影响时通知。您已经通知了m_queue.push(e);,这影响了第一个条件!m_queue.empty()。您还需要在wait_and_pop的末尾通知,以处理第二个条件ticket == m_counter

T wait_and_pop() {
   ....blah blah
   T data = m_queue.front();
   m_queue.pop();
   lock.unlock();
   m_cond.notify_all();
   return data;
}

注意:这里的it is possible是指"最终有一个线程调度最终发生"。我不是说" I am not sure"。

进一步说明:condition_variable.notify_all()只保证最终唤醒线程。它不能保证X的调用次数会唤醒X次。此外,由于您的情况,它被减少到保证只通知一个线程,这是根本原因。

关于wait_and_pop解锁前后的通知
wait_and_pop中,在释放锁之前或之后通知应该没有任何区别。我指定的修改应该与编辑中的修改一样。我一直在用很少的变化(线程数,等待x线程完成并再次推动)进行测试,结果相同。

如果在您开始推送之后只有一个线程被启动,那么它将永远被阻塞。您应该先尝试队列是否为空。如果已经有数据了,为什么还要等呢?