线程池:在所有工作完成之前销毁块
Thread Pool: Block destruction until all work is done
我有以下线程池实现:
template<typename... event_args>
class thread_pool{
public:
using handler_type = std::function<void(event_args...)>;
thread_pool(handler_type&& handler, std::size_t N = 4, bool finish_before_exit = true) : _handler(std::forward<handler_type&&>(handler)),_workers(N),_running(true),_finish_work_before_exit(finish_before_exit)
{
for(auto&& worker: _workers)
{
//worker function
worker = std::thread([this]()
{
while (_running)
{
//wait for work
std::unique_lock<std::mutex> _lk{_wait_mutex};
_cv.wait(_lk, [this]{
return !_events.empty() || !_running;
});
//_lk unlocked
//check to see why we woke up
if (!_events.empty()) {//was it new work
std::unique_lock<std::mutex> _readlk(_queue_mutex);
auto data = _events.front();
_events.pop();
_readlk.unlock();
invoke(std::move(_handler), std::move(data));
_cv.notify_all();
}else if(!_running){//was it a signal to exit
break;
}
//or was it spurious and we should just ignore it
}
});
//end worker function
}
}
~thread_pool()
{
if(_finish_work_before_exit)
{//block destruction until all work is done
std::condition_variable _work_remains;
std::mutex _wr;
std::unique_lock<std::mutex> lk{_wr};
_work_remains.wait(lk,[this](){
return _events.empty();
});
}
_running=false;
//let all workers know to exit
_cv.notify_all();
//attempt to join all workers
for(auto&& _worker: _workers)
{
if(_worker.joinable())
{
_worker.join();
}
}
}
handler_type& handler()
{
return _handler;
}
void propagate(event_args&&... args)
{
//lock before push
std::unique_lock<std::mutex> _lk(_queue_mutex);
{
_events.emplace(std::make_tuple(args...));
}
_lk.unlock();//explicit unlock
_cv.notify_one();//let worker know that data is available
}
private:
bool _finish_work_before_exit;
handler_type _handler;
std::queue<std::tuple<event_args...>> _events;
std::vector<std::thread> _workers;
std::atomic_bool _running;
std::condition_variable _cv;
std::mutex _wait_mutex;
std::mutex _queue_mutex;
//helpers used to unpack tuple into function call
template<typename Func, typename Tuple, std::size_t... I>
auto invoke_(Func&& func, Tuple&& t, std::index_sequence<I...>)
{
return func(std::get<I>(std::forward<Tuple&&>(t))...);
}
template<typename Func, typename Tuple, typename Indicies = std::make_index_sequence<std::tuple_size<Tuple>::value>>
auto invoke(Func&& func, Tuple&& t)
{
return invoke_(std::forward<Func&&>(func), std::forward<Tuple&&>(t), Indicies());
}
};
我最近在析构函数中添加了这一节:
if(_finish_work_before_exit)
{//block destruction until all work is done
std::condition_variable _work_remains;
std::mutex _wr;
std::unique_lock<std::mutex> lk{_wr};
_work_remains.wait(lk,[this](){
return _events.empty();
});
}
目的是让析构函数阻塞,直到工作队列被完全消耗。
但是它似乎使程序陷入死锁。所有的工作都完成了,但是等待似乎并没有在工作完成后结束。
考虑这个例子main:
std::mutex writemtx;
thread_pool<int> pool{
[&](int i){
std::unique_lock<std::mutex> lk{writemtx};
std::cout<<i<<" : "<<std::this_thread::get_id()<<std::endl;
},
8//threads
};
for (int i=0; i<8192; ++i) {
pool.propagate(std::move(i));
}
如何让析构函数等待工作完成而不引起死锁?
你的代码是死锁的原因是_work_remains
是一个条件变量,它没有被你的代码的任何部分"通知"。您需要将其作为一个类属性,并由从_events
获取最后一个事件的任何线程通知。
相关文章:
- 工作线程在执行太快后永久休眠
- ZeroMQ 在使用 std::thread 创建工作线程时崩溃
- 工作线程一直在等待,condition_variable甚至调用了notify_all
- 使用 std::atomic 标志和 std::condition_variable 在工作线程上等待
- Qt-工作线程崩溃时将cv::Mat转换为QImage
- 无法将接口从主线程封送到工作线程
- 在 C++ 中扩展作业/工作线程多线程系统
- Qt C++ - 如何将数据从工作线程传递到主线程?
- 从线程池工作线程使用 GetQueuedCompletionStatus 的奇怪行为
- QtThread:I/O 队列的工作线程
- 将信号从工作线程类连接到控制器类 - QThreads
- Qt:工作线程和 GUI 事件之间的关系
- 将数据集几乎平均分配给工作线程
- 在为工作线程访问 lambda 中捕获的向量列表中的元素引用时,是否需要互斥锁?
- 如何将C++ dll 在 C# 窗口窗体应用程序下的工作线程中运行
- 在Qt中使用工作线程将数据写入文件的正确方法是什么?
- 在工作线程中使用 QT 主窗口
- C++,pthreads:如何从多个线程停止工作线程
- 同步主线程和工作线程
- 从工作线程更新QtCharts的正确方法