生产者使用者在 cpp 中使用互斥锁的问题

Producer Consumer problem using mutexes in cpp

本文关键字:问题 使用者 cpp 生产者      更新时间:2023-10-16

我有一个生产者和 2 个使用者线程试图访问共享缓冲区。互斥锁在消费者和生产者之间使用。消费者应该平行运行。如果缓冲区为空,则使用者睡眠,生产者必须唤醒它们。如果缓冲区已满,则生产者不执行任何操作。以下是我正在处理的代码片段: 生产者线程:

void *writer(void*)
{
// Initialising the seed
srand(time(NULL));
while(1)
{
pthread_mutex_lock(&rallow);
if (Q.size() < MAX && item < MAX)
{
// Getting the random number
int num = rand() % 10 + 1;
// Pushing the number into queue
Q.push(num);

item++;
cout << "Produced: " << num << " item: "<<item<<endl;
pthread_cond_broadcast(&dataNotProduced); 
}
else if (item == MAX) {
pthread_mutex_unlock(&rallow);
continue;
}
pthread_mutex_unlock(&rallow);
}
}

消费者 1:

void *reader1(void*)
{
while(1)
{
pthread_mutex_lock(&mread);
rc++;
if(rc==1)
pthread_mutex_lock(&rallow);
pthread_mutex_unlock(&mread);

if (Q.size() > 0) {
// Get the data from the front of queue
int data = Q.front();

// Pop the consumed data from queue
Q.pop();
item--;
cout << "B thread consumed: " << data <<endl;

pthread_cond_signal(&dataNotConsumed);
}
else
{
cout << "B is in wait.." << endl;
pthread_cond_wait(&dataNotProduced, &rallow);
cout<<"B woke up"<<endl;
}
pthread_mutex_lock(&mread);
rc--;
if(rc==0)
pthread_mutex_unlock(&rallow);
pthread_mutex_unlock(&mread);
sleep(1);
} 
}

消费者 2:


void *reader2(void*)
{
while(1)
{
pthread_mutex_lock(&mread);
rc++;
if(rc==1)
pthread_mutex_lock(&rallow);
pthread_mutex_unlock(&mread);

if (Q.size() > 0) {
// Get the data from the front of queue
int data = Q.front();

// Pop the consumed data from queue
Q.pop();
item--;
cout << "C thread consumed: " << data <<endl;
pthread_cond_signal(&dataNotConsumed);
}
else
{
cout << "C is in wait.." << endl;
pthread_cond_wait(&dataNotProduced, &rallow);
cout<<"C woke up"<<endl;
}
pthread_mutex_lock(&mread);
rc--;
if(rc==0)
pthread_mutex_unlock(&rallow);
pthread_mutex_unlock(&mread);
sleep(1);
}
}

输出如下所示:

C is in wait..
B is in wait..
Produced: 8 item: 1
Produced: 4 item: 2
Produced: 2 item: 3
Produced: 4 item: 4
Produced: 2 item: 5
Produced: 8 item: 6
Produced: 5 item: 7
Produced: 2 item: 8
Produced: 10 item: 9
Produced: 3 item: 10
>> Producer is in wait..
B woke up
B thread consumed: 8
B thread consumed: 4
B thread consumed: 2
B thread consumed: 4
B thread consumed: 2
B thread consumed: 8
B thread consumed: 5
B thread consumed: 2
B thread consumed: 10
B thread consumed: 3
B is in wait..
C woke up
C is in wait..
Producer woke up

我的疑问是为什么线程 B 和 C 不显示并行执行。为什么生产者一次将值填充到缓冲区中 10 个,而不是给出很少的值,然后消费者消费它,然后再次产生很少的值。A的领导将不胜感激。

else if (item == MAX) {
pthread_mutex_unlock(&rallow);
cout << ">> Producer is in wait.." << endl;
pthread_cond_wait(&dataNotConsumed, &rallow);

您解锁互斥锁,然后等待。你不能这么做。这将创建一个窗口,在此期间,您正在等待的事情可以在您等待之前发生。您必须在按住互斥锁时调用pthread_cond_wait,以确保在您决定等待之后但在开始等待之前不会发生您正在等待的事情。

你的消费者中还有另一个巨大的错误。一个线程可以锁定rallow然后另一个线程可以尝试解锁它。这是不允许的 - 获取互斥锁的线程必须是释放它的线程。您不需要两个互斥锁 - 只需使用一个保护所有状态的互斥体。

首先,不能保证所有线程将始终并发运行。如果它们在单个内核上运行,操作系统将为每个线程提供数十毫秒的时间片。如果它们在不同的内核上运行,那么一个线程调用pthread_cond_broadcast()和另一个线程从pthread_cond_wait()唤醒之间存在延迟。这很容易解释编写器线程能够在另一个线程唤醒之前将 10 个项目推送到队列。

下一个问题是,为什么 B 消耗了所有项目,而 C 什么也得不到?问题是因为这个:

pthread_mutex_lock(&mread);
rc++;
if(rc == 1)
pthread_mutex_lock(&rallow);
pthread_mutex_unlock(&mread);

考虑线程 B 和 C 各自紧接着执行此块。两者都可以锁定mread,两者都会递增rc,但只有一个会锁定rallow。接下来会发生什么是不确定的,因为它们都尝试访问队列,即使其中一个不会持有锁。

应该不需要有两个互斥锁。两个消费者线程都应该无条件地锁定rallow,检查队列中是否有内容,如果没有,则调用pthread_cond_wait()

由于您使用的是C++,因此实际上应该使用 C++11 的线程支持,而不是使用 C pthread 函数。然后,您的代码应如下所示:

#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
std::mutex rallow;
std::condition_variable dataProduced;
std::condition_variable dataConsumed;
void writer() {
while(true) {
// Generate the random number
int num = rand() % 10 + 1;
std::cout << "Produced: " << num << "n";
// Push it to the queue
{
std::lock_guard<std::mutex> lock(rallow);
dataConsumed.wait(rallow, [](){return Q.size() < MAX;});
Q.push(num);
}
}
}
void reader(int id) {
while(true) {
int data;
// Pop an item from the queue
{
std::lock_guard<std::mutex> lock(rallow);
dataProduced.wait(rallow, [](){return Q.size() > 0;});
data = Q.front();
Q.pop();
}
// Process the data
std::cout << "Consumer thread " << id << " consumed: " << data << "n";
}
}

您甚至可以创建一个线程安全的队列类来处理互斥体和条件变量本身,因此生产者和使用者代码将简化为:

void writer() {
while(true) {
int num = rand() % 10 + 1;
std::cout << "Produced: " << num << "n";
Q.push(num);
}
}
void reader(int id) {
while(true) {
int data = Q.pop();
std::cout << "Consumer thread " << id << " consumed: " << data << "n";
}
}