如何同步运行在不同线程上的函数实例(在c++11中)

How to synchronize instances of a function running on different threads (in c++11)?

本文关键字:函数 实例 c++11 线程 何同步 同步 运行      更新时间:2023-10-16

假设有许多线程组成一个循环,运行同一个函数的实例,但是每次迭代的开始需要同步(因此首先完成的线程必须等待最后一个线程开始新的迭代)。在c++11中如何做到这一点?

这篇文章的其余部分只是我尝试过的,以及它是如何失败的。

我使用一个计数器,"sync",最初设置为3(线程数)。在函数结束时,每个线程都会从这个计数器中减去1,并开始等待。当计数器达到0时,这意味着它们中的3个已经完成了一轮,因此主线程将计数器重置为3并通知线程唤醒它们。

这在大多数情况下都有效,但有时会有一两个线程无法唤醒。

这些是全局变量:

mutex syncMutex;
condition_variable syncCV;
int sync;

这是在线程中循环运行的函数的末尾:

unique_lock<mutex> lk(syncMutex);
cout << "Thread num: " << mFieldNum << " got sync value: " << sync;
sync --;
syncCV.notify_all();
cout << " and goes to sleep..." << endl;
syncCV.wait(lk, []{return sync == numFields;});
cout << "Thread num: " << mFieldNum << " woke up" << endl;
}

这在主线程的循环中运行:

unique_lock<mutex> lk(syncMutex);
syncCV.wait(lk, []{return sync == 0;});
sync = 3;
lk.unlock();
cout << "Notifying all threads!" << endl;
syncCV.notify_all();

这是它失败时产生的输出(线程#3没有唤醒):

Thread num: 1 got sync value: 3 and goes to sleep...
Thread num: 2 got sync value: 2 and goes to sleep...
Thread num: 3 got sync value: 1 and goes to sleep...
Notifying all threads!
Thread num: 1 woke up
Thread num: 2 woke up
Thread num: 3 woke up
Thread num: 2 got sync value: 3 and goes to sleep...
Thread num: 1 got sync value: 2 and goes to sleep...
Thread num: 3 got sync value: 1 and goes to sleep...
Notifying all threads!
Thread num: 2 woke up
Thread num: 1 woke up
Thread num: 2 got sync value: 3 and goes to sleep...
Thread num: 1 got sync value: 2 and goes to sleep...

有人知道吗?谢谢你的阅读。

您的线程同步存在许多问题。托尼在他的评论中提到了一个。在主循环代码中还有一个潜在的竞争条件,在调用syncCV.notify_all()之前调用like .unlock()。(这可能会导致线程错过notify_all信号。)

我会用两种方式调整你的代码。首先,要解决使用"sync == numFields"作为条件的问题,正如Tony所指出的,在另一个线程执行sync——之后,该条件可能不成立,因此使用每个线程在每个主线程循环中只运行一次作为条件是有意义的。在我的示例代码中,这是通过引入"done[numFields]"变量实现的。其次,引入两个条件变量是有意义的——一个向工作线程发出信号,表明新的主循环迭代已经开始,另一个向主线程发出信号,表明工作线程已经完成。(注意两个条件变量使用同一个互斥锁。)

下面是一个完整的程序,以您的示例代码为模型,合并了这两种方法:

#include <iostream>
using std::cout;
using std::endl;
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>
std::mutex syncMutex;
std::condition_variable readyCV;
std::condition_variable doneCV;
int sync;
bool exitFlag;
const int numFields = 5;
bool done[numFields];
const int nloops = 10;
void thread_func(int i) {
  int mFieldNum = i;
  while (true) {
    std::unique_lock<std::mutex> lk(syncMutex);
    readyCV.wait(lk, [mFieldNum]{return  exitFlag || !done[mFieldNum-1];});
    if (exitFlag)  break;
    cout << "Thread num: " << mFieldNum << " woke up, got sync value: " << sync;
    if (--sync == 0)  doneCV.notify_all();
    done[mFieldNum-1] = true;
    readyCV.notify_all();
    cout << " and goes to sleep..." << endl;
  }
}
int main (int argc, char* argv[]) {
  exitFlag = false;
  sync = 0;
  std::vector<std::thread> threads;
  for (int i = 0; i < numFields; i++) {
    done[i] = true;
    threads.emplace_back (thread_func, i+1);
  }
  for (int i = 0; i <= nloops; i++) {
    std::unique_lock<std::mutex> lk(syncMutex);
    doneCV.wait(lk, []{return sync == 0;});
    cout << "main loop (lk held), i = " << i << endl;
    sync = numFields;
    if (i == nloops)  exitFlag = true;
    else              for (auto &b : done)  b = false;
    cout << "Notifying all threads!" << endl;
    readyCV.notify_all();
  }
  for (auto& t : threads)  t.join();
}

(我还添加了一个exitFlag和std::thread::join(),以便程序可以很好地清理和终止)

这与经典的生产者-消费者实现(一个生产者,numFields消费者)非常相似,只是增加了一个约束,即每个消费者线程在每个生产者线程循环中只运行一次。

如果您愿意放弃重用工作线程,您还可以更简单地实现本质上相同的程序逻辑。(在您的示例代码和我上面的示例中,它们充当一种专门的线程池。)在下一个示例中,将为每次主循环的迭代创建新的线程。这使得线程同步更简单,并且消除了条件变量。
#include <iostream>
using std::cout;
using std::endl;
#include <atomic>
#include <mutex>
#include <thread>
#include <vector>
std::mutex coutMutex;
std::atomic<int> sync;
const int numFields = 5;
bool done[numFields];
const int nloops = 10;
void thread_func(int i) {
  int mFieldNum = i;
  int mySync = sync--;
  {
    std::lock_guard<std::mutex> lk(coutMutex);
    cout << "Thread num: " << mFieldNum << " woke up, got sync value: " << mySync << endl;
  }
}  
int main (int argc, char* argv[]) {
  for (int i = 0; i < nloops; i++) {
    cout << "main loop, i = " << i << endl;
    std::vector<std::thread> threads;
    sync = numFields;
    for (int i = 0; i < numFields; i++)  threads.emplace_back (thread_func, i+1);
    for (auto& t : threads)  t.join();
  }
}

(coutMutex是一个很好的方法,这样控制台输出就不会乱码,但对于核心同步逻辑来说不是必需的。)

如果在你的实际用例中,你不需要thread_func在迭代中保持活跃(例如,保留一些状态),并且如果每次调用thread_func都做了足够的工作,创建一个新线程来运行它的成本相比之下并不重要,那么为每个主循环迭代创建新线程(而不是重用线程)是直接、明智和更简单的。

多线程破解快乐!

K。弗兰克。