公平队列丢失通知
Fair queue loses notifications
考虑以下代码
#include <thread>
#include <iostream>
#include <queue>
#include <mutex>
#include <condition_variable>
template <typename T>
class Tqueue
{
public:
Tqueue() : m_next_ticket(0),
m_counter(0) {}
void push(const T& e){
std::unique_lock<std::mutex> lock(m_mutex);
m_queue.push(e);
lock.unlock();
m_cond.notify_all();
};
T wait_and_pop() {
std::unique_lock<std::mutex> lock(m_mutex);
int ticket = m_next_ticket++;
m_cond.wait(lock,[=]{return (!m_queue.empty())
&& (ticket == m_counter);});
m_counter++;
T data = m_queue.front();
m_queue.pop();
return data;
}
private:
int m_next_ticket;
int m_counter;
std::queue<T> m_queue;
std::mutex m_mutex;
std::condition_variable m_cond;
};
这应该是我提出的公平队列的模板。在此上下文中,公平意味着wait_and_pop()调用以不同线程调用的顺序返回。
例如:线程1在空队列上调用wait_and_pop()并阻塞。然后,线程2在空队列上调用wait_and_pop()并阻塞。之后,线程3用push()推两个事件。现在线程1应该先于线程2返回。
使用下面的代码,它不时地工作。但大多数时候代码永远阻塞:
Tqueue<int> queue;
std::mutex mutex;
void test(int i)
{
auto bla = queue.wait_and_pop();
std::cout << "Thread : "<<bla << std::endl;
}
const int SIZE = 200;
int main(int argc, char *argv[])
{
std::vector<std::thread> threads;
for(int i = 0; i < SIZE; ++i)
threads.push_back(std::thread(test,i));
for(int i = 0; i < SIZE; ++i)
queue.push(i);
for(int i = 0; i < SIZE; ++i)
threads[i].join();
return 0;
}
这个想法是为每个线程创建一个唯一的票证。使用条件变量,然后在wait_and_pop()函数中等待,直到插入一个新事件。在push()函数中,新的事件被插入到队列中,所有等待的线程都被通知。每个线程检查队列不再为空,并且唯一票证是否等于当前计数器。如果是,特定线程离开条件循环,从队列中弹出当前事件并增加计数器。
我怀疑,一些通知丢失了,但我不能得到我的头周围的事实,为什么会发生这种情况。有什么想法如何解决这个问题,或者如何以正确的方式实现这个问题?
编辑我将队列中的代码更改如下。现在它似乎起作用了。重要的部分是,它发出通知,同时仍然持有锁(在push()和wait_and_pop()中)。此外,我将票据系统更改为线程id队列,但这只是为了方便,这使源代码保持紧凑。但我不确定,如果我想使用队列生产代码,因为我不明白为什么它现在可以工作,我不知道它是否在所有情况下都可以工作。也许有人可以评论一下?
template <typename T>
class Tqueue
{
public:
void push(const T& e){
std::unique_lock<std::mutex> lock(m_mutex);
m_queue.push(e);
m_cond.notify_all();
};
T wait_and_pop() {
std::unique_lock<std::mutex> lock(m_mutex);
m_ids.push(std::this_thread::get_id());
m_cond.wait(lock,[=]{return (!m_queue.empty())
&& (m_ids.front() == std::this_thread::get_id());});
T data = m_queue.front();
m_queue.pop();
m_ids.pop();
m_cond.notify_all();
return data;
}
private:
std::queue<T> m_queue;
std::queue<std::thread::id> m_ids;
std::mutex m_mutex;
std::condition_variable m_cond;
};
通知确实丢失了。push
的数量可能会产生更少的线程被唤醒,因为当m_cond.notify_all();
被执行时,它只是使等待的线程可运行,即准备运行。这些线程仍然需要等待轮到它们并获得m_cond.wait
内部的锁。
也可能是主线程在单个等待线程最终执行之前多次获取互斥锁。导致通知不足。
为了使机制工作,需要在条件受到影响时通知。您已经通知了m_queue.push(e);
,这影响了第一个条件!m_queue.empty()
。您还需要在wait_and_pop
的末尾通知,以处理第二个条件ticket == m_counter
。
T wait_and_pop() {
....blah blah
T data = m_queue.front();
m_queue.pop();
lock.unlock();
m_cond.notify_all();
return data;
}
注意:这里的it is possible
是指"最终有一个线程调度最终发生"。我不是说" I am not sure"。
进一步说明:condition_variable.notify_all()
只保证最终唤醒线程。它不能保证X
的调用次数会唤醒X
次。此外,由于您的情况,它被减少到保证只通知一个线程,这是根本原因。
关于wait_and_pop解锁前后的通知
在wait_and_pop
中,在释放锁之前或之后通知应该没有任何区别。我指定的修改应该与编辑中的修改一样。我一直在用很少的变化(线程数,等待x线程完成并再次推动)进行测试,结果相同。
如果在您开始推送之后只有一个线程被启动,那么它将永远被阻塞。您应该先尝试队列是否为空。如果已经有数据了,为什么还要等呢?
- boost::进程间消息队列引发错误
- 如果我只是不访问queue_front节点的子节点,而是将它们推到队列中呢?还是BFS吗
- Android NDK传感器向事件队列报告奇怪的间隔
- C++优先级队列,按对象的唯一指针的特定方法升序排列
- 按对象的特定方法按升序排列的C++优先级队列
- 使用2个键的cpp-stl::优先级队列排序不正确
- 我是否需要在下一次转移时将所有权*转移回转移队列
- 在一个读写器队列中,我可以用volatile替换原子吗
- 为什么我的多线程作业队列崩溃
- 尝试将lambda函数放在队列中时出现一般分配器错误(可能是与unique_ptr有关的错误)
- 使用"Task"函数指针队列定义作业管理器
- 在c++队列中使用pop和visit实现线程安全
- 为什么我需要C++中不同的排序格式来对这个USACO代码上的数组和优先级队列进行排序
- 打印优先级队列
- 函数如何通知用户它基于函数原型抛出异常?
- 共享队列的线程安全
- 带自定义比较器的最小优先级队列
- C++ Poco - 如何创建通知队列的向量?
- 视觉提升 条件变量中的同步队列C++不通知其他线程上的等待类方法
- 公平队列丢失通知