Fork Join并行阻塞

C++ Fork Join Parallelism Blocking

本文关键字:并行 Join Fork      更新时间:2023-10-16

假设您希望并行运行一个section,然后合并回主线程,然后并行返回到section,依此类推。类似小时候的游戏红灯绿灯。

我已经给出了一个我正在尝试做的例子,我在开始时使用条件变量来阻塞线程,但希望并行地启动它们,但在结束时阻塞它们,以便它们可以串行打印出来。*=操作可能是一个大得多的操作,持续数秒。重用线程也很重要。使用任务队列可能会太繁重。

我需要使用某种阻塞结构,而不仅仅是一个普通的忙循环,因为我知道如何使用忙循环来解决这个问题。

英语

:

  1. 线程1创建10个被阻塞的线程
  2. 线程1向所有线程发出启动信号(不相互阻塞)
  3. 线程2-11处理独占内存
  4. 线程1正在等待直到2-11完成(可以在这里使用原子计数)
  5. 线程2-11完成,如果需要,每个线程可以通知1来检查其状态
  6. 线程1检查其条件并打印数组
  7. 线程1请求2-11重新处理,从2
  8. 继续

示例代码(改编自cplusplus.com上的示例):

// condition_variable example
#include <iostream>           // std::cout
#include <thread>             // std::thread
#include <mutex>              // std::mutex, std::unique_lock
#include <condition_variable> // std::condition_variable
#include <atomic>
std::mutex mtx;
std::condition_variable cv;
bool ready = false;
std::atomic<int> count(0);
bool end = false;
int a[10];
void doublea (int id) {
  while(!end) {
    std::unique_lock<std::mutex> lck(mtx);
    while (!ready) cv.wait(lck);
    a[id] *= 2;
    count.fetch_add(1);
  }
}
void go() {
  std::unique_lock<std::mutex> lck(mtx);
  ready = true;
  cv.notify_all();
  ready = false; // Naive
  while (count.load() < 10) sleep(1);
  for(int i = 0; i < 10; i++) {
    std::cout << a[i] << std::endl;
  }
  ready = true;
  cv.notify_all();
  ready = false;
  while (count.load() < 10) sleep(1);
  for(int i = 0; i < 10; i++) {
    std::cout << a[i] << std::endl;
  }
  end = true;
  cv.notify_all();
}
int main () {
  std::thread threads[10];
  // spawn 10 threads:
  for (int i=0; i<10; ++i) {
    a[i] = 0;
    threads[i] = std::thread(doublea,i);
  }
  std::cout << "10 threads ready to race...n";
  go();                       // go!
  return 0;
}

要有效地实现它并不容易。此外,除非你正在学习这门学科,否则它没有任何意义。条件变量在这里不是一个好的选择,因为它不能很好地扩展。

我建议你看看成熟的运行库是如何实现fork-join并行的,并向它们学习或在你的应用程序中使用它们。参见http://www.openmprtl.org/, http://opentbb.org/, https://www.cilkplus.org/-所有这些都是开源的。

OpenMP是您正在寻找的最接近的模型,它具有最有效的fork-join屏障实现。但是,它也有缺点,因为它是为HPC设计的,缺乏动态可组合性。TBB和Cilk最适合嵌套并行,并在模块和库中使用,这些模块和库可以在外部并行区域的上下文中使用。

可以使用barrier或condition变量启动所有线程。然后线程1可以等待所有线程结束它们的工作(通过所有线程的join方法,它是阻塞的),然后在一个for循环中打印它们的数据。