C++生产者使用者中,同一使用者线程会抓取所有任务

C++ Producer Consumer, same consumer thread grabs all tasks

本文关键字:使用者 抓取 任务 线程 生产者 C++      更新时间:2023-10-16

我正在用 c++ 实现一个生产者消费者项目,当我运行该程序时,同一个消费者几乎抓取了所有的工作,而不会让任何其他消费者线程抓取任何工作。有时,其他线程确实会得到一些工作,但随后其他线程会控制一段时间。例如,TID 10 几乎可以抓取所有的工作,但突然之间 TID 12 会抓住它,中间没有其他消费者线程的工作。

知道为什么其他线程没有机会抓住工作吗?

#include <thread>
#include <iostream>
#include <mutex>
#include <condition_variable>
#include <deque>
#include <csignal>
#include <unistd.h>
using namespace std;
int max_queue_size = 100;
int num_producers = 5;
int num_consumers = 7;
int num_operations = 40;
int operations_created = 0;
thread_local int operations_created_by_this_thread = 0;
int operations_consumed = 0;
thread_local int operations_consumed_by_this_thread = 0;
struct thread_stuff {
int a;
int b;
int operand_num;
char operand;
};
char operands[] = {'+', '-', '/', '*'};
deque<thread_stuff> q;
bool finished = false;
condition_variable cv;
mutex queue_mutex;
void producer(int n) {
while (operations_created_by_this_thread < num_operations) {
int oper_num = rand() % 4;
thread_stuff equation;
equation.a = rand();
equation.b = rand();
equation.operand_num = oper_num;
equation.operand = operands[oper_num];

while ((operations_created - operations_consumed) >= max_queue_size) {
// don't do anything until it has space available
}
{
lock_guard<mutex> lk(queue_mutex);
q.push_back(equation);
operations_created++;
}
cv.notify_all();
operations_created_by_this_thread++;
this_thread::__sleep_for(chrono::seconds(rand() % 2), chrono::nanoseconds(0));
}
{
lock_guard<mutex> lk(queue_mutex);
if(operations_created == num_operations * num_producers){
finished = true;
}
}
cv.notify_all();
}
void consumer() {
while (true) {
unique_lock<mutex> lk(queue_mutex);
cv.wait(lk, [] { return finished || !q.empty(); });
if(!q.empty()) {
thread_stuff data = q.front();
q.pop_front();
operations_consumed++;
operations_consumed_by_this_thread++;
int ans = 0;
switch (data.operand_num) {
case 0:
ans = data.a + data.b;
break;
case 1:
ans = data.a - data.b;
break;
case 2:
ans = data.a / data.b;
break;
case 3:
ans = data.a * data.b;
break;
}
cout << "Operation " << operations_consumed << " processed by PID " << getpid()
<< " TID " << this_thread::get_id() << ": "
<< data.a << " " << data.operand << " " << data.b << " = " << ans << " queue size: "
<< (operations_created - operations_consumed) << endl;
}
this_thread::yield();
if (finished) break;
}
}
void usr1_handler(int signal) {
cout << "Status: Produced " << operations_created << " operations and "
<< (operations_created - operations_consumed) << " operations are in the queue" << endl;
}
void usr2_handler(int signal) {
cout << "Status: Consumed " << operations_consumed << " operations and "
<< (operations_created - operations_consumed) << " operations are in the queue" << endl;
}
int main(int argc, char *argv[]) {
if (argc < 5) {
cout << "Invalid number of parameters passed in" << endl;
exit(1);
}
max_queue_size = atoi(argv[1]);
num_operations = atoi(argv[2]);
num_producers = atoi(argv[3]);
num_consumers = atoi(argv[4]);
//    signal(SIGUSR1, usr1_handler);
//    signal(SIGUSR2, usr2_handler);
thread producers[num_producers];
thread consumers[num_consumers];
for (int i = 0; i < num_producers; i++) {
producers[i] = thread(producer, num_operations);
}
for (int i = 0; i < num_consumers; i++) {
consumers[i] = thread(consumer);
}
for (int i = 0; i < num_producers; i++) {
producers[i].join();
}
for (int i = 0; i < num_consumers; i++) {
consumers[i].join();
}
cout << "finished!" << endl;
}

您一直持有互斥锁 - 包括yield()- 在持有互斥锁时。

像在生产者代码中一样确定unique_lock的范围,从队列中弹出并以原子方式递增计数器。

我看到您有一个最大队列大小。如果队列已满,您需要第二个条件让生产者等待,并且使用者将在消耗项目时发出此条件的信号。

知道为什么其他线程没有机会抓住工作吗?

这项民意调查令人不安:

while ((operations_created - operations_consumed) >= max_queue_size) 
{
// don't do anything until it has space available
}

您可以尝试在循环中设置最小的延迟...这是一个"坏邻居",可以"消耗"一个核心。

您的代码几乎没有问题:

使用普通变量进行线程间通信

下面是一个示例:

int operations_created = 0;
int operations_consumed = 0;
void producer(int n) {
[...]
while ((operations_created - operations_consumed) >= max_queue_size) { }

及以后

void consumer() {
[...]
operations_consumed++;

这仅适用于没有优化的 x86 架构,即-O0.一旦我们尝试启用优化,编译器将优化 while 循环以:

void producer(int n) {
[...]
if ((operations_created - operations_consumed) >= max_queue_size) {
while (true) { }
}

所以,你的程序只是挂在这里。您可以在编译器资源管理器上检查这一点。

mov eax, DWORD PTR operations_created[rip]
sub eax, DWORD PTR operations_consumed[rip]
cmp eax, DWORD PTR max_queue_size[rip]
jl .L19 // here is the if before the loop
.L20:
jmp .L20 // here is the empty loop
.L19:

为什么会这样?从单线程程序的角度来看,while (condition) { operators }完全等同于if (condition) while (true) { operators },如果operators不更改condition

要解决此问题,我们应该使用std::atomic<int>而不是简单的int.这些是为线程间通信而设计的,因此编译器将避免此类优化并生成正确的程序集。

消费者在yield()时锁定互斥体

看看这个片段:

void consumer() {
while (true) {
unique_lock<mutex> lk(queue_mutex);
[...]
this_thread::yield();
[...]
}

基本上,这意味着消费者在持有锁yield()。由于一次只能有一个消费者持有锁(互斥代表互斥),这就解释了为什么其他消费者不能消费作品。

要解决此问题,我们应该在yield()之前解锁queue_mutex,即:

void consumer() {
while (true) {
{
unique_lock<mutex> lk(queue_mutex);
[...]
}
this_thread::yield();
[...]
}

这仍然不能保证只有一个线程可以完成大部分任务。当我们在生产者中notify_all()时,所有线程都会被唤醒,但只有一个线程会锁定互斥锁。由于我们安排的工作很小,当生产者调用notify_all()时,我们的线程将完成工作,完成yield()并为下一个工作做好准备。

那么为什么这个线程锁定互斥锁,而不是另一个呢?我想这是由于 CPU 缓存和忙于等待而发生的。刚刚完成工作的线程是"热"的,它在 CPU 缓存中并准备锁定互斥锁。在睡觉之前,它也可能会尝试忙于等待互斥锁几个周期,这增加了它获胜的机会。

为了解决这个问题,我们可以删除生产者中的睡眠(这样它就会更频繁地唤醒其他线程,所以其他线程也会"热"),或者在消费者中执行sleep()而不是yield()(所以这个线程在睡眠期间变得"冷")。

无论如何,由于互斥锁,没有机会并行完成工作,因此同一线程完成大部分工作这一事实是完全自然的 IMO。