Fork Join并行阻塞
C++ Fork Join Parallelism Blocking
假设您希望并行运行一个section,然后合并回主线程,然后并行返回到section,依此类推。类似小时候的游戏红灯绿灯。
我已经给出了一个我正在尝试做的例子,我在开始时使用条件变量来阻塞线程,但希望并行地启动它们,但在结束时阻塞它们,以便它们可以串行打印出来。*=操作可能是一个大得多的操作,持续数秒。重用线程也很重要。使用任务队列可能会太繁重。
我需要使用某种阻塞结构,而不仅仅是一个普通的忙循环,因为我知道如何使用忙循环来解决这个问题。
英语:
- 线程1创建10个被阻塞的线程
- 线程1向所有线程发出启动信号(不相互阻塞)
- 线程2-11处理独占内存
- 线程1正在等待直到2-11完成(可以在这里使用原子计数)
- 线程2-11完成,如果需要,每个线程可以通知1来检查其状态
- 线程1检查其条件并打印数组
- 线程1请求2-11重新处理,从2 继续
示例代码(改编自cplusplus.com上的示例):
// condition_variable example
#include <iostream> // std::cout
#include <thread> // std::thread
#include <mutex> // std::mutex, std::unique_lock
#include <condition_variable> // std::condition_variable
#include <atomic>
std::mutex mtx;
std::condition_variable cv;
bool ready = false;
std::atomic<int> count(0);
bool end = false;
int a[10];
void doublea (int id) {
while(!end) {
std::unique_lock<std::mutex> lck(mtx);
while (!ready) cv.wait(lck);
a[id] *= 2;
count.fetch_add(1);
}
}
void go() {
std::unique_lock<std::mutex> lck(mtx);
ready = true;
cv.notify_all();
ready = false; // Naive
while (count.load() < 10) sleep(1);
for(int i = 0; i < 10; i++) {
std::cout << a[i] << std::endl;
}
ready = true;
cv.notify_all();
ready = false;
while (count.load() < 10) sleep(1);
for(int i = 0; i < 10; i++) {
std::cout << a[i] << std::endl;
}
end = true;
cv.notify_all();
}
int main () {
std::thread threads[10];
// spawn 10 threads:
for (int i=0; i<10; ++i) {
a[i] = 0;
threads[i] = std::thread(doublea,i);
}
std::cout << "10 threads ready to race...n";
go(); // go!
return 0;
}
要有效地实现它并不容易。此外,除非你正在学习这门学科,否则它没有任何意义。条件变量在这里不是一个好的选择,因为它不能很好地扩展。
我建议你看看成熟的运行库是如何实现fork-join并行的,并向它们学习或在你的应用程序中使用它们。参见http://www.openmprtl.org/, http://opentbb.org/, https://www.cilkplus.org/-所有这些都是开源的。
OpenMP是您正在寻找的最接近的模型,它具有最有效的fork-join屏障实现。但是,它也有缺点,因为它是为HPC设计的,缺乏动态可组合性。TBB和Cilk最适合嵌套并行,并在模块和库中使用,这些模块和库可以在外部并行区域的上下文中使用。
可以使用barrier或condition变量启动所有线程。然后线程1可以等待所有线程结束它们的工作(通过所有线程的join方法,它是阻塞的),然后在一个for循环中打印它们的数据。
相关文章:
- C++17中的并行执行策略
- 并行用于C++17中数组索引范围内的循环
- 在std::thread中,joinable()然后join()线程安全吗
- 如何在Elixir中调用递归函数并行
- OpenMP:并行更新数组总是需要减少数组吗
- 如何使用OpenMP并行这两个循环
- 如何使用OpenMP并行化此矩阵时间矢量运算
- 如何使用OpenMP使这个循环并行
- 遍历并行数组以确定C++中的最大数字
- 为什么 openmp 的并行不适用于矢量化色彩空间转换?
- 如何在 Mac 上使用 c++17 并行标准库算法?
- 并行标准::复制复杂性
- 如何使用 MPI 的远程内存访问 (RMA) 功能并行化数据聚合?
- 在C++中使用并行化的预期速度是多少(不是 OpenMp,而是 <thread>)
- 如何在 C++17 STL 并行算法中处理调度?
- join() 失败,如果在线程内部调用 io_context.run()
- OpenMP 与有序和关键指令并行
- OpenMP for 循环并行性问题
- 使用 vector<thread> 和 .join() 未并行运行的多线程C++程序
- Fork Join并行阻塞