阻塞队列竞争条件
Blocking queue race condition?
我正在尝试实现一个高性能的阻塞队列,该队列由pthreads、信号量.h和gcc原子内置的循环缓冲区支持。队列需要处理来自不同线程的多个同时读写器。
我已经隔离了某种种族条件,我不确定这是否是对一些原子操作和信号量行为的错误假设,或者我的设计是否存在根本缺陷。
我已经将其提取并简化为下面的独立示例。我希望这个节目永远不会回来。然而,在队列中检测到损坏的几十万次迭代之后,它确实会返回。
在下面的示例中(为了说明),它实际上并没有存储任何内容,它只是将一个保存实际数据的单元格设置为1,将0设置为表示一个空单元格。有一个计数信号量(空位)表示空闲单元的数量,另一个计数信息量(占用者)表示占用单元的数量。
作家做以下事情:
- 减少空缺
- 原子地获取下一个头索引(mod队列大小)
- 写信给它
- 增加占用人数
读者的做法正好相反:
- 减少占用者
- 原子地获取下一个尾部索引(mod队列大小)
- 从中读取
- 增加空缺
我希望在给定上述内容的情况下,恰好一个线程可以同时读取或写入任何给定的单元格。
任何关于它为什么不起作用或调试策略的想法都值得赞赏。下面的代码和输出。。。
#include <stdlib.h>
#include <semaphore.h>
#include <iostream>
using namespace std;
#define QUEUE_CAPACITY 8 // must be power of 2
#define NUM_THREADS 2
struct CountingSemaphore
{
sem_t m;
CountingSemaphore(unsigned int initial) { sem_init(&m, 0, initial); }
void post() { sem_post(&m); }
void wait() { sem_wait(&m); }
~CountingSemaphore() { sem_destroy(&m); }
};
struct BlockingQueue
{
unsigned int head; // (head % capacity) is next head position
unsigned int tail; // (tail % capacity) is next tail position
CountingSemaphore vacancies; // how many cells are vacant
CountingSemaphore occupants; // how many cells are occupied
int cell[QUEUE_CAPACITY];
// (cell[x] == 1) means occupied
// (cell[x] == 0) means vacant
BlockingQueue() :
head(0),
tail(0),
vacancies(QUEUE_CAPACITY),
occupants(0)
{
for (size_t i = 0; i < QUEUE_CAPACITY; i++)
cell[i] = 0;
}
// put an item in the queue
void put()
{
vacancies.wait();
// atomic post increment
set(__sync_fetch_and_add(&head, 1) % QUEUE_CAPACITY);
occupants.post();
}
// take an item from the queue
void take()
{
occupants.wait();
// atomic post increment
get(__sync_fetch_and_add(&tail, 1) % QUEUE_CAPACITY);
vacancies.post();
}
// set cell i
void set(unsigned int i)
{
// atomic compare and assign
if (!__sync_bool_compare_and_swap(&cell[i], 0, 1))
{
corrupt("set", i);
exit(-1);
}
}
// get cell i
void get(unsigned int i)
{
// atomic compare and assign
if (!__sync_bool_compare_and_swap(&cell[i], 1, 0))
{
corrupt("get", i);
exit(-1);
}
}
// corruption detected
void corrupt(const char* action, unsigned int i)
{
static CountingSemaphore sem(1);
sem.wait();
cerr << "corruption detected" << endl;
cerr << "action = " << action << endl;
cerr << "i = " << i << endl;
cerr << "head = " << head << endl;
cerr << "tail = " << tail << endl;
for (unsigned int j = 0; j < QUEUE_CAPACITY; j++)
cerr << "cell[" << j << "] = " << cell[j] << endl;
}
};
BlockingQueue q;
// keep posting to the queue forever
void* Source(void*)
{
while (true)
q.put();
return 0;
}
// keep taking from the queue forever
void* Sink(void*)
{
while (true)
q.take();
return 0;
}
int main()
{
pthread_t id;
// start some pthreads to run Source function
for (int i = 0; i < NUM_THREADS; i++)
if (pthread_create(&id, NULL, &Source, 0))
abort();
// start some pthreads to run Sink function
for (int i = 0; i < NUM_THREADS; i++)
if (pthread_create(&id, NULL, &Sink, 0))
abort();
while (true);
}
按如下方式编译以上内容:
$ g++ -pthread AboveCode.cpp
$ ./a.out
每次的输出都不一样,但这里有一个例子:
corruption detected
action = get
i = 6
head = 122685
tail = 122685
cell[0] = 0
cell[1] = 0
cell[2] = 1
cell[3] = 0
cell[4] = 1
cell[5] = 0
cell[6] = 1
cell[7] = 1
我的系统是英特尔酷睿2:上的Ubuntu 11.10
$ uname -a
Linux 3.0.0-14-generic #23-Ubuntu SMP
Mon Nov 21 20:28:43 UTC 2011 x86_64 x86_64 x86_64 GNU/Linux
$ cat /proc/cpuinfo | grep Intel
model name : Intel(R) Core(TM)2 Quad CPU Q9300 @ 2.50GHz
$ g++ --version
g++ (Ubuntu/Linaro 4.6.1-9ubuntu3) 4.6.1
谢谢,安德鲁。
一种可能的情况,逐步跟踪两个写入线程(W0,W1)和一个读取线程(R0)。W0早于W1进入put(),被操作系统或硬件中断,随后完成。
w0 (core 0) w1 (core 1) r0
t0 ---- --- blocked on occupants.wait() / take
t1 entered put() --- ---
t2 vacancies.wait() entered put() ---
t3 got new_head = 1 vacancies.wait() ---
t4 <interrupted by OS> got new_head = 2 ---
t5 written 1 at cell[2] ---
t6 occupants.post(); ---
t7 exited put() waked up
t8 --- got new_tail = 1
t9 <still in interrupt> --- read 0 from ceil[1] !! corruption !!
t10 written 1 at cell[1]
t11 occupants.post();
t12 exited put()
从设计的角度来看,我会将整个队列视为一个共享资源,并用一个互斥对象来保护它。
作家做以下事情:
- 获取互斥
- 写入队列(包括索引处理)
- 释放互斥
读者可以执行以下操作:
- 获取互斥
- 从队列中读取(包括索引处理)
- 释放互斥
我有一个理论。这是一个循环队列,所以一个读取线程可能会被重叠。假设读者获取索引0。在它做任何事情之前,它会失去CPU。另一个阅读器线程获取索引1,然后是2,然后是3。。。然后是7,然后是0。第一个读取器醒来,两个线程都认为它们对索引0具有独占访问权限。不知道如何证明。希望能有所帮助。
- compare_exchange C++函数如何确定竞争条件?
- 在C++中写入相同值的竞争条件?
- QByteArray 通过队列连接按值发出并连接并附加到竞争条件?
- 从stdin读取时子进程挂起(fork/dup2竞争条件)
- POSIX 条件变量和互斥体"竞争"
- 如何修复条件变量等待/通知的竞争条件
- 在没有互斥锁的情况下重新计数时如何避免竞争条件?
- 替代 rand() 以避免竞争条件?
- CUDA 内核中的竞争条件
- 为什么跨线程更改共享变量的代码显然没有受到竞争条件的影响
- 类声明自己(*this)为private以避免竞争条件/放弃在gcc中对threadprivate的请求
- 使用 gtest EXPECT_CALL 时竞争条件段错误,而另一个期望是执行相同的方法
- boost::进程间消息队列创建时的竞争条件
- 如何在没有竞争条件的情况下将 QFutureWatcher 与 QtConcurrent::run() 一起使用
- 为什么 printf 可以屏蔽竞争条件,而系统日志不能?
- 为什么这个代码会产生竞争条件
- 为什么 CUDA 同步点不能阻止竞争条件?
- 竞争条件:一个线程创建静态对象,另一个线程在初始化完成之前使用它.如何处理
- 如何使用 Boost Atomic 删除竞争条件
- boost::mutex 无法帮助避免C++程序中的竞争条件