如何在更多线程上运行提升 asio 解析器服务

How to run boost asio resolver service on more threads?

本文关键字:asio 服务 运行 多线程      更新时间:2023-10-16

我在SNMPV2实现中使用boost::asio::ip::udp::resolver来确定主机是否可访问。

using Resolver = boost::asio::ip::udp::resolver;
Resolver resolver(ioService);
Resolver::query query(connectOptions.getHost(),
        connectOptions.getPort());
Resolver::iterator endpointIterator;
BOOST_LOG_SEV(logger, Severity::debug) << "Waiting for async resolve";
endpointIterator = resolver.async_resolve(query, yield);
BOOST_LOG_SEV(logger, Severity::debug) << "Async resolve done";
if (endpointIterator == Resolver::iterator{}) { // unreachable host
    using namespace boost::system;
    throw system_error{error_code{SnmpWrapperError::BadHostname}};
}

有一个测试用例,我测试当查询不存在的主机名和退出主机名时会发生什么:

2013-09-16 10:45:28.687001: [DEBUG   ] 0x88baf8 SnmpConnection: connect                                                                                              
2013-09-16 10:45:28.687396: [DEBUG   ] 0x88baf8 SnmpConnection: host: non_existent_host_name_                                                                        
2013-09-16 10:45:28.687434: [DEBUG   ] 0x88baf8 SnmpConnection: port: 1611                                                                                           
2013-09-16 10:45:28.687456: [DEBUG   ] 0x88baf8 SnmpConnection: Waiting for async resolve                                                                            
2013-09-16 10:45:28.687675: [DEBUG   ] 0x88c608 SnmpConnection: connect                                                                                              
2013-09-16 10:45:28.687853: [DEBUG   ] 0x88c608 SnmpConnection: host: 127.0.0.1                                                                                      
2013-09-16 10:45:28.687883: [DEBUG   ] 0x88c608 SnmpConnection: port: 1611                                                                                           
2013-09-16 10:45:28.687904: [DEBUG   ] 0x88c608 SnmpConnection: Waiting for async resolve                                                                            
2013-09-16 10:45:31.113527: [ERROR   ] 0x88baf8 SnmpConnection: Host not found (authoritative)                                                                       
2013-09-16 10:45:31.113708: [DEBUG   ] 0x88c608 SnmpConnection: Async resolve done                                                                                   
2013-09-16 10:45:31.113738: [DEBUG   ] 0x88c608 SnmpConnection: Connecting to 127.0.0.1:1611
...

从日志中可以看出,具有可访问地址的对象被阻止,直到另一个对象的解析完成并出现错误(3 秒)。我的假设是 Asio 解析器服务使用一个线程,因此对一个无法访问的主机进行一次查询可能会阻止处理即将到来的解析请求。

解决方法是在更多线程上运行解析程序服务,这可能吗?或者是否有可能有一个解析器服务,像udp服务那样在套接字上工作(而不是使用::getaddrinfo)?

如文档中所述,Boost.Asio 将为每个io_service创建一个额外的线程,以模拟第一次调用 resolver::async_resolve() 时的异步主机解析。

仅当在与不同io_service关联的resolver上启动异步解析操作时,创建多个io_service对象才允许并发主机解析。 例如,以下代码不会执行并发主机解析,因为两个解析程序使用相同的服务:

boost::asio::io_service service1;
boost::asio::ip::udp::resolver resolver1(service1); // using service1
boost::asio::ip::udp::resolver resolver2(service1); // using service1
resolver1.async_resolve(...); 
resolver2.async_resolve(...);

另一方面,以下内容将执行并发主机解析,因为每个解析程序使用不同的服务:

boost::asio::io_service service1;
boost::asio::io_service service2;
boost::asio::ip::udp::resolver resolver1(service1); // using service1
boost::asio::ip::udp::resolver resolver2(service2); // using service2
resolver1.async_resolve(...); 
resolver2.async_resolve(...);

假设每io_service resolver,为了获得并发性,应用程序负责将解析操作分派给不同的解析器。 一个简单的工作分配策略(如循环)可能就足够了。

另一方面,可以将此责任委托给io_service,允许它以类似于Boost.Asio内部执行的方式分发模拟异步主机分辨率的工作。 同步 resolver::resolve() 成员函数在调用线程中执行工作。 因此,应用程序可以创建由线程池提供服务的io_service。 当需要异步主机解析时,会将作业发布到io_service中,该作业将创建创建resolver并执行同步解析,并使用结果调用用户处理程序。 下面是一个完整的基本示例,其中 resolver 类使用线程池模拟异步主机解析:

#include <iostream>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/optional.hpp>
#include <boost/thread.hpp>
/// @brief Type used to emulate asynchronous host resolution with a 
///        dedicated thread pool.
class resolver
{
public:
  resolver(const std::size_t pool_size)
    : work_(boost::ref(io_service_))
  {
    // Create pool.
    for (std::size_t i = 0; i < pool_size; ++i)
      threads_.create_thread(
        boost::bind(&boost::asio::io_service::run, &io_service_));
  }
  ~resolver()
  {
    work_ = boost::none;
    threads_.join_all();
  }
  template <typename QueryOrEndpoint, typename Handler>
  void async_resolve(QueryOrEndpoint query, Handler handler)
  {
    io_service_.post(boost::bind(
        &resolver::do_async_resolve<QueryOrEndpoint, Handler>, this,
        query, handler));
  }
private:
  /// @brief Resolve address and invoke continuation handler.
  template <typename QueryOrEndpoint, typename Handler>
  void do_async_resolve(const QueryOrEndpoint& query, Handler handler)
  {
    typedef typename QueryOrEndpoint::protocol_type protocol_type;
    typedef typename protocol_type::resolver        resolver_type;
    // Resolve synchronously, as synchronous resolution will perform work
    // in the calling thread.  Thus, it will not use Boost.Asio's internal
    // thread that is used for asynchronous resolution.
    boost::system::error_code error;
    resolver_type resolver(io_service_);
    typename resolver_type::iterator result = resolver.resolve(query, error);
    // Invoke user handler.
    handler(error, result);
  }
private:
  boost::asio::io_service io_service_;
  boost::optional<boost::asio::io_service::work> work_;
  boost::thread_group threads_;
};
template <typename ProtocolType>
void handle_resolve(
    const boost::system::error_code& error,
    typename ProtocolType::resolver::iterator iterator)
{
  std::stringstream stream;
  stream << "handle_resolve:n"
            "  " << error.message() << "n";
  if (!error)
    stream << "  " << iterator->endpoint() << "n";
  std::cout << stream.str();
  std::cout.flush();
}
int main()
{
  // Resolver will emulate asynchronous host resolution with a pool of 5
  // threads.
  resolver resolver(5);
  namespace ip = boost::asio::ip;
  resolver.async_resolve( 
      ip::udp::resolver::query("localhost", "12345"),
      &handle_resolve<ip::udp>);
  resolver.async_resolve(
      ip::tcp::resolver::query("www.google.com", "80"),
      &handle_resolve<ip::tcp>);
  resolver.async_resolve(
      ip::udp::resolver::query("www.stackoverflow.com", "80"),
      &handle_resolve<ip::udp>);
  resolver.async_resolve(
      ip::icmp::resolver::query("some.other.address", "54321"),
      &handle_resolve<ip::icmp>);
}

和带注释的输出:

handle_resolve:
  Success
  127.0.0.1:12345   // localhost
handle_resolve:
  Service not found // bogus
handle_resolve:
  Success
  173.194.77.147:80 // google
handle_resolve:
  Success
  198.252.206.16:80 // stackoverflow
你需要

的是两个io_service ioService,因为每个都由一个线程运行。我的意思是你通过调用io_service::run来阻止线程的正常执行。

我认为代码本身是正确的。