Async_read_some模拟同步超时接收

async_read_some to emulate syncronous timeout receive

本文关键字:超时 同步 模拟 read some Async      更新时间:2023-10-16

我的程序总是使用平台相关的同步接收,它会阻塞执行,直到超时或接收事件,如:

recv(buf, size, timeout);

现在我想用boost替换这段代码,使其跨平台。我找到了解决方案,但我认为它相当丑陋(与单个函数调用相比)。我这样写:

void IPV4_HOST::recv_timer_handler(const boost::system::error_code & e)
{
    if (e.value() == boost::system::errc::success) {
        f_recv_timeout = true;
        asio_socket.cancel();
    }
}
void IPV4_HOST::recv_handler(const boost::system::error_code & , size_t bytes)
{
    recv_timer.cancel();
    bytes_received = bytes;
}
int IPV4_HOST::receive(void * buf, size_t buf_size, TIME::INTERVAL timeout)
{
    f_recv_timeout = false;
    bytes_received = 0;
    recv_timer.expires_from_now(timeout.get_boost_milli());
    recv_timer.async_wait(boost::bind(&IPV4_HOST::recv_timer_handler, this, boost::asio::placeholders::error));
    asio_socket.async_read_some(boost::asio::buffer(buf, buf_size),
                                boost::bind(&IPV4_HOST::recv_handler, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred));
    io_service.run();
    if (f_recv_timeout)
        throw IO::TimeoutReceiveException();
    return bytes_received;
}

你能告诉我,我做得对吗?有更简单的方法吗?

这是正确的方向,但有一些微妙的问题需要考虑:

  • 如果任何其他工作被发布到io_service,那么IPV4_HOST::receive()将开始处理该工作。
  • io_service::run()在正常情况下返回时,表示io_service已经停止。后续对run()run_one()poll()poll_one()的调用将立即返回,除非io_service是reset()
  • 两个异步操作可以同时完成,使两个完成处理程序准备好成功运行。这种行为在deadline_timer::cancel()的注释部分得到了强调;然而,所有异步操作都表现出这种行为。在现有代码中,这可能导致当bytes_received大于零时抛出IO::TimeoutReceiveException

处理io_service细节的一种解决方案,以及使用完成处理程序执行的不确定顺序,可能看起来像:

void IPV4_HOST::recv_timer_handler(const boost::system::error_code & e)
{
  timer_handled = true;
  if (!e) {
    f_recv_timeout = true;
    asio_socket.cancel();
  }
}
void IPV4_HOST::recv_handler(const boost::system::error_code &,
                             size_t bytes)
{
  recv_handled = true;
  recv_timer.cancel();
  bytes_received = bytes;
}
int IPV4_HOST::receive(void * buf, size_t buf_size, TIME::INTERVAL timeout)
{
  timer_handled  = false;
  recv_handled   = false;
  f_recv_timeout = false;
  bytes_received = 0;
  recv_timer.expires_from_now(timeout.get_boost_milli());
  recv_timer.async_wait(
    boost::bind(&IPV4_HOST::recv_timer_handler, this,
                boost::asio::placeholders::error));
  asio_socket.async_read_some(
    boost::asio::buffer(buf, buf_size),
    boost::bind(&IPV4_HOST::recv_handler, this, 
                boost::asio::placeholders::error,
                boost::asio::placeholders::bytes_transferred));
  // If a handler has not ran, then keep processing work on the io_service.
  // We need to consume both handlers so that old handlers are not in the
  // io_service the next time receive is called.
  while (!timer_handled || !recv_handled)
  {
    io_service.run_one();
  }
  // If the io_service has stopped (due to running out of work), then reset
  // it so that it can be run on next call to receive.
  if (io_service.stopped())
    io_service.reset();
  // If no bytes were received and the timeout occurred, then throw.  This
  // handles the case where both a timeout and receive occurred at the 
  // same time.
  if (!bytes_received && f_recv_timeout)
    throw IO::TimeoutReceiveException();
  return bytes_received;
}

同样,当你试图获得跨平台行为时,请阅读basic_stream_socket::cancel()的注释。这里有一些平台特定的行为需要注意。