虽然主线程中的循环在使用 std::thread 时卡住

While loop in main thread is getting stuck when using std::thread

本文关键字:std thread 线程 循环      更新时间:2023-10-16

我有一个简单的C++代码来测试和理解线程。代码具有主线程 + 辅助线程。 辅助更新主线程循环所依赖的变量的值。当我在主循环中添加一个 print 语句时,程序成功完成,但是当我删除这个 print 语句时,它会进入无限循环。 这是我正在使用的代码,我所指的 print 语句是 print 语句 2

#include <mpi.h>
#include <iostream>
#include <fstream>
#include <thread>
#include <mutex>
std::mutex mu;
int num;
using namespace std;
void WorkerFunction()
{
bool work = true;
while(work)
{
mu.lock();
num --;
mu.unlock();
if(num == 1)
work = false;
}
}

int main(int argc, char **argv)
{
bool work = true;
num = 10;
int numRanks, myRank, provided;
MPI_Init_thread(&argc, &argv, MPI_THREAD_FUNNELED, &provided);
MPI_Comm_size(MPI_COMM_WORLD, &numRanks);
MPI_Comm_rank(MPI_COMM_WORLD, &myRank);
std::thread workThread (WorkerFunction);
//print statement 1
cerr<<"Rank "<<myRank<<" Started workThread n";
int mult = 0;
while(work)
{
mult += mult * num;
//print statement 2
if(myRank == 0) cerr<<"num = "<<num<<"n";
if(num == 1)
work = false;
}
if(work == false)
workThread.join();
//print statement 3
cerr<<"Rank "<<myRank<<" Done with both threads n";
MPI_Finalize();
};

这是我在打印语句 2 时得到的输出

mpirun -np 4 ./Testing
Rank 0 Started workThread 
num = 10
num = 10
num = 10
num = 10
num = 10
num = 10
num = 10
num = 10
num = 10
num = 10
num = 10
num = 10
num = 10
Rank 1 Started workThread 
Rank 0 Done with both threads 
Rank 1 Done with both threads 
Rank 2 Started workThread 
Rank 3 Started workThread 
Rank 2 Done with both threads 
Rank 3 Done with both threads

如果我注释掉该 print 语句,那么它会进入无限循环,这就是我得到的输出

mpirun -np 4 ./Testing
Rank 0 Started workThread 
Rank 0 Done with both threads 
Rank 1 Started workThread 
Rank 2 Started workThread 
Rank 3 Started workThread 
Rank 2 Done with both threads 
Rank 3 Done with both threads

我不确定我做错了什么,任何帮助都是不胜感激的。

关于MPI,我没有任何经验。(我几十年前就用过它,我确信这个事实完全没有价值。然而,OP声称

我有一个简单的C++代码来测试和理解线程。

考虑到多处理(带MPI)和多线程(带std::thread)本身就是复杂的主题,我会先将主题分开,并在获得一些经验后尝试将它们放在一起。

因此,我详细说明了多线程(我觉得可以)。


第一个示例是 OPs 代码的修订版本(删除MPI的所有引用):

#include <iostream>
#include <thread>
#include <mutex>
#include <chrono>
std::mutex mtxNum;
int num;
const std::chrono::milliseconds delay(100);
void WorkerFunction()
{
for (bool work = true; work; std::this_thread::sleep_for(delay)) {
int num_;
mtxNum.lock();
num_ = --num;
mtxNum.unlock();
work = num_ != 1;
}
}
int main()
{
num = 10;
std::thread workThread(&WorkerFunction);
int mult = 0;
for (bool work = true; work; std::this_thread::sleep_for(delay)) {
int num_;
mtxNum.lock();
num_ = num;
mtxNum.unlock();
std::cout << "num: " << num_ << 'n';
mult += mult * num_;
work = num_ != 1;
}
if (workThread.joinable()) workThread.join();
std::cout << "Both threads done.n";
}

输出:

num: 10
num: 8
num: 7
num: 6
num: 5
num: 4
num: 3
num: 2
num: 2
num: 1
Both threads done.

科里鲁的现场演示

笔记:

  1. 当多线程正在运行,并且共享变量num,并且在至少一个线程中修改变量num时,每个访问都应放入关键部分(一对互斥锁和解锁)。

  2. 关键部分应始终保持尽可能短。(一次只能有一个线程通过关键部分。因此,它引入了重新序列化,这会消耗并发预期的加速。我在每个线程中引入了一个局部变量num_,以复制共享变量的当前值,并在相应线程的关键部分之后使用它。*

  3. 我为两个线程添加了一个sleep_for()以获得更好的说明。没有,我得到了

    num: 10
    num: 1
    Both threads done.
    

    我觉得有点无聊。

  4. 输出跳过num == 9并打印num == 2两次。(这在其他运行中可能看起来有所不同。原因是线程根据定义异步工作。(两个线程中 100 毫秒的相等延迟不是可靠的同步。如果没有任何东西(例如锁定的互斥锁)阻止这种情况,操作系统负责唤醒线程。可以随时自由挂起线程。

关于mtxNum.lock()/mtxNum.unlock():想象一下,关键部分包含比可能引发异常的简单--num;更复杂的内容。如果引发异常,则会跳过mtxNum.unlock(),并生成死锁,阻止任何线程继续。

为此,std库提供了一个很好且方便的工具:std::lock_guard

#include <iostream>
#include <thread>
#include <mutex>
#include <chrono>
std::mutex mtxNum;
int num;
const std::chrono::milliseconds delay(100);
void WorkerFunction()
{
for (bool work = true; work; std::this_thread::sleep_for(delay)) {
int num_;
{ std::lock_guard<std::mutex> lock(mtxNum); // does the mtxNum.lock()
num_ = --num;
} // destructor of lock does the mtxNum.unlock()
work = num_ != 1;
}
}
int main()
{
num = 10;
std::thread workThread(&WorkerFunction);
int mult = 0;
for (bool work = true; work; std::this_thread::sleep_for(delay)) {
int num_;
{ std::lock_guard<std::mutex> lock(mtxNum); // does the mtxNum.lock()
num_ = num;
} // destructor of lock does the mtxNum.unlock()
std::cout << "num: " << num_ << 'n';
mult += mult * num_;
work = num_ != 1;
}
if (workThread.joinable()) workThread.join();
std::cout << "Both threads done.n";
}

输出:

num: 10
num: 8
num: 7
num: 6
num: 5
num: 4
num: 3
num: 2
num: 1
Both threads done.

科里鲁的现场演示

std::lock_guard的诀窍是析构函数在任何情况下都会解锁互斥锁,即使在关键部分内抛出异常也是如此。

可能是,我有点偏执,但令我恼火的是,对共享变量的非保护访问可能会意外发生,而不会在任何调试会话或任何编译器诊断中被注意到。**因此,可能值得将共享变量隐藏到一个类中,在该类中,只有锁定它才能访问。为此,我在示例中介绍了Shared

#include <iostream>
#include <thread>
#include <mutex>
#include <chrono>
template <typename T>
class Shared {
public:
struct Lock {
Shared &shared;
std::lock_guard<std::mutex> lock;
Lock(Shared &shared): shared(shared), lock(shared._mtx) { }
~Lock() = default;
Lock(const Lock&) = delete;
Lock& operator=(const Lock&) = delete;
const T& get() const { return shared._value; }
T& get() { return shared._value; }
};
private:
std::mutex _mtx;
T _value;
public:
Shared() = default;
explicit Shared(T &&value): _value(std::move(value)) { }
~Shared() = default;
Shared(const Shared&) = delete;
Shared& operator=(const Shared&) = delete;
};
typedef Shared<int> SharedInt;
SharedInt shNum(10);
const std::chrono::milliseconds delay(100);
void WorkerFunction()
{
for (bool work = true; work; std::this_thread::sleep_for(delay)) {
int num_;
{ SharedInt::Lock lock(shNum);
num_ = --lock.get();
}
work = num_ != 1;
}
}
int main()
{
std::thread workThread(&WorkerFunction);
int mult = 0;
for (bool work = true; work; std::this_thread::sleep_for(delay)) {
int num_;
{ const SharedInt::Lock lock(shNum);
num_ = lock.get();
}
std::cout << "num: " << num_ << 'n';
mult += mult * num_;
work = num_ != 1;
}
if (workThread.joinable()) workThread.join();
std::cout << "Both threads done.n";
}

输出:与以前相似。

科里鲁的现场演示

诀窍在于,可以从Shared::Lock实例中检索对共享值的引用,→即在它被锁定时。即使存储了引用:

{ SharedInt::Lock lock(shNum);
int &num = lock.get();
num_ = --num;
}

int &num的生命周期在SharedInt::Lock lock(shNum);的生命周期之前结束。

当然,人们可以获得指向num的指针以在范围之外使用它,但我认为这是破坏。


我想提到的另一件事是std::atomic

原子

库提供用于细粒度原子操作的组件,允许无锁并发编程。对于涉及同一对象的任何其他原子操作,每个原子操作都是不可分割的。

虽然互斥锁可能是操作系统内核功能的主题,但原子访问可能是利用 CPU 功能完成的,而无需进入内核。(这可能会加快速度,并减少操作系统资源的使用。

更好的是,如果没有对可用类型的硬件支持,它会回退到基于互斥锁或其他锁定操作的实现(根据std::atomic<T>::is_lock_free()中的注释):

除 std::atomic_flag 之外的所有原子类型都可以使用互斥体或其他锁定操作来实现,而不是使用无锁原子 CPU 指令。原子类型有时也允许无锁,例如,如果在给定的架构上只有对齐的内存访问自然是原子的,则相同类型的未对齐对象必须使用锁。

修改后的样本具有std::atomic

#include <iostream>
#include <thread>
#include <atomic>
#include <chrono>
std::atomic<int> num;
const std::chrono::milliseconds delay(100);
void WorkerFunction()
{
for (bool work = true; work; std::this_thread::sleep_for(delay)) {
work = --num != 1;
}
}
int main()
{
num = 10;
std::thread workThread(&WorkerFunction);
int mult = 0;
for (bool work = true; work; std::this_thread::sleep_for(delay)) {
const int num_ = num;
std::cout << "num: " << num_ << 'n';
mult += mult * num_;
work = num_ != 1;
}
if (workThread.joinable()) workThread.join();
std::cout << "Both threads done.n";
}

输出:

num: 10
num: 8
num: 7
num: 7
num: 5
num: 4
num: 3
num: 3
num: 1
Both threads done.

科里鲁的现场演示


*我在WorkingThread()上沉思了一会儿。如果它是唯一修改num的线程,那么对关键部分之外的num(WorkingThread())的读取访问应该是安全的——我相信。但是,至少,为了可维护性,我不会这样做。

**根据我的个人经验,此类错误很少(或从不)在调试会话中发生,而是在向客户演示的前 180 秒发生。