C++生产者使用者中,同一使用者线程会抓取所有任务
C++ Producer Consumer, same consumer thread grabs all tasks
我正在用 c++ 实现一个生产者消费者项目,当我运行该程序时,同一个消费者几乎抓取了所有的工作,而不会让任何其他消费者线程抓取任何工作。有时,其他线程确实会得到一些工作,但随后其他线程会控制一段时间。例如,TID 10 几乎可以抓取所有的工作,但突然之间 TID 12 会抓住它,中间没有其他消费者线程的工作。
知道为什么其他线程没有机会抓住工作吗?
#include <thread>
#include <iostream>
#include <mutex>
#include <condition_variable>
#include <deque>
#include <csignal>
#include <unistd.h>
using namespace std;
int max_queue_size = 100;
int num_producers = 5;
int num_consumers = 7;
int num_operations = 40;
int operations_created = 0;
thread_local int operations_created_by_this_thread = 0;
int operations_consumed = 0;
thread_local int operations_consumed_by_this_thread = 0;
struct thread_stuff {
int a;
int b;
int operand_num;
char operand;
};
char operands[] = {'+', '-', '/', '*'};
deque<thread_stuff> q;
bool finished = false;
condition_variable cv;
mutex queue_mutex;
void producer(int n) {
while (operations_created_by_this_thread < num_operations) {
int oper_num = rand() % 4;
thread_stuff equation;
equation.a = rand();
equation.b = rand();
equation.operand_num = oper_num;
equation.operand = operands[oper_num];
while ((operations_created - operations_consumed) >= max_queue_size) {
// don't do anything until it has space available
}
{
lock_guard<mutex> lk(queue_mutex);
q.push_back(equation);
operations_created++;
}
cv.notify_all();
operations_created_by_this_thread++;
this_thread::__sleep_for(chrono::seconds(rand() % 2), chrono::nanoseconds(0));
}
{
lock_guard<mutex> lk(queue_mutex);
if(operations_created == num_operations * num_producers){
finished = true;
}
}
cv.notify_all();
}
void consumer() {
while (true) {
unique_lock<mutex> lk(queue_mutex);
cv.wait(lk, [] { return finished || !q.empty(); });
if(!q.empty()) {
thread_stuff data = q.front();
q.pop_front();
operations_consumed++;
operations_consumed_by_this_thread++;
int ans = 0;
switch (data.operand_num) {
case 0:
ans = data.a + data.b;
break;
case 1:
ans = data.a - data.b;
break;
case 2:
ans = data.a / data.b;
break;
case 3:
ans = data.a * data.b;
break;
}
cout << "Operation " << operations_consumed << " processed by PID " << getpid()
<< " TID " << this_thread::get_id() << ": "
<< data.a << " " << data.operand << " " << data.b << " = " << ans << " queue size: "
<< (operations_created - operations_consumed) << endl;
}
this_thread::yield();
if (finished) break;
}
}
void usr1_handler(int signal) {
cout << "Status: Produced " << operations_created << " operations and "
<< (operations_created - operations_consumed) << " operations are in the queue" << endl;
}
void usr2_handler(int signal) {
cout << "Status: Consumed " << operations_consumed << " operations and "
<< (operations_created - operations_consumed) << " operations are in the queue" << endl;
}
int main(int argc, char *argv[]) {
if (argc < 5) {
cout << "Invalid number of parameters passed in" << endl;
exit(1);
}
max_queue_size = atoi(argv[1]);
num_operations = atoi(argv[2]);
num_producers = atoi(argv[3]);
num_consumers = atoi(argv[4]);
// signal(SIGUSR1, usr1_handler);
// signal(SIGUSR2, usr2_handler);
thread producers[num_producers];
thread consumers[num_consumers];
for (int i = 0; i < num_producers; i++) {
producers[i] = thread(producer, num_operations);
}
for (int i = 0; i < num_consumers; i++) {
consumers[i] = thread(consumer);
}
for (int i = 0; i < num_producers; i++) {
producers[i].join();
}
for (int i = 0; i < num_consumers; i++) {
consumers[i].join();
}
cout << "finished!" << endl;
}
您一直持有互斥锁 - 包括yield()
- 在持有互斥锁时。
像在生产者代码中一样确定unique_lock的范围,从队列中弹出并以原子方式递增计数器。
我看到您有一个最大队列大小。如果队列已满,您需要第二个条件让生产者等待,并且使用者将在消耗项目时发出此条件的信号。
知道为什么其他线程没有机会抓住工作吗?
这项民意调查令人不安:
while ((operations_created - operations_consumed) >= max_queue_size)
{
// don't do anything until it has space available
}
您可以尝试在循环中设置最小的延迟...这是一个"坏邻居",可以"消耗"一个核心。
您的代码几乎没有问题:
使用普通变量进行线程间通信
下面是一个示例:
int operations_created = 0;
int operations_consumed = 0;
void producer(int n) {
[...]
while ((operations_created - operations_consumed) >= max_queue_size) { }
及以后
void consumer() {
[...]
operations_consumed++;
这仅适用于没有优化的 x86 架构,即-O0
.一旦我们尝试启用优化,编译器将优化 while 循环以:
void producer(int n) {
[...]
if ((operations_created - operations_consumed) >= max_queue_size) {
while (true) { }
}
所以,你的程序只是挂在这里。您可以在编译器资源管理器上检查这一点。
mov eax, DWORD PTR operations_created[rip]
sub eax, DWORD PTR operations_consumed[rip]
cmp eax, DWORD PTR max_queue_size[rip]
jl .L19 // here is the if before the loop
.L20:
jmp .L20 // here is the empty loop
.L19:
为什么会这样?从单线程程序的角度来看,while (condition) { operators }
完全等同于if (condition) while (true) { operators }
,如果operators
不更改condition
。
要解决此问题,我们应该使用std::atomic<int>
而不是简单的int
.这些是为线程间通信而设计的,因此编译器将避免此类优化并生成正确的程序集。
消费者在yield()
时锁定互斥体
看看这个片段:
void consumer() {
while (true) {
unique_lock<mutex> lk(queue_mutex);
[...]
this_thread::yield();
[...]
}
基本上,这意味着消费者在持有锁yield()
。由于一次只能有一个消费者持有锁(互斥代表互斥),这就解释了为什么其他消费者不能消费作品。
要解决此问题,我们应该在yield()
之前解锁queue_mutex
,即:
void consumer() {
while (true) {
{
unique_lock<mutex> lk(queue_mutex);
[...]
}
this_thread::yield();
[...]
}
这仍然不能保证只有一个线程可以完成大部分任务。当我们在生产者中notify_all()
时,所有线程都会被唤醒,但只有一个线程会锁定互斥锁。由于我们安排的工作很小,当生产者调用notify_all()
时,我们的线程将完成工作,完成yield()
并为下一个工作做好准备。
那么为什么这个线程锁定互斥锁,而不是另一个呢?我想这是由于 CPU 缓存和忙于等待而发生的。刚刚完成工作的线程是"热"的,它在 CPU 缓存中并准备锁定互斥锁。在睡觉之前,它也可能会尝试忙于等待互斥锁几个周期,这增加了它获胜的机会。
为了解决这个问题,我们可以删除生产者中的睡眠(这样它就会更频繁地唤醒其他线程,所以其他线程也会"热"),或者在消费者中执行sleep()
而不是yield()
(所以这个线程在睡眠期间变得"冷")。
无论如何,由于互斥锁,没有机会并行完成工作,因此同一线程完成大部分工作这一事实是完全自然的 IMO。
- 为什么这个加载函数只抓取文件中的第一件事?
- 无法使用 OpenCv 3.4.5 从具有C++ dll 的网络摄像机 (rtsp) 中抓取帧
- 如何从日志文件中抓取状态代码?(在 C++ 中)
- DXGI API:AcquireTextFrame()从不抓取更新的图像,始终为空
- 如何从 avi 文件中抓取所有帧 - 如何修改 MS 样本采集卡样本
- 如何抓取指向Qt中弹出对话框的指针,该对话框阻止了QTest中的UI线程
- 为什么抓取窗口标题的代码会导致应用崩溃?
- 用Xcb而不是Xlib抓取像素的颜色
- 如何在C ++(qt)代码中使用python脚本?(网页抓取方面)
- OpenCV 3.2 在抓取时提供选择超时,但 fscyber 可以工作
- 使用 Matrox 进行帧抓取
- C++生产者使用者中,同一使用者线程会抓取所有任务
- 在编译时间之前抓取常数值
- OpenCV + QML(从另一个线程抓取帧)
- OpenCV视频捕获抓取和检索
- 从文本中抓取句子,将所有句子分别存储在某个数据结构中
- 视频抓取不起作用 OpenCV
- 不抓取时的QGraphicsPixmapItem mouseMoveEvent
- 从二进制文件中抓取文本时,为什么 xdg_vtnr=8 是我的结果
- UWP中用于拆分的抓取器