线程广播,然后等待
pthread broadcast and then wait?
我正在尝试设置几个线程处于等待状态,直到它们收到pthread_cond_broadcast()
。
在完成一个作业后,我希望线程返回到它们的等待状态。
我还希望调用pthread_cond_broadcast()
的进程在继续之前等待所有线程返回到它们的等待状态。在本例中,调用broadcast的是main()函数。我正试图做到这一点b有main(0)做pthread_cond_wait()
后调用广播。
void* Work::job(void* id)
{
int idx = (long)id;
while(1)
{
pthread_mutex_lock(&job_lock);
while(!jobs_complete)
{
// wait for main to broadcast
pthread_cond_wait(&can_work, &job_lock);
pthread_mutex_unlock(&job_lock);
// work here
pthread_mutex_lock(&job_lock);
++jobs_completed;
if(jobs_completed == NUM_THREADS)
{
jobs_complete = true;
pthread_cond_signal(&jobs_done);
pthread_mutex_unlock(&job_lock);
}
pthread_mutex_unlock(&job_lock);
}
pthread_mutex_unlock(&job_lock);
}
return NULL;
}
NUM_THREADS为4,job_lock为pthread_mutex_t
, can_work和jobs_done为pthread_cond_t
, jobs_completed为bool
, jobs_complete为int
。
// work
jobs_completed = false;
jobs_complete = 0;
pthread_mutex_lock(&job_lock);
pthread_cond_broadcast(&can_work);
pthread_cond_wait(&jobs_complete);
pthread_mutex_unlock(&job_lock);
// work that depends on jobs_complete
现在,我正在通过调用pthread_cond_broadcast()
,然后在它之后调用pthread_cond_wait()
来执行此操作,但这似乎会死锁。
谁能解释一下我应该怎么做,或者我哪里错了?我很感激你的帮助。
谢谢!
我只是发布这个(这几乎是所有的C代码,但pthreads也是,所以有点宽松是善意的请求)来演示一种方法做我认为你试图完成。很明显,你会想要在适当的类中适当地封装其中的大部分。这里希望向您展示的是条件变量、互斥锁以及它们与谓词管理和通知的关系是如何工作的。
我希望它对你有用。祝你今天愉快。#include <iostream>
#include <unistd.h>
#include <pthread.h>
using namespace std;
// our global condition variable and mutex
pthread_cond_t cv = PTHREAD_COND_INITIALIZER;
pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;
// our predicate values.
bool finished = false;
int jobs_waiting = 0;
int jobs_completed = 0;
// our thread proc
static void *worker_proc(void* p)
{
intptr_t id = (intptr_t)p; // our id
size_t n_completed = 0; // our job completion count
// always latch prior to eval'ing predicate vars.
pthread_mutex_lock(&mtx);
while (!finished)
{
// wait for finish or work-waiting predicate
while (!finished && jobs_waiting == 0)
pthread_cond_wait(&cv, &mtx);
// we own the mutex, so we're free to look at, modify
// etc. the values(s) that we're using for our predicate
if (finished)
break;
// must be a job_waiting, reduce that number by one, then
// unlock the mutex and start our work. Note that we're
// changing the predicate (jobs_waiting is part of it) and
// we therefore need to let anyone that is monitoring know.
--jobs_waiting;
pthread_cond_broadcast(&cv);
pthread_mutex_unlock(&mtx);
// DO WORK HERE (this just runs a lame summation)
for (int i=0,x=0;i<1048576; x += ++i);
++n_completed;
// finished work latch mutex and setup changes
pthread_mutex_lock(&mtx);
++jobs_completed;
pthread_cond_broadcast(&cv);
}
// final report
cout << id << ": jobs completed = " << n_completed << endl;
// we always exit owning the mutex, so unlock it now. but
// let anyone else know they should be quitting as well.
pthread_cond_broadcast(&cv);
pthread_mutex_unlock(&mtx);
return p;
}
// sets up a batch of work and waits for it to finish.
void run_batch(int num)
{
pthread_mutex_lock(&mtx);
jobs_waiting = num;
jobs_completed = 0;
pthread_cond_broadcast(&cv);
// wait or all jobs to complete.
while (jobs_completed != num)
pthread_cond_wait(&cv, &mtx);
// we own this coming out, so let it go.
pthread_mutex_unlock(&mtx);
}
// main entry point.
int main()
{
// number of threads in our crew
static const size_t N = 7;
pthread_t thrds[N] = {0};
// startup thread crew.
intptr_t id = 0;
for (size_t i=0; i<N; ++i)
pthread_create(thrds + i, NULL, worker_proc, (void*)(++id));
// run through batches. each batch is one larger
// than the prior batch. this should result in some
// interesting job-counts per-thread.
for (int i=0; i<64; ++i)
run_batch(i);
// flag for shutdown state.
pthread_mutex_lock(&mtx);
finished = true;
pthread_cond_broadcast(&cv);
pthread_mutex_unlock(&mtx);
for (size_t i=0; i<N; pthread_join(thrds[i++], NULL));
return 0;
}
示例输出#1
3: jobs completed = 256
6: jobs completed = 282
5: jobs completed = 292
2: jobs completed = 242
1: jobs completed = 339
4: jobs completed = 260
7: jobs completed = 409
示例输出#2
6: jobs completed = 882
1: jobs completed = 210
4: jobs completed = 179
5: jobs completed = 178
2: jobs completed = 187
7: jobs completed = 186
3: jobs completed = 194
示例输出#3
1: jobs completed = 268
6: jobs completed = 559
3: jobs completed = 279
5: jobs completed = 270
2: jobs completed = 164
4: jobs completed = 317
7: jobs completed = 159
固定批量大小
相同的代码,但改变了这个:
for (int i=0; i<64; ++i)
run_batch(i);
:
for (int i=0; i<64; ++i)
run_batch(N);
给出了以下内容,这可能更接近您真正想要的内容:
示例输出#1
4: jobs completed = 65
2: jobs completed = 63
5: jobs completed = 66
3: jobs completed = 63
1: jobs completed = 64
7: jobs completed = 63
6: jobs completed = 64
示例输出#2
3: jobs completed = 65
5: jobs completed = 62
1: jobs completed = 67
7: jobs completed = 63
2: jobs completed = 65
6: jobs completed = 61
4: jobs completed = 65
示例输出#3
2: jobs completed = 58
4: jobs completed = 61
5: jobs completed = 69
7: jobs completed = 68
3: jobs completed = 61
1: jobs completed = 64
6: jobs completed = 67
在函数末尾有3个可能的连续调用pthread_mutex_unlock
,这将导致未定义的行为。实际上你不需要这两个内层。如果jobs_complete
是true
,线程将退出循环并释放锁,否则它将循环并需要等待can_work
条件。
也,
pthread_cond_wait(&jobs_complete);
你的意思可能是:
pthread_cond_wait(&jobs_complete,&job_lock);
此外,该函数需要pthread_cond_t *
和pthread_mutex_t *
,而不是int
,所以即使这样,代码也明显被破坏了。
请注意,条件变量上的信号或广播只会对已经等待该变量的线程产生影响。该信号不被保留以供将来等待。因此,当线程在jobs_complete
上循环while阻塞并再次等待时,它们将不得不再次发出信号以恢复工作。
另一件事:你提到job_complete
的类型是int
, job_completed
的类型是bool
,但你的代码似乎不一致:
if(jobs_completed == NUM_THREADS)
{
jobs_complete = true;
我的建议是:学习信号量和屏障抽象模型,如果可以的话,使用现有的实现(c++ 11中的boost
或std
),或者使用pthread
API重新实现它们。这些将帮助您比操作其他变量更容易地处理这种情况。在这个网站上查找现有的解决方案。例如,这个问题处理的是一个非常相似的问题,我提供的解决方案可以很容易地修改为使用pthread API来满足您的需求。
- lambda参数转换为constexpr技巧,然后获取带链接的数组
- 为什么我的C#代码在调用回C++COM直到Task时会暂停.等待/线程.加入
- 如何声明特征矩阵,然后通过嵌套循环初始化它
- 这是我尝试让用户将值输入到数组中.然后将其隐藏为大量的星号
- boost::asio如何生成多个协同程序,然后加入它们
- 如何让LLDB在成功时退出,在失败时等待
- 如何将图像传输到c++(dll)中的缓冲区,然后在c#的缓冲区中读/写
- 等待整个 omp 块完成,然后再调用第二个函数
- 在CMD中运行MATLAB代码,然后等待完成
- 等待所有线程完成一项工作,然后执行另一项工作
- boost::asio::ip::tcp::iostream,先启动客户端,然后等待服务器
- 将光标更改为沙漏/等待/忙碌光标,然后返回Qt
- 等待buttonPressed()插槽完成,然后执行buttonRelease()
- 等待另一个进程锁定然后解锁 Win32 互斥锁
- 打开另一个表单,然后等待该表单返回响应
- 在Linux上,从C++调用一个可执行文件,然后等待它完成
- 我如何打开/等待一个程序在c++中完成,然后运行另一个
- Qt websocket发送消息并等待响应,然后再进入下一个方法
- 线程广播,然后等待
- 如何告诉QThread等待,直到工作完成,然后结束