c++主线程通知线程通知主线程
C++ Main thread notifying threads notifying main thread
我需要帮助。我想做一些具体的事情,但我缺乏多线程技能,这让我很难受。
基本上我的主程序/线程需要管理一些必须运行多次的"通道"。由于这些运行是独立的,因此每个通道都包含一个执行它们的线程。
因此主线程必须等待所有通道(线程)完成它们的运行才能启动下一个通道。所有的通道都必须等待主线程通知它们可以运行。
下面是我怎么做的-抱歉它有点长!
#include <thread>
#include <mutex>
#include <condition_variable>
#include <iostream>
#include <atomic>
std::mutex g_lockprint;
std::mutex g_lockbatch;
std::condition_variable g_nextbatch;
std::mutex g_lockready;
std::condition_variable g_ready;
int global_id = 0;
int nbChannels = 5;
std::atomic<int> nbChannelsLeftToEnd;
class Channel {
private:
int _id;
std::thread _th;
std::atomic<bool> next_batch;
std::atomic<bool> stop_th;
public:
Channel() : _id(global_id++), _th(), next_batch(false), stop_th(false) {}
void go_for_next_batch() { next_batch = true; }
void start(int& start, int &end){
_th = std::thread(&Channel::run, this, std::ref(start), std::ref(end));
}
void stop(){
stop_th = true;
_th.join();
}
void run(int& start, int& end){
while (!stop_th){
{
std::unique_lock<std::mutex> locker(g_lockbatch);
g_nextbatch.wait(locker, [&](){return (next_batch==true); });
}
// print a starting message
{
std::unique_lock<std::mutex> locker(g_lockprint);
std::cout << "[channel " << _id << "]trunning in [" << start << "," << end << "]" << std::endl;
}
// simulate work
std::this_thread::sleep_for(std::chrono::seconds(1));
// update the number of channels left to run
nbChannelsLeftToEnd--;
g_ready.notify_one();
next_batch = false;
}
}
};
int main()
{
int end = 100;
int batch = 10;
int startBatch = 0;
int endBatch = startBatch + batch;
// declare some channels (threads)
std::vector<Channel> channels(nbChannels);
// start the threads
for (auto& ch : channels) ch.start(startBatch, endBatch);
while (endBatch<=end){
{
std::unique_lock<std::mutex> locker(g_lockprint);
std::cout << "[main]trunning in [" << startBatch << "," << endBatch << "]" << std::endl;
}
nbChannelsLeftToEnd = nbChannels;
for (auto& ch : channels) ch.go_for_next_batch();
g_nextbatch.notify_all();
std::unique_lock<std::mutex> locker(g_lockready);
g_ready.wait(locker, [&](){return (nbChannelsLeftToEnd == 0); });
startBatch += batch;
endBatch += batch;
}
for (auto& ch : channels) ch.stop();
return 0;
}
但有时程序阻塞,可能线程等待彼此,但我不知道为什么。在任何情况下,连接线程("stop"方法在main的末尾)使我的程序无限期地运行,也看不出为什么。
EDIT:感谢您的评论和一些研究,我设法使用同步屏障获得一个工作程序,因此主线程可以等待所有其他线程完成当前批处理,然后告诉它们开始下一个。我重用了这里有人引用Anthony williams的书中的屏障代码——这里是屏障:
class barrier
{
unsigned const count;
std::atomic<unsigned> spaces;
std::atomic<unsigned> generation;
public:
explicit barrier(unsigned count_) :
count(count_), spaces(count), generation(0) {}
void wait()
{
unsigned const my_generation = generation;
if (!--spaces)
{
spaces = count;
++generation;
}
else
{
while (generation == my_generation)
std::this_thread::yield();
}
}
};
下面是使用barrier的Channel类的新的run方法—注意在"stop_th"标志上的附加测试。当线程在上一个批处理之后和加入之前被解除阻塞时,它不应该运行另一个批处理,因此需要进行测试。
void run(int& start, int& end, barrier& b)
{
while (!stop_th){
// wait for next batch notification - use the next_batch flag to avoid
// spurious wake-ups
{
std::unique_lock<std::mutex> locker(g_lockbatch);
g_nextbatch.wait(locker, [&](){return (next_batch==true); });
}
if (stop_th) return;
// simulate work
std::this_thread::sleep_for(std::chrono::seconds(1));
// wait for everyone to meet
next_batch = false;
b.wait();
}
}
最后是main:
int main()
{
int end = 100;
int batch = 10;
int startBatch = 0;
int endBatch = startBatch + batch;
// declare a barrier where all threads will meet
barrier b(nbChannels+1);
// declare some channels (threads)
std::vector<Channel> channels(nbChannels);
// start the threads
for (auto& ch : channels) ch.start(startBatch, endBatch, b);
while (endBatch<=end){
// notify the channels they can process one batch
for (auto& ch : channels) ch.go_for_next_batch();
g_nextbatch.notify_all();
// wait until all threads have finished their batch
b.wait();
// prepare the next one
startBatch += batch;
endBatch += batch;
}
// all channels are blocked by the next_batch condition
// so notify a next batch and join them
for (auto& ch : channels) ch.stop();
for (auto& ch : channels) ch.go_for_next_batch();
g_nextbatch.notify_all();
for (auto& ch : channels) ch.wait_until_stopped();
return 0;
}
再次感谢你的评论/回答!! 我把我的评论变成了一个答案,因为我在cpp.sh中修复了代码,现在似乎已经完成了。
在调用stop时考虑它们不存在。注意,它们可能仍然在等待下一个批处理锁。考虑添加一个调用来将它们从锁中释放出来,并让它们检查它们是否在锁定步骤后停止。
将停止函数分成两个函数,一个用于更改布尔值,另一个用于等待。让我们调用两个函数stop和wait_until_stopped
然后将以下代码添加到main函数中。
代替
for (auto& ch : channels) ch.stop();
使用:
for (auto& ch : channels) ch.stop();
for (auto& ch : channels) ch.go_for_next_batch();
g_nextbatch.notify_all();
for (auto& ch : channels) ch.wait_until_stopped();
相关文章:
- 是否有必要获取锁并在不需要唤醒线程时通知condition_variable?
- C++ Poco - 如何向特定线程发送通知/消息?
- C++ - 如何不错过来自多个线程的多个通知?
- C++ - std::condition_variable 由不同的线程通知
- 线程启动延迟 - 通知所有未唤醒所有线程
- 使用 condition_variable::notify_all 通知多个线程
- 通知所有在多线程C++不起作用.导致死锁
- 通知线程是否始终需要在修改期间锁定共享数据
- 当子线程退出时,如何通知父线程
- QSocketNotifier:不能从另一个线程启用或禁用套接字通知程序
- 使用Berkeley套接字的线程通知
- 通知有关变量变化的线程(信号?)
- 用信号通知线程启动特定的函数
- 线程退出时通知Waiters
- C++11 std::condition_variable:我们可以将锁直接传递给通知的线程吗?
- 通知线程退出
- std::condition_variable - 等待多个线程通知观察者
- 线程之间的更改通知标志需要内存屏障吗
- 视觉提升 条件变量中的同步队列C++不通知其他线程上的等待类方法
- c++主线程通知线程通知主线程