带线程超时的Boost ASIO线程池
Boost ASIO threadpool with a thread timeout
我使用Boost ASIO库作为线程池,它被广泛描述。然而,我想中断每个线程,如果线程进程超过1秒,并移动到下一个线程发布的任务。
我可以很容易地实现这一点,使用一个单独的deadline_timer,如果线程在截止日期之前完成或中断线程,任务应该持续太长时间重置。然而,我认为这将内置到ASIO。因为它似乎很自然地有一个任务,与网络操作的超时。但我在API中找不到任何东西,简单地说,
谁能告诉我这个功能是否已经存在?或者我应该按照我描述的方式实现它?
这是我想出的一个快速解决方案。
它要求你提交的函数对象接受一个类型为exec_context
的参数。
在io_service中运行的任务可以查询.canceled()
访问器(这是原子的),以确定是否应该提前取消。
然后,它可以抛出异常或返回任何它想返回的值。
主叫通过submit
函数提交。该函数用context对象包装worker函数,并将其返回值和/或异常封送到std::future中。
调用者可以根据需要查询或等待这个未来(或忽略它)。
调用者获得一个句柄对象,该对象具有cancel()
方法。使用此句柄,调用者可以取消、查询或等待提交的任务。
希望有帮助。写起来很有趣。
#include <boost/asio.hpp>
#include <iostream>
#include <atomic>
#include <thread>
#include <chrono>
#include <future>
#include <stdexcept>
#include <exception>
#include <utility>
#include <type_traits>
//
// an object to allow the caller to communicate a cancellation request to the
// submitted task
//
struct exec_controller
{
/// @returns previous cancellation request state;
bool notify_cancel()
{
return _should_cancel.exchange(true);
}
bool should_cancel() const {
return _should_cancel;
}
private:
std::atomic<bool> _should_cancel = { false };
};
template<class Ret>
struct exec_state : std::enable_shared_from_this<exec_state<Ret>>
{
using return_type = Ret;
bool notify_cancel() {
return _controller.notify_cancel();
}
std::shared_ptr<exec_controller>
get_controller_ptr() {
return std::shared_ptr<exec_controller>(this->shared_from_this(),
std::addressof(_controller));
}
std::promise<return_type>& promise() { return _promise; }
private:
std::promise<return_type> _promise;
exec_controller _controller;
};
struct applyer;
struct exec_context
{
exec_context(std::shared_ptr<exec_controller> impl)
: _impl(impl)
{}
bool canceled() const {
return _impl->should_cancel();
}
private:
friend applyer;
std::shared_ptr<exec_controller> _impl;
};
struct applyer
{
template<class F, class Ret>
void operator()(F& f, std::shared_ptr<exec_state<Ret>> const& p) const
{
try {
p->promise().set_value(f(exec_context { p->get_controller_ptr() }));
}
catch(...) {
p->promise().set_exception(std::current_exception());
}
}
template<class F>
void operator()(F& f, std::shared_ptr<exec_state<void>> const& p) const
{
try {
f(exec_context { p->get_controller_ptr() });
p->promise().set_value();
}
catch(...) {
p->promise().set_exception(std::current_exception());
}
}
};
template<class Ret>
struct exec_result
{
using return_type = Ret;
exec_result(std::shared_ptr<exec_state<return_type>> p)
: _impl(p)
{}
bool cancel() {
return _impl->notify_cancel();
}
std::future<Ret>& get_future()
{
return _future;
}
private:
std::shared_ptr<exec_state<return_type>> _impl;
std::future<return_type> _future { _impl->promise().get_future() };
};
template<class Executor, class F>
auto submit(Executor& exec, F&& f)
{
using function_type = std::decay_t<F>;
using result_type = std::result_of_t<function_type(exec_context)>;
using state_type = exec_state<result_type>;
auto shared_state = std::make_shared<state_type>();
exec.post([shared_state, f = std::forward<F>(f)]
{
applyer()(f, shared_state);
});
return exec_result<result_type>(std::move(shared_state));
}
int main()
{
using namespace std::literals;
boost::asio::io_service ios;
boost::asio::io_service::strand strand(ios);
boost::asio::io_service::work work(ios);
std::thread runner([&] { ios.run(); });
std::thread runner2([&] { ios.run(); });
auto func = [](auto context)
{
for(int i = 0 ; i < 1000 ; ++i)
{
if (context.canceled())
throw std::runtime_error("canceled");
std::this_thread::sleep_for(100ms);
}
};
auto handle = submit(strand, func);
auto handle2 = submit(ios, [](auto context) { return 2 + 2; });
// cancel the handle, or wait on it as you wish
std::this_thread::sleep_for(1s);
handle.cancel();
handle2.cancel(); // prove that late cancellation is a nop
try {
std::cout << "2 + 2 is " << handle2.get_future().get() << std::endl;
}
catch(std::exception& e)
{
std::cerr << "failed to add 2 + 2 : " << e.what() << std::endl;
}
try {
handle.get_future().get();
std::cout << "task completed" << std::endl;
}
catch(std::exception const& e) {
std::cout << "task threw exception: " << e.what() << std::endl;
}
ios.stop();
runner.join();
runner2.join();
}
update: v2为类添加了一些隐私保护,演示了2个并发任务。
预期输出:2 + 2 is 4
task threw exception: canceled
相关文章:
- C++Boost Asio Pool线程,带有lambda函数和传递引用变量
- 提升 ASIO - io_service 不要等待连接到线程
- ASIO signal_set多个 IO 线程不可靠,具体取决于代码顺序?
- 将更高的优先级设置为 boost::asio 线程处理进程
- asio 链对象线程安全吗?
- asio::io_service 具有多个线程的优先级队列处理
- boost信号和插槽在不同的线程中不工作(使用boost::asio::io_service)
- 调用boost.asio的异步函数时,线程是什么时候创建的
- 调用socket.remote_endpoint(提升 asio 库)线程安全性
- 使用线程池提升 ASIO 多线程 TCP 服务器
- 重写多线程事件驱动的C 程序以使用单线程Boost :: Asio
- 在具有许多内核的计算机上,使用 Boost ASIO 只能使用 1 个线程
- 如何将每线程用户数据传递到ASIO处理程序中
- 提高 ASIO stream_descriptor和事件 FD 线程安全性
- 提升 asio io_service多线程性能不佳
- 在std ::线程中升级ASIO阻止操作,而不是使用异步方法
- 是boost :: asio :: thread_pool线程在多个线程上发布任务时的安全性
- Boost.Asio、tcp::iostream 和多线程
- Boost.Asio:由于线程退出或应用程序请求,I/O 操作已中止
- boost::asio绞线缠绕的线程无法立即工作