实现线程间同步屏障的最佳方法是什么?

What is the best way to realize a synchronization barrier between threads

本文关键字:最佳 方法 是什么 线程 同步 实现      更新时间:2023-10-16

有几个线程在运行,我需要保证每个线程在继续之前都到达了某个点。我需要实现一种屏障。考虑一个可以在多个线程中运行的函数func:

void func()
{
  operation1();
  // wait till all threads reached this point 
  operation2();
}

使用c++ 11和VS12实现这个屏障的最好方法是什么,如果需要的话考虑boost。

可以使用boost::barrier
不幸的是,线程屏障概念本身并不是c++11或visual c++的一部分。在纯c++11中,可以使用条件变量和计数器。

#include <iostream>
#include <condition_variable>
#include <thread>
#include <chrono>
class my_barrier
{
 public:
    my_barrier(int count)
     : thread_count(count)
     , counter(0)
     , waiting(0)
    {}
    void wait()
    {
        //fence mechanism
        std::unique_lock<std::mutex> lk(m);
        ++counter;
        ++waiting;
        cv.wait(lk, [&]{return counter >= thread_count;});
        cv.notify_one();
        --waiting;
        if(waiting == 0)
        {
           //reset barrier
           counter = 0;
        }
        lk.unlock();
    }
 private:
      std::mutex m;
      std::condition_variable cv;
      int counter;
      int waiting;
      int thread_count;
};
int thread_waiting = 3;
my_barrier barrier(3);

void func1()
{
    std::this_thread::sleep_for(std::chrono::seconds(3));
    barrier.wait();
    std::cout << "I have awakened" << std::endl;
}
void func2()
{
    barrier.wait();
    std::cout << "He has awakened!!" << std::endl;
}
int main() {
    std::thread t1(func1);  
    std::thread t2(func2);
    std::thread t3(func2);
    t1.join();
    t2.join();
    t3.join();
}

每个线程等待直到一个谓词被满足。最后一个线程将使谓词有效,并允许等待的线程继续进行。如果你想重用障碍(例如多次调用函数),您需要另一个变量来重置计数器。

当前的实现是有限的。调用func();func();两次可能不会使线程等待第二次。

可以选择使用OpenMP框架。

#include <omp.h>
void func()
{
  #pragma omp parallel num_threads(number_of_threads)
  {
    operation1();
    #pragma omp barrier
    // wait till all threads reached this point 
    operation2();
  }
}

使用-fopenmp

编译代码

解决方案:

#include <cassert>
#include <condition_variable>
class Barrier
{
public:
    Barrier(std::size_t nb_threads)
        : m_mutex(),
        m_condition(),
        m_nb_threads(nb_threads)
    {
        assert(0u != m_nb_threads);
    }
    Barrier(const Barrier& barrier) = delete;
    Barrier(Barrier&& barrier) = delete;
    ~Barrier() noexcept
    {
        assert(0u == m_nb_threads);
    }
    Barrier& operator=(const Barrier& barrier) = delete;
    Barrier& operator=(Barrier&& barrier) = delete;
    void Wait()
    {
        std::unique_lock< std::mutex > lock(m_mutex);
        assert(0u != m_nb_threads);
        if (0u == --m_nb_threads)
        {
            m_condition.notify_all();
        }
        else
        {
            m_condition.wait(lock, [this]() { return 0u == m_nb_threads; });
        }
    }
private:
    std::mutex m_mutex;
    std::condition_variable m_condition;
    std::size_t m_nb_threads;
};

:

#include <chrono>
#include <iostream>
#include <thread>
Barrier barrier(2u);
void func1()
{
    std::this_thread::sleep_for(std::chrono::seconds(3));
    barrier.Wait();
    std::cout << "t1 awakened" << std::endl;
}
void func2()
{
    barrier.Wait();
    std::cout << "t2 awakened" << std::endl;
}
int main()
{
    std::thread t1(func1);  
    std::thread t2(func2);
    t1.join();
    t2.join();
    return 0;
}

Try It Online: WandBox