是否将std::packaged_task添加到现有线程
Add a std::packaged_task to an existing thread?
是否有标准的方法将std::packaged_task
添加到现有线程?在运行任务之前,必须发生大量的开销,所以我想做一次,然后保持线程运行并等待任务执行。我希望能够使用futures,这样我就可以有选择地获得任务的结果并捕获异常。
我的C++11之前的实现要求我的任务从一个抽象基类继承,该基类具有Run()
方法(有点麻烦,不能使用lambdas(,并且有一个std::deque
集合,我将这些集合添加到主线程中,并从工作线程中出列。我必须保护该集合不被同时访问,并向工作线程提供一个信号,表明有什么事情要做,这样它就不会旋转或休眠。Enqueing返回一个"结果"对象和一个等待任务完成的同步对象,以及一个结果值。这一切都很好,但如果有更好的东西,是时候升级了。
这里有一个玩具线程池:
template<class T>
struct threaded_queue {
using lock = std::unique_lock<std::mutex>;
void push_back( T t ) {
{
lock l(m);
data.push_back(std::move(t));
}
cv.notify_one();
}
boost::optional<T> pop_front() {
lock l(m);
cv.wait(l, [this]{ return abort || !data.empty(); } );
if (abort) return {};
auto r = std::move(data.back());
data.pop_back();
return std::move(r);
}
void terminate() {
{
lock l(m);
abort = true;
data.clear();
}
cv.notify_all();
}
~threaded_queue()
{
terminate();
}
private:
std::mutex m;
std::deque<T> data;
std::condition_variable cv;
bool abort = false;
};
struct thread_pool {
thread_pool( std::size_t n = 1 ) { start_thread(n); }
thread_pool( thread_pool&& ) = delete;
thread_pool& operator=( thread_pool&& ) = delete;
~thread_pool() = default; // or `{ terminate(); }` if you want to abandon some tasks
template<class F, class R=std::result_of_t<F&()>>
std::future<R> queue_task( F task ) {
std::packaged_task<R()> p(std::move(task));
auto r = p.get_future();
tasks.push_back( std::move(p) );
return r;
}
template<class F, class R=std::result_of_t<F&()>>
std::future<R> run_task( F task ) {
if (threads_active() >= total_threads()) {
start_thread();
}
return queue_task( std::move(task) );
}
void terminate() {
tasks.terminate();
}
std::size_t threads_active() const {
return active;
}
std::size_t total_threads() const {
return threads.size();
}
void clear_threads() {
terminate();
threads.clear();
}
void start_thread( std::size_t n = 1 ) {
while(n-->0) {
threads.push_back(
std::async( std::launch::async,
[this]{
while(auto task = tasks.pop_front()) {
++active;
try{
(*task)();
} catch(...) {
--active;
throw;
}
--active;
}
}
)
);
}
}
private:
std::vector<std::future<void>> threads;
threaded_queue<std::packaged_task<void()>> tasks;
std::atomic<std::size_t> active;
};
复制自我的另一个答案。
一个有1个线程的thread_pool
与您的描述非常匹配。
上面只是一个玩具,一个真正的线程池,我会用move_only_function<void()>
代替std::packaged_task<void()>
,这就是我使用它的全部目的。(packaged_task<void()>
可以有趣地保持packaged_task<R()>
,如果没有效率的话(。
你将不得不对关机进行推理并制定计划。如果您试图在不首先清除线程的情况下关闭上面的代码,那么它就会锁定。
相关文章:
- 如何将元素添加到数组的线程安全函数?
- 学习多线程C++:添加线程不会使执行速度更快,即使它看起来应该
- 将项目添加到队列时运行线程
- 为什么添加延迟会提高此多线程环境中的数据吞吐量?
- 如何在Qt中合并/追加/添加两个用于线程的模型?
- 将元素添加到 std::list 在多线程中,无需 C++ 互斥锁
- 在多线程中添加到向量
- 将类Cotaining std ::线程添加到向量
- 如何将元素添加到从另一个线程绑定到 XAML 的 IVector
- 将元素从C 11线程安全地添加到向量中
- 在qabstractItemmodel :: data()const中,如何添加一些线程安全数据
- 以后是否可以将线程添加到 omp 中以进行循环
- 为什么添加函数在C 11线程中无效
- C++:尝试将新节点添加到链表会产生"线程 1:EXC_BAD_ACCESS(代码 = 1,地址 = 0x0)"错误
- 如何仅在使用 c++ 中的 boost 完成所有任务时才向线程池添加新任务
- 为什么队列的所有元素都是一样的,元素被添加到 posix 线程中的队列中
- C++ 用于添加操作的线程安全
- C++:如何在循环中添加线程但不"pause"循环?
- 多个线程同时在unordered_map中添加值会使它崩溃
- Qt从其他线程向事件循环添加函数调用