线程安全FIFO/队列(多个生产者,一个消费者)
Thread safe FIFO / Queue (Multiple producers, one consumer)
正如标题所说,我正在尝试编写一个队列,该队列可以由多个线程写入,也可以由单个线程读取。作为一个额外的困难,我需要队列输入保持有序(先进先出(。这就是我迷路的地方。Mutexes不一定按照它们被锁定的顺序被唤醒,所以我不知道我可以用什么来实现我想要的?这里有一个简单的程序来说明我正在尝试做什么:
#include "Queue.h"
#include <Windows.h>
#include <fstream>
#include <mutex>
using std::ofstream;
ofstream myFile("result.txt");
Queue<int> myQueue;
DWORD WINAPI WritingThread(LPVOID lpParam);
DWORD WINAPI LockingThread(LPVOID lpParam);
int main()
{
// This thread will block myQueue for 3 seconds
CreateThread(NULL, 0, LockingThread, NULL, 0, NULL);
// During the locked period, I ask myQueue to push numbers from 0 to 49
for (int i = 0; i < 50; i++)
CreateThread(NULL, 0, WritingThread, (LPVOID)new int(i), 0, NULL);
// If the mutex could wake up in order, myQueue would pop up the numbers in order, but it doesn't.
for (int i = 0; i < 50; i++)
myFile << myQueue.pop() << ",";
return EXIT_SUCCESS;
}
DWORD WINAPI LockingThread( LPVOID lpParam )
{
myQueue.lockQueueFor3Seconds();
return 0;
}
DWORD WINAPI WritingThread( LPVOID lpParam )
{
myQueue.push(*(int*)lpParam);
return 0;
}
类Queue的代码就是在那里获得的,请参阅文章底部的完整代码。我所做的只是添加方法"lockQueueFor3Seconds"用于测试目的。该方法定义如下:
void lockQueueFor3Seconds()
{
std::unique_lock<std::mutex> mlock(mutex_);
Sleep(3000);
}
该测试的输出如下:
1,43,39,46,36,44,49,40,35,42,32,31,28,41,27,38,24,23,20,34,19,16,15,12,37,11,7,8,3,33,30,0,45,4,26,18,48,21,47,22,25,17,14,10,6,29,9,2,13,5
正如你所看到的,显然没有订购。谢谢你的帮助!
编辑:我修改了队列,使其为每个表示其顺序的推送调用赋予一个数字,当互斥锁解锁时,队列检查以确保在添加元素之前轮到正确的方法,否则它将返回等待。不确定我是否正确地实现了这一点,但它似乎有效!完整的代码可以在那里找到。
为线程分配要添加的值并期望它们按顺序添加是行不通的,因为你不能强制执行线程的顺序。
相反,让每个线程在运行时添加下一个数字(无论它是什么(。像这样:
std::atomic_int counter;
DWORD WINAPI WritingThread( LPVOID lpParam )
{
myQueue.push( counter++ );
return 0;
}
编辑:增量是原子的还不够。对队列的增量AND推送需要是单个原子操作。这意味着在类之外公开锁变量(它已经是公共的(。
std::atomic_int counter;
DWORD WINAPI WritingThread( LPVOID lpParam )
{
unique_lock<mutex> lock(myQueue.m_mutex);
myQueue.push( counter++ );
return 0;
}
如果mutex
实现允许同一个线程多次调用它,那么这将起作用。否则,你可以做类似的事情:
void pushAndIncrement(T& item)
{
std::unique_lock<std::mutex> mlock(mutex_);
queue_.push(item);
++item;
mlock.unlock();
cond_.notify_one();
}
我认为你的解决方案(你说是有效的(仍然有种族条件。如果上下文开关在增加字母值之后,但在增加push
内部的counter
值之前,它将以错误的顺序添加字母。这是一个很小的窗口,可能不太可能发生,但如果你把计数器增量放在推的同一个锁里,每次都会很完美。
相关文章:
- 如何创建一个CMake变量,除非显式重写,否则使用默认值
- 删除一个线程上有数百万个字符串的大型哈希映射会影响另一个线程的性能
- 为什么两个不同的未命名名称空间可以共存于一个cpp文件中
- 运行同一解决方案的另一个项目的项目
- 挂起和取消挂起一个文件DLL
- 用C++中的一个变量定义一个常量
- 函数向量_指针有不同的原型,我可以构建一个吗
- 在c++中用vector填充一个简单的动态数组
- 如何在选项卡视图Qt中设置一个新项目,并保存以前的项目
- 预处理器:插入结构名称中的前一个行号
- 我在c++代码中生成了一个运行时#3异常
- 我想将一个对T类型的非常量左值引用绑定到一个T类型的临时值
- 从链接列表c++中删除一个项目
- 线程安全FIFO/队列(多个生产者,一个消费者)
- 一个生产者,两个消费者对生产者生产的'queue'采取行动
- 在c++中实现一个多生产者/消费者无锁队列
- 在一个简单的c++ 11四线程程序中,如果我注释标准输出打印行,我的两个消费者线程就不会返回
- 在像LMAX破坏者这样的模式中,你如何处理一个缓慢的消费者
- boost条件不在具有两个生产者和一个消费者的线程安全队列上工作
- 为什么我收到一个简单的提升生产者/消费者计划的锁定断言失败