在 boost asio 异步服务器中执行计算 taks

Execute computational taks in boost asio asynchronous server

本文关键字:执行 计算 taks 服务器 boost asio 异步      更新时间:2023-10-16

我有一个简单的异步服务器,很大程度上受到boost asio文档(单线程服务器(中的HTTP服务器示例的启发,该示例处理客户端发送的请求。

我的 server 类会在每次新客户端连接并调用其 start() 方法时创建一个新的 connection 对象(如 HTTP 服务器示例中所示(。connection实例读取客户端的请求,然后使用异步操作(即boost::asio::async_readboost::asio::async_write(发送回复

以下是connection类的简化版本:

void connection::start() {
    // Read request from a client
    boost::asio::mutable_buffers_1 read_buffer = boost::asio::buffer(
            buffer_data_, REQUET_SIZE);
    boost::asio::async_read(socket_, read_buffer,
            boost::bind(&connection::handle_read_request, shared_from_this(),
                    read_buffer, boost::asio::placeholders::error,
                    boost::asio::placeholders::bytes_transferred));
}
// Error handling omitted for the sake of brievty
void connection::handle_read_request(boost::asio::mutable_buffers_1& buffer,
    const boost::system::error_code& e, std::size_t bytes_transferred) {
      request req = parse_request(buffer);
      if(req.type_ = REQUEST_TYPE_1) {
          reply rep(...........);
          rep.prepare_buffer(buffer_data_.c_array());
          // Send the request using async_write
          boost::asio::async_write(socket_,
               boost::asio::buffer(buffer_data_, rep.required_buffer_size()),
               boost::bind(&connection::stop, shared_from_this()));
      } else if(req.type_ = REQUEST_TYPE_2 {
          // Need to do heavy computational task
      }
}

所有这些都运行良好,但是,在某些情况下,我需要执行繁重的计算任务(REQUEST_TYPE_2(。我无法在handle_read_request中执行这些任务,因为它们会阻止单线程服务器并阻止其他客户端开始提供服务。

理想情况下,我想将我繁重的计算任务提交到线程池,并在任务完成后运行我的连接类的方法(例如connection::handle_done_task(std::string computation_result)(。此handle_done_task(std::string computation_result)会将计算结果发送到客户端(使用 boost::asio::async_write (。

我怎么能这样?我是否应该注意一些问题(从多个线程在同一套接字上调用boost::asio::async_write是否安全(?

正如文档明确指出的那样,asio 对象(strand/io_service 除外(不是线程安全的,因此您不应该在没有同步的情况下从多个线程调用async_write。相反,请使用后io_service习语。喜欢这个:

// pseudocode, untested!
if (req.type_ = REQUEST_TYPE_2) 
{
  auto self = shared_from_this(); // lets capture shared_ptr<connection> to ensure its lifespan
  auto handler = [=](computation_result_type res)
  {
    // post the function that accesses asio i/o objects to `io_service` thread
    self->io_->post([] { handle_done_task(res); });
  }
  thread worker([=](const std::function<(computation_result_type res)> &handler) 
  {     
    // do the heavy work...
    // then invoke the handler
    handler(result);
  });
  worker.detach();
}