如何将第二个使用者添加到基于 pthread 的生产者-使用者设置中?
How to add a second consumer to a pthread-based producer-consumer setup?
我目前有两个用于生产者-消费者设置的线程,它使用pthread_cond_wait()
和pthread_cond_signal()
在读取数据和处理数据之间交替。
假设我有一个锁、两个条件和一个布尔标志,该标志声明数据缓冲区中是否有数据:
pthread_mutex_t lock;
pthread_cond_t we_have_data;
pthread_cond_t we_need_data;
bool buffer_is_empty = true;
我有一个pthread_t
使用以下函数生成数据(将数据读入缓冲区):
static void* produce(void* arg) {
pthread_mutex_lock(&lock);
for (;;) {
while (!buffer_is_empty) {
pthread_cond_wait(&we_need_data, &lock);
}
pthread_mutex_unlock(&lock);
// read some data into our buffer
pthread_mutex_lock(&lock);
buffer_is_empty = false;
pthread_cond_signal(&we_have_data);
}
}
然后,我有第二个pthread_t
,它在收到we_have_data
信号时使用以下代码来使用该数据:
static void* consume(void* arg) {
pthread_mutex_lock(&lock);
for (;;) {
while (buffer_is_empty) {
pthread_cond_wait(&we_have_data, &lock);
}
pthread_mutex_unlock(&lock);
// process the data in our buffer
pthread_mutex_lock(&lock);
buffer_is_empty = true;
pthread_cond_signal(&we_need_data);
}
}
这工作正常。
我现在想做的是添加第三个线程,如果缓冲区包含某些数据,该线程确实处理来自consume()
函数的数据。
我尝试添加第三个条件,但我的程序挂起。
我设置了一个条件和布尔标志:
bool processing_with_second_consumer;
pthread_cond_t we_need_to_process_data_with_another_consumer;
然后我修改消费者:
static void* consume(void* arg) {
pthread_mutex_lock(&lock);
for (;;) {
while (buffer_is_empty && !processing_with_second_consumer) {
pthread_cond_wait(&we_have_data, &lock);
}
pthread_mutex_unlock(&lock);
// process the data in our buffer
pthread_mutex_lock(&lock);
if (data_meets_our_conditions) {
processing_with_second_consumer = true;
pthread_cond_signal(&we_need_to_process_data_with_another_consumer);
}
buffer_is_empty = true;
pthread_cond_signal(&we_need_data);
}
}
然后我修改生产者以等待布尔值:
static void* produce(void* arg) {
pthread_mutex_lock(&lock);
for (;;) {
while (!buffer_is_empty && !processing_with_second_consumer) {
pthread_cond_wait(&we_need_data, &lock);
}
pthread_mutex_unlock(&lock);
// read some data into our buffer
pthread_mutex_lock(&lock);
buffer_is_empty = false;
pthread_cond_signal(&we_have_data);
}
}
并添加第三个线程以从消费者那里使用:
static void* consume_from_the_consumer(void* arg) {
pthread_mutex_lock(&lock);
for (;;) {
while (!buffer_is_empty && processing_with_second_consumer) {
pthread_cond_wait(&we_need_to_process_data_with_another_consumer, &lock);
}
pthread_mutex_unlock(&lock);
// do more specific processing of the data in our buffer
pthread_mutex_lock(&lock);
processing_with_second_consumer = false;
}
}
我似乎无法让程序正确退出——它基本上在消费者消费时处于无限循环中。
如何正确设置带有 pthread 条件的信令,以允许第三个(或第四个或第五个等)线程?
你生产者只发出we_have_data
的信号。但由于它将buffer_is_empty
设置为 false,它可以使consume_from_the_consumer
线程准备就绪,但它不会取消阻止它,因为它在第二个条件变量上被阻止。
为了让你的生活更简单,我建议两个改变:
- 始终使用
pthread_cond_broadcast
. - 仅使用一个条件变量。
这可能效率略低,但有几类微妙的错误是不可能的。
为了解决三个线程的问题,我需要进行一些更改:
- 将互斥锁移到线程循环中;循环应该做的第一件事是锁定数据,它应该做的最后一件事是解锁它。
- 设置三个
bool
标志:is_new_line_available
、is_new_subdata_available
和is_eof
。 - 设置三个
pthread_cond_t
条件:new_line_is_available
、new_line_is_empty
和new_subdata_is_available
。 - 确保每个线程都有一个条件,在该条件中调用
pthread_exit()
来终止该线程。
生产线程:
static void* produce(void* arg) {
for (;;) {
pthread_mutex_lock(&lock);
while (is_new_line_available) {
pthread_cond_wait(&new_line_is_empty, &lock);
}
// ... read a line of data into buffer ...
if (EOF) {
is_new_line_available = true;
is_new_subdata_available = true;
is_eof = false;
pthread_cond_signal(&new_line_is_available);
pthread_cond_signal(&new_subdata_is_available);
pthread_mutex_unlock(&lock);
pthread_exit(NULL);
}
is_new_line_available = true;
is_new_chromosome_available = false;
is_eof = false;
pthread_cond_signal(&new_line_is_available);
pthread_mutex_unlock(&lock);
}
}
消费线程:
static void* consume(void* arg) {
for (;;) {
pthread_mutex_lock(&lock);
while (is_new_line_available) {
pthread_cond_wait(&new_line_is_available, &lock);
}
// ... process line of data to look for subdata type ...
if (EOF) {
is_eof = true;
pthread_cond_signal(&new_subdata_is_available);
pthread_mutex_unlock(&lock);
pthread_exit(NULL);
}
else if (subdata_found) {
is_new_subdata_available = true;
is_new_line_available = false;
pthread_cond_signal(&new_line_is_empty);
}
pthread_mutex_unlock(&lock);
}
}
然后是第三个"子数据"处理线程:
static void* consume_subdata_from_the_consumer(void* arg) {
for (;;) {
if (is_eof) {
pthread_exit(NULL);
}
pthread_mutex_lock(&lock);
while (!is_new_subdata_available) {
pthread_cond_wait(&new_subdata_is_available, &lock);
}
// ... process subdata ...
is_new_subdata_available = false;
is_new_line_available = true;
pthread_cond_signal(new_line_is_available);
pthread_mutex_unlock(&lock);
}
}
一些观察:
- 所有线程都应该有一个条件,使它们进入
pthread_exit()
,否则父进程将挂起。 - 有必要在锁定和解锁指令之间拉取所有修改状态的代码,否则无序处理的数据可能会损坏。
- 任何缓冲区溢出或写入初始化数据都可能导致问题。例如,使用
calloc()
在线程中使用字符缓冲区之前对其进行初始化。
相关文章:
- 消费者和生产者问题的双重缓冲
- 生产者使用者在 cpp 中使用互斥锁的问题
- 如何降低生产者获得锁的可能性,而消费者在使用std::condition_variable时无法获得锁?
- 用于免等待生产者和阻塞使用者的环形缓冲区
- 具有单个生产者单一使用者的无锁循环缓冲区
- C++生产者使用者中,同一使用者线程会抓取所有任务
- 如何将第二个使用者添加到基于 pthread 的生产者-使用者设置中?
- 生产者/使用者,如何确保在关闭所有使用者之前耗尽线程安全队列
- 交换缓冲区而不锁定单个生产者和多个使用者
- 经典生产者使用者线程
- 生产者-使用者日志文件输出重复
- 使用条件变量的生产者和使用者线程
- 生产者和使用者函数,用于在操作手册中测试C++并发的线程安全堆栈示例
- C++ - 生产者/使用者仅允许在定义的块中使用
- 使用 sleep() 时,生产者使用者(使用监视器)代码不起作用
- 使用提升线程和循环缓冲区挂起的生产者/使用者
- 为什么某些线程池实现不使用生产者和使用者模型
- Win32线程生产者更新使用者线程
- 如何在生产者-使用者方案中使用提升条件变量
- C++ 中的线程安全生产者/使用者模式