由于超时而取消async_read

Cancel async_read due to timeout

本文关键字:取消 async read 于超时 超时      更新时间:2023-10-16

我正在尝试围绕async_read编写一个包装器同步方法,以允许在套接字上进行非阻塞读取。在互联网上的几个例子之后,我开发了一个解决方案,它似乎几乎是正确的,但不起作用。

该类声明以下相关属性和方法:

class communications_client
{
    protected:
        boost::shared_ptr<boost::asio::io_service> _io_service;
        boost::shared_ptr<boost::asio::ip::tcp::socket> _socket;
        boost::array<boost::uint8_t, 128> _data;
        boost::mutex _mutex;
        bool _timeout_triggered;
        bool _message_received;
        boost::system::error_code _error;
        size_t _bytes_transferred;
        void handle_read(const boost::system::error_code & error, size_t bytes_transferred);
        void handle_timeout(const boost::system::error_code & error);
        size_t async_read_helper(unsigned short bytes_to_transfer, const boost::posix_time::time_duration & timeout, boost::system::error_code & error);
        ...
}

方法async_read_helper是封装所有复杂性的方法,而其他两个handle_readhandle_timeout只是事件处理程序。以下是三种方法的实现:

void communications_client::handle_timeout(const boost::system::error_code & error)
{
    if (!error)
    {
        _mutex.lock();
        _timeout_triggered = true;
        _error.assign(boost::system::errc::timed_out, boost::system::system_category());
        _mutex.unlock();
    }
}
void communications_client::handle_read(const boost::system::error_code & error, size_t bytes_transferred)
{
    _mutex.lock();
    _message_received = true;
    _error = error;
    _bytes_transferred = bytes_transferred;
    _mutex.unlock();
}
size_t communications_client::async_read_helper(unsigned short bytes_to_transfer, const boost::posix_time::time_duration & timeout, boost::system::error_code & error)
{
    _timeout_triggered = false;
    _message_received = false;
    boost::asio::deadline_timer timer(*_io_service);
    timer.expires_from_now(timeout);
    timer.async_wait(
        boost::bind(
            &communications_client::handle_timeout,
            this,
            boost::asio::placeholders::error));
    boost::asio::async_read(
        *_socket,
        boost::asio::buffer(_data, 128),
        boost::asio::transfer_exactly(bytes_to_transfer),
        boost::bind(
            &communications_client::handle_read,
            this,
            boost::asio::placeholders::error,
            boost::asio::placeholders::bytes_transferred));
    while (true)
    {
        _io_service->poll_one();
        if (_message_received)
        {
            timer.cancel();
            break;
        }
        else if (_timeout_triggered)
        {
            _socket->cancel();
            break;
        }
    }
    return _bytes_transferred;
}

我的主要问题是:为什么这适用于_io_service->poll_one()上的循环,而没有循环并调用_io_service->run_one()?另外,我想知道对于更习惯使用Boost和Asio的人来说,它看起来是否正确。谢谢!


修复提案 #1

根据Jonathan Wakely的评论,可以在操作完成后使用_io_service->run_one()调用_io_service->reset()来替换循环。它应如下所示:

_io_service->run_one();
if (_message_received)
{
    timer.cancel();
}
else if (_timeout_triggered)
{
    _socket->cancel();
}
_io_service->reset();

经过一些测试,我已经检查了这种解决方案是否单独不起作用。handle_timeout方法被连续调用,错误代码为 operation_aborted 。如何停止这些呼叫?

修复提案#2

twsansbury的答案是准确的,并且基于坚实的文档基础。该实现导致async_read_helper中的以下代码:

while (_io_service->run_one())
{
    if (_message_received)
    {
        timer.cancel();
    }
    else if (_timeout_triggered)
    {
        _socket->cancel();
    }
}
_io_service->reset();

以及对handle_read方法的以下更改:

void communications_client::handle_read(const boost::system::error_code & error, size_t bytes_transferred)
{
    if (error != boost::asio::error::operation_aborted)
    {
        ...
    }
}

该解决方案在测试过程中已被证明是可靠和正确的。

io_service::run_one()io_service::poll_one() 之间的主要区别在于,run_one() 将阻塞,直到处理程序准备好运行,而poll_one()不会等待任何未完成的处理程序准备就绪。

假设_io_service上唯一的未完成处理程序是handle_timeout()handle_read(),那么run_one()不需要循环,因为它只会在handle_timeout()handle_read()运行后返回。 另一方面,poll_one()需要一个循环,因为poll_one()会立即返回,因为handle_timeout()handle_read()都没有准备好运行,从而导致函数最终返回。

原始代码以及修复建议 #1 的主要问题是,当async_read_helper()返回时,io_service中仍有未完成的处理程序。 在下一次调用async_read_helper()时,下一个要调用的处理程序将是上一次调用的处理程序。 io_service::reset() 方法仅允许io_service从停止状态恢复运行,它不会删除已排队进入io_service的任何处理程序。 若要说明此行为,请尝试使用循环来使用io_service中的所有处理程序。 使用完所有处理程序后,退出循环并重置io_service:

// Consume all handlers.
while (_io_service->run_one())
{
  if (_message_received)
  {
    // Message received, so cancel the timer.  This will force the completion of
    // handle_timer, with boost::asio::error::operation_aborted as the error.
    timer.cancel();
  }
  else if (_timeout_triggered)
  {
    // Timeout occured, so cancel the socket.  This will force the completion of
    // handle_read, with boost::asio::error::operation_aborted as the error.
    _socket->cancel();
  }
}
// Reset service, guaranteeing it is in a good state for subsequent runs.
_io_service->reset();

从调用方的角度来看,这种形式的超时是同步的,就像run_one()块一样。 但是,I/O 服务内的工作仍在进行中。 另一种方法是使用 Boost.Asio 对C++期货的支持来等待未来并执行超时。 此代码可能更易于阅读,但它需要至少一个其他线程来处理 I/O 服务,因为等待超时的线程不再处理 I/O 服务:

// Use an asynchronous operation so that it can be cancelled on timeout.
std::future<std::size_t> read_result = boost::asio::async_read(
    socket, buffer, boost::asio::use_future);
// If timeout occurs, then cancel the operation.
if (read_result.wait_for(std::chrono::seconds(1)) == 
    std::future_status::timeout)
{
  socket.cancel();
}
// Otherwise, the operation completed (with success or error).
else
{
  // If the operation failed, then on_read.get() will throw a
  // boost::system::system_error.
  auto bytes_transferred = read_result.get();
  // process buffer
}