无锁的生产者/消费者队列
C++ Lock free producer/consumer queue
我正在查看无锁队列的示例代码:
http://drdobbs.com/high-performance-computing/210604448?pgno=2(也可以参考许多SO问题,例如在c++中是否有生产就绪的无锁队列或哈希实现)
这看起来应该适用于单个生产者/消费者,尽管代码中有许多错字。我已经将代码更新为如下所示,但是它在我身上崩溃了。有人知道为什么吗?
特别是,是否应该将divider和last声明为:
atomic<Node *> divider, last; // shared
我在这台机器上没有支持c++ 0x的编译器,所以也许这就是我所需要的…
// Implementation from http://drdobbs.com/high-performance-computing/210604448
// Note that the code in that article (10/26/11) is broken.
// The attempted fixed version is below.
template <typename T>
class LockFreeQueue {
private:
struct Node {
Node( T val ) : value(val), next(0) { }
T value;
Node* next;
};
Node *first, // for producer only
*divider, *last; // shared
public:
LockFreeQueue()
{
first = divider = last = new Node(T()); // add dummy separator
}
~LockFreeQueue()
{
while( first != 0 ) // release the list
{
Node* tmp = first;
first = tmp->next;
delete tmp;
}
}
void Produce( const T& t )
{
last->next = new Node(t); // add the new item
last = last->next; // publish it
while (first != divider) // trim unused nodes
{
Node* tmp = first;
first = first->next;
delete tmp;
}
}
bool Consume( T& result )
{
if (divider != last) // if queue is nonempty
{
result = divider->next->value; // C: copy it back
divider = divider->next; // D: publish that we took it
return true; // and report success
}
return false; // else report empty
}
};
我写了下面的代码来测试它。Main(未显示)只调用TestQ()。
#include "LockFreeQueue.h"
const int numThreads = 1;
std::vector<LockFreeQueue<int> > q(numThreads);
void *Solver(void *whichID)
{
int id = (long)whichID;
printf("Thread %d initializedn", id);
int result = 0;
do {
if (q[id].Consume(result))
{
int y = 0;
for (int x = 0; x < result; x++)
{ y++; }
y = 0;
}
} while (result != -1);
return 0;
}
void TestQ()
{
std::vector<pthread_t> threads;
for (int x = 0; x < numThreads; x++)
{
pthread_t thread;
pthread_create(&thread, NULL, Solver, (void *)x);
threads.push_back(thread);
}
for (int y = 0; y < 1000000; y++)
{
for (unsigned int x = 0; x < threads.size(); x++)
{
q[x].Produce(y);
}
}
for (unsigned int x = 0; x < threads.size(); x++)
{
q[x].Produce(-1);
}
for (unsigned int x = 0; x < threads.size(); x++)
pthread_join(threads[x], 0);
}
更新:最终崩溃是由队列声明引起的:
std::vector<LockFreeQueue<int> > q(numThreads);
当我将其更改为一个简单数组时,它运行良好。(我实现了一个带锁的版本,它也崩溃了。)我看到析构函数在构造函数之后立即被调用,导致双释放内存。但是,有人知道为什么析构函数会被std::vector立即调用吗?
您需要将几个指针std::原子化,并且需要在循环中使用compare_exchange_weak来自动更新它们。否则,多个消费者可能会消费同一个节点,多个生产者可能会破坏列表。
这些写操作(仅是代码中的一个示例)按顺序进行是非常重要的:
last->next = new Node(t); // add the new item
last = last->next; // publish it
c++不能保证这一点——优化器可以随心所欲地重新安排事情,只要当前线程总是像程序完全按照您编写的方式运行一样。然后CPU缓存可以出现并进一步重新排序。
你需要内存栅栏。使指针使用原子类型应该具有这种效果。
这可能是完全错误的,但我不禁想知道你是否有某种静态初始化相关的问题…为了搞笑,尝试将q
声明为指向无锁队列向量的指针,并将其分配到main()
中的堆上。
相关文章:
- boost::进程间消息队列引发错误
- 如果我只是不访问queue_front节点的子节点,而是将它们推到队列中呢?还是BFS吗
- Android NDK传感器向事件队列报告奇怪的间隔
- C++优先级队列,按对象的唯一指针的特定方法升序排列
- 按对象的特定方法按升序排列的C++优先级队列
- 使用2个键的cpp-stl::优先级队列排序不正确
- 我是否需要在下一次转移时将所有权*转移回转移队列
- 在一个读写器队列中,我可以用volatile替换原子吗
- C++ deque 消费者总是从生产者那里得到空队列
- 如何查看Tibco EMS独享队列是否有活跃消费者?
- 为什么这不是正确的生产者消费者模型以及当我使用 stl 队列时导致错误的原因
- 生产者-消费者队列 - std::queue 或用户编写的链接列表
- 具有四个队列的多线程生产者/消费者
- 线程安全FIFO/队列(多个生产者,一个消费者)
- 在C++11中锁定释放多个生产者/消费者队列
- 在c++中实现一个多生产者/消费者无锁队列
- 无锁的生产者/消费者队列
- 生产者/消费者设计-在Qt中跨线程共享队列变量
- boost条件不在具有两个生产者和一个消费者的线程安全队列上工作
- 具有两个线程(生产者、消费者)的 STL 队列