Implementing a timeout with boost::asio::async_read without calling run on the io_service

Updated: 2023-10-16

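I am trying to read from an input source (stdin in this case) with a timeout. This has to fit into the design of an existing application, so calling run on my io_service is not an option.

This is my attempt so far: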

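#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/optional.hpp>
#include <algorithm>   // std::copy
#include <iostream>    // std::cout
#include <iterator>    // std::ostream_iterator
#include <stdexcept>   // std::runtime_error
#include <unistd.h>    // dup, STDIN_FILENO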
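// Record the result only on success; handlers that complete with an error
// (for example a cancelled operation) leave the optional empty.
void set_result( boost::optional<boost::system::error_code> * a, boost::system::error_code b ) {
    if( !b )
        a->reset( b );
}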
void receive(boost::asio::io_service & io, boost::asio::posix::stream_descriptor & stream, boost::asio::streambuf & result){
    boost::optional<boost::system::error_code> timer_result;
    boost::optional<boost::system::error_code> read_result;
    boost::asio::deadline_timer timer( io );
    timer.expires_from_now( boost::posix_time::milliseconds(5000) );
    timer.async_wait( boost::bind(&set_result, &timer_result, _1) );
    boost::asio::async_read(
            stream,
            result,
            boost::asio::transfer_at_least(1),
            boost::bind( &set_result, &read_result, _1 ));
    boost::system::error_code ec;
    while(1) {
        io.reset();
        io.poll_one(ec);
        if ( read_result ) {
            timer.cancel();
            return;
        } else if ( timer_result )
            throw std::runtime_error("timeout");
    }
}
void receive(boost::asio::io_service & io, boost::asio::posix::stream_descriptor & stream, size_t size, boost::asio::streambuf & result){
    while( result.size() < size )
        receive(io, stream, result);
}
int main(int argc, const char *argv[])
{
    boost::asio::io_service io;
    boost::asio::posix::stream_descriptor in(io, ::dup(STDIN_FILENO));
    for(int i = 0; i < 5; i++){
        std::cout << i << " Type in 4 chareters and press enter" << std::endl << std::endl;
        std::cout << "in> ";
        std::cout.flush();
        try {
            boost::asio::streambuf buf;
            receive(io, in, 5, buf);
            std::cout << "out> ";
            std::copy(boost::asio::buffer_cast<const char *>(buf.data()), boost::asio::buffer_cast<const char *>(buf.data()) + buf.size(), std::ostream_iterator<char>(std::cout));
        } catch (const std::exception & e) {
            std::cout << e.what() << std::endl;
        }
    }
    return 0;
}

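At first glance I thought it worked, but after playing with the test application some more I got a few segmentation faults.

I found that if I let the first query time out and then type 5 characters at the next prompt, bad things happen.

This is what valgrind reports: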

==17216== Memcheck, a memory error detector
==17216== Copyright (C) 2002-2010, and GNU GPL'd, by Julian Seward et al.
==17216== Using Valgrind-3.6.1 and LibVEX; rerun with -h for copyright info
==17216== Command: ./ccTalkScan
==17216==
0 Type in 4 chareters and press enter
asdf
0 system:0
1 Type in 4 chareters and press enter
0 system:125
0 system:0
==17216== Invalid read of size 8
==17216==    at 0x546EB4A: ??? (setcontext.S:60)
==17216==    by 0x425245: receive(boost::asio::io_service&, boost::asio::posix::basic_stream_descriptor<boost::asio::posix::stream_descriptor_service>&, unsigned long, boost::asio::basic_streambuf<std::allocator<char> >&) (ccTalkScan.cxx:321)
==17216==    by 0x42535E: main (ccTalkScan.cxx:333)
==17216==  Address 0x7feffdfa8 is not stack'd, malloc'd or (recently) free'd
==17216==
==17216== Invalid read of size 8
==17216==    at 0x546EB52: ??? (setcontext.S:63)
==17216==    by 0x425245: receive(boost::asio::io_service&, boost::asio::posix::basic_stream_descriptor<boost::asio::posix::stream_descriptor_service>&, unsigned long, boost::asio::basic_streambuf<std::allocator<char> >&) (ccTalkScan.cxx:321)
==17216==    by 0x42535E: main (ccTalkScan.cxx:333)
==17216==  Address 0x7feffdf98 is not stack'd, malloc'd or (recently) free'd
==17216==
==17216== Invalid read of size 8
==17216==    at 0x546EB59: ??? (setcontext.S:64)
==17216==    by 0x425245: receive(boost::asio::io_service&, boost::asio::posix::basic_stream_descriptor<boost::asio::posix::stream_descriptor_service>&, unsigned long, boost::asio::basic_streambuf<std::allocator<char> >&) (ccTalkScan.cxx:321)
==17216==    by 0x42535E: main (ccTalkScan.cxx:333)
==17216==  Address 0x7feffdf68 is not stack'd, malloc'd or (recently) free'd
==17216==
==17216== Invalid read of size 8
==17216==    at 0x546EB4A: ??? (setcontext.S:60)
==17216==    by 0x5E4BD2C: (below main) (in /lib64/libc-2.12.2.so)
==17216==  Address 0x7feffe018 is not stack'd, malloc'd or (recently) free'd
==17216==
==17216== Invalid read of size 8
==17216==    at 0x546EB52: ??? (setcontext.S:63)
==17216==    by 0x5E4BD2C: (below main) (in /lib64/libc-2.12.2.so)
==17216==  Address 0x7feffe008 is not stack'd, malloc'd or (recently) free'd
==17216==
==17216== Invalid read of size 8
==17216==    at 0x546EB59: ??? (setcontext.S:64)
==17216==    by 0x5E4BD2C: (below main) (in /lib64/libc-2.12.2.so)
==17216==  Address 0x7feffdfd8 is not stack'd, malloc'd or (recently) free'd
==17216==
timeout
2 Type in 4 chareters and press enter
0 system:0
timeout
3 Type in 4 chareters and press enter
asdf
==17216== Syscall param readv(vector[...]) points to unaddressable byte(s)
==17216==    at 0x5EF7A81: readv (in /lib64/libc-2.12.2.so)
==17216==    by 0x42A77A: boost::asio::detail::descriptor_ops::non_blocking_read(int, iovec*, unsigned long, boost::system::error_code&, unsigned long&) (descriptor_ops.ipp:153)
==17216==    by 0x4355D1: boost::asio::detail::descriptor_read_op_base<boost::asio::mutable_buffers_1>::do_perform(boost::asio::detail::reactor_op*) (descriptor_read_op.hpp:55)
==17216==    by 0x4275DA: boost::asio::detail::reactor_op::perform() (reactor_op.hpp:40)
==17216==    by 0x4288F9: boost::asio::detail::epoll_reactor::run(bool, boost::asio::detail::op_queue<boost::asio::detail::task_io_service_operation>&) (epoll_reactor.ipp:286)
==17216==    by 0x429577: boost::asio::detail::task_io_service::do_one(boost::asio::detail::scoped_lock<boost::asio::detail::posix_mutex>&, boost::asio::detail::task_io_service::idle_thread_info*) (task_io_service.ipp:264)
==17216==    by 0x429165: boost::asio::detail::task_io_service::poll_one(boost::system::error_code&) (task_io_service.ipp:188)
==17216==    by 0x4299B8: boost::asio::io_service::poll_one(boost::system::error_code&) (io_service.ipp:103)
==17216==    by 0x424FDF: receive(boost::asio::io_service&, boost::asio::posix::basic_stream_descriptor<boost::asio::posix::stream_descriptor_service>&, boost::asio::basic_streambuf<std::allocator<char> >&) (ccTalkScan.cxx:308)
==17216==    by 0x425245: receive(boost::asio::io_service&, boost::asio::posix::basic_stream_descriptor<boost::asio::posix::stream_descriptor_service>&, unsigned long, boost::asio::basic_streambuf<std::allocator<char> >&) (ccTalkScan.cxx:321)
==17216==    by 0x42535E: main (ccTalkScan.cxx:333)
==17216==  Address 0x65ba920 is 0 bytes inside a block of size 512 free'd
==17216==    at 0x4C25C4F: operator delete(void*) (vg_replace_malloc.c:387)
==17216==    by 0x432317: __gnu_cxx::new_allocator<char>::deallocate(char*, unsigned long) (new_allocator.h:95)
==17216==    by 0x430D67: std::_Vector_base<char, std::allocator<char> >::_M_deallocate(char*, unsigned long) (stl_vector.h:146)
==17216==    by 0x42F4A0: std::_Vector_base<char, std::allocator<char> >::~_Vector_base() (stl_vector.h:132)
==17216==    by 0x42CF5E: std::vector<char, std::allocator<char> >::~vector() (stl_vector.h:313)
==17216==    by 0x42AEB5: boost::asio::basic_streambuf<std::allocator<char> >::~basic_streambuf() (basic_streambuf.hpp:114)
==17216==    by 0x425374: main (ccTalkScan.cxx:333)
==17216==
0 system:0
4 Type in 4 chareters and press enter
0 system:125
0 system:0
timeout
==17216==
==17216== HEAP SUMMARY:
==17216==     in use at exit: 168 bytes in 3 blocks
==17216==   total heap usage: 63 allocs, 60 frees, 22,070 bytes allocated

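I have tried different ways to fix it, but I think I may have misunderstood a thing or two here. Some help would be nice, and an example would be most appreciated ;)

The problem with the implementation above is that an async_read that times out is never cancelled. The read stays pending after receive throws, so it still refers to the streambuf that main has since destroyed, which matches the readv into freed memory that valgrind reports. Cancel it like this: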

    while(1) {
        io.reset();
        io.poll_one(ec);
        if ( read_result ) {
            timer.cancel(); // cancel the timeout operation as it has not completed yet
            return;
        } else if ( timer_result ) {
            stream.cancel(); // cancel the read operation as it has not completed yet
            throw std::runtime_error("timeout");
        }
    }