boost::asio+std::future-关闭套接字后发生访问冲突

boost::asio + std::future - Access violation after closing socket

本文关键字:访问冲突 套接字 asio+std future- boost      更新时间:2023-10-16

我正在编写一个简单的tcp客户端来发送和接收单行文本。异步操作由std::future处理,以便实现带超时的阻塞式查询。不幸的是,我的测试应用程序在析构服务器对象时因访问冲突而崩溃。这是我的代码:

TCPClient.hpp

#ifndef __TCPCLIENT_H__
#define __TCPCLIENT_H__
#include <chrono>
#include <future>
#include <iostream>
#include <iterator>
#include <memory>
#include <stdexcept>
#include <string>
#include <thread>
#include <vector>
#include <boost/asio.hpp>
#include <boost/asio/use_future.hpp>
using namespace boost::asio;
// Small TCP client built on Boost.Asio.
// The constructor starts a thread that runs the io_service; the destructor
// disconnects, stops the io_service and joins that thread.
class TCPClient {
public:
    TCPClient();
    ~TCPClient();
    // Creates a fresh socket and connects it to address:port.
    // Throws if the connect fails or does not finish within the time limit.
    void connect(const std::string& address, const std::string& port);
    // Shuts down and closes the socket, if one has been created.
    void disconnect();
    // Waits for a response from the peer and returns it.
    // Throws if the read fails or does not finish within the time limit.
    std::string sendMessage(const std::string& msg);
private:
    boost::asio::io_service ioservice;
    boost::asio::io_service::work work; // bound to ioservice; keeps run() from returning while idle
    std::thread t;                      // executes ioservice.run()
    std::unique_ptr<boost::asio::ip::tcp::socket> socket; // (re)created by connect()
};
// Binds the work guard to the io_service and launches the thread that runs it.
inline TCPClient::TCPClient() : ioservice(), work(ioservice) {
    auto runLoop = [this]() {
        try {
            ioservice.run();
        }
        catch (const boost::system::system_error& err) {
            // Report Asio errors that escape run() instead of letting them leave the thread.
            std::cerr << err.what() << std::endl;
        }
    };
    t = std::thread(runLoop);
}
// Closes the connection, stops the io_service and waits for the worker thread.
inline TCPClient::~TCPClient() {
    disconnect();
    ioservice.stop();
    if (!t.joinable()) {
        return;
    }
    t.join();
}
// Replaces the current socket with a new one and connects it to address:port.
// Blocks for at most six seconds.
// Throws boost::system::system_error if resolving or connecting fails and
// std::runtime_error("timeout") if the connect has not completed in time.
inline void TCPClient::connect(const std::string& address, const std::string& port) {
    socket.reset(new ip::tcp::socket(ioservice));
    ip::tcp::resolver::query query(address, port);
    std::future<ip::tcp::resolver::iterator> conn_result = async_connect(*socket, ip::tcp::resolver(ioservice).resolve(query), use_future);
    if (conn_result.wait_for(std::chrono::seconds(6)) != std::future_status::timeout) {
        conn_result.get(); // throws boost::system::system_error if the operation fails
    }
    else {
        //socket->close();
        // throw timeout_error("Timeout");
        // std::exception has no standard constructor taking a message (that is an
        // MSVC extension); std::runtime_error is portable and still a std::exception.
        throw std::runtime_error("timeout");
    }
}
inline void TCPClient::disconnect() {
    if (socket) {
        try {
            socket->shutdown(ip::tcp::socket::shutdown_both);
            std::cout << "socket points to " << std::addressof(*socket) << std::endl;
            socket->close();
        }
        catch (const boost::system::system_error& e) {
            // ignore
            std::cerr << "ignored error " << e.what() << std::endl;
        }
    }
}
// Waits up to four seconds for one newline-terminated line from the peer and
// returns everything buffered in the response streambuf.
// Throws boost::system::system_error if the read fails and
// std::runtime_error("timeout") if nothing arrived in time.
// NOTE: `msg` is not sent while the write below stays commented out.
// NOTE: on the timeout path the async_read_until is still pending and refers to
// `response`, which is destroyed when this function exits; that is the access
// violation this post is about.
inline std::string TCPClient::sendMessage(const std::string& msg) {
    auto time_over = std::chrono::system_clock::now() + std::chrono::seconds(4);
    /*
    // Doesn't affect the error
    std::future<size_t> write_fut = boost::asio::async_write(*socket, boost::asio::buffer(msg), boost::asio::use_future);
    try {
        write_fut.get();
    }
    catch (const boost::system::system_error& e) {
        std::cerr << e.what() << std::endl;
    }
    */
    boost::asio::streambuf response;
    // The delimiter is the newline character '\n', not the letter 'n'.
    std::future<std::size_t> read_fut = boost::asio::async_read_until(*socket, response, '\n', boost::asio::use_future);
    if (read_fut.wait_until(time_over) != std::future_status::timeout) {
        std::cout << "read " << read_fut.get() << " bytes" << std::endl;
        return std::string(std::istreambuf_iterator<char>(&response), std::istreambuf_iterator<char>());
    }
    else {
        std::cout << "socket points to " << std::addressof(*socket) << std::endl;
        // std::exception has no standard constructor taking a message (MSVC extension).
        throw std::runtime_error("timeout");
    }
}
#endif

main.cpp

#include <iostream>
#include "TCPClient.hpp"
// Test driver: connect to the local test server, request one line and print it.
int main(int argc, char* argv[]) {
    TCPClient client;
    try {
        client.connect("localhost", "27015");
        std::cout << "Response: " << client.sendMessage("Hello!") << std::endl;
    }
    catch (const std::exception& failure) {
        // boost::system::system_error derives from std::exception, so this single
        // handler prints the message for both kinds of failure.
        std::cerr << failure.what() << std::endl;
    }
    system("pause");
    return 0;
}

输出是预期的"超时"(测试服务器故意不发送数据),但在TCPClient::disconnect()中关闭套接字后,ioservice.run()立即崩溃(访问冲突)。是我的内存管理有问题吗?

编译器是MSVC 12.0.31011.00 Update 4(Visual Studio 2013)

recvmsg正在向一个缓冲区(streambuf)接收数据,而该缓冲区在TCPClient::sendMessage抛出异常后已经被释放(第105行,作用域结束)。

您忘记取消在第97行启动的异步操作(async_read_until)。修复它:

else {
    socket->cancel(); // ADDED: cancels the asynchronous operations still outstanding on the socket
    std::cout << "socket points to " << std::addressof(*socket) << std::endl;
    throw std::runtime_error("timeout");
}

或者,甚至只需:

    socket.reset(); // ADDED: destroys the socket object owned by the unique_ptr

其他超时路径也是如此。

另一个答案解决了问题所在。

不过,从更高的层面来看,您使用future,却只是为了立即等待它们返回结果。

我突然意识到,这实际上根本不是异步的,你应该能够做到:

  • 不需要线程和join
  • 不需要.stop()
  • 不需要work和work.reset()
  • 不需要显式的构造函数或析构函数
  • 不需要unique_ptr<socket>及其附带的生命周期管理
  • 不需要future<>以及随之而来的.get()和future_status检查

总而言之,你可以做得简单得多,例如使用一个简单的助手函数,比如

// Simplified client: no worker thread; the io_service is run on demand by
// await_operation() and the socket is held by value.
class TCPClient {
public:
    void        disconnect();
    void        connect(const std::string& address, const std::string& port);
    std::string sendMessage(const std::string& msg);
private:
    using error_code = boost::system::error_code;
    // Runs the io_service until the handlers of the operation the caller has just
    // started have run. A timer built from deadline_or_duration cancels the
    // socket's pending operations if it expires first.
    template<typename AllowTime> void await_operation(AllowTime const& deadline_or_duration) {
        using namespace boost::asio;
        ioservice.reset(); // make the io_service runnable again after an earlier run()/run_one() returned
        {
            high_resolution_timer tm(ioservice, deadline_or_duration);
            // If the wait ended for any reason other than being aborted, cancel whatever is pending on the socket.
            tm.async_wait([this](error_code ec) { if (ec != error::operation_aborted) socket.cancel(); });
            // Dispatch one handler -- presumably whichever completes first: the caller's operation or the timer.
            ioservice.run_one();
        } // tm is destroyed here; per the Asio documentation this cancels a still-pending wait
        ioservice.run(); // run the handlers that are still outstanding
    }
    boost::asio::io_service      ioservice { };
    boost::asio::ip::tcp::socket socket { ioservice };
};

例如connect(...)曾经是:

// Future-based body of connect(), including the socket->cancel() call on the timeout path.
socket.reset(new ip::tcp::socket(ioservice));
ip::tcp::resolver::query query(address, port);
std::future<ip::tcp::resolver::iterator> conn_result = async_connect(*socket, ip::tcp::resolver(ioservice).resolve(query), use_future);
// Wait at most six seconds for the connect to finish.
if (conn_result.wait_for(std::chrono::seconds(6)) != std::future_status::timeout) {
    conn_result.get(); // throws boost::system::system_error if the operation fails
}
else {
    socket->cancel(); // cancel the asynchronous operations still outstanding on the socket
    // throw timeout_error("Timeout");
    throw std::runtime_error("timeout");
}

现在变成:

// Start the connect; the completion handler throws if an error code is reported.
async_connect(socket, 
        ip::tcp::resolver(ioservice).resolve({address, port}),
        [&](error_code ec, ip::tcp::resolver::iterator it) { if (ec) throw std::runtime_error(ec.message()); });
// Run the io_service with a six-second time limit.
await_operation(std::chrono::seconds(6));

同样地,sendMessage变为:

// Read one newline-terminated line; the completion handler throws on error.
streambuf response;
// The delimiter is the newline character '\n', not the letter 'n'.
async_read_until(socket, response, '\n', [&](error_code ec, size_t bytes_read) {
        if (ec) throw std::runtime_error(ec.message());
        std::cout << "read " << bytes_read << " bytes" << std::endl;
    });
// The deadline must come from the clock used by high_resolution_timer; a
// system_clock time_point only compiles where both clocks are the same type.
await_operation(std::chrono::high_resolution_clock::now() + std::chrono::seconds(4));
return {std::istreambuf_iterator<char>(&response), {}};

请注意,这些非常简单。还要注意的是,根据失败的原因,现在会抛出正确的异常消息。

完整演示

在Coliru上直播

#ifndef __TCPCLIENT_H__
#define __TCPCLIENT_H__
#include <boost/asio.hpp>
#include <boost/asio/high_resolution_timer.hpp>
#include <iostream>
// Simplified client: no worker thread; the io_service is run on demand by
// await_operation() and the socket is held by value.
class TCPClient {
public:
    void        disconnect();
    void        connect(const std::string& address, const std::string& port);
    std::string sendMessage(const std::string& msg);
private:
    using error_code = boost::system::error_code;
    // Runs the io_service until the handlers of the operation the caller has just
    // started have run. A timer built from deadline_or_duration cancels the
    // socket's pending operations if it expires first.
    template<typename AllowTime> void await_operation(AllowTime const& deadline_or_duration) {
        using namespace boost::asio;
        ioservice.reset(); // make the io_service runnable again after an earlier run()/run_one() returned
        {
            high_resolution_timer tm(ioservice, deadline_or_duration);
            // If the wait ended for any reason other than being aborted, cancel whatever is pending on the socket.
            tm.async_wait([this](error_code ec) { if (ec != error::operation_aborted) socket.cancel(); });
            // Dispatch one handler -- presumably whichever completes first: the caller's operation or the timer.
            ioservice.run_one();
        } // tm is destroyed here; per the Asio documentation this cancels a still-pending wait
        ioservice.run(); // run the handlers that are still outstanding
    }
    boost::asio::io_service      ioservice { };
    boost::asio::ip::tcp::socket socket { ioservice };
};

// Resolves address:port and connects the socket, allowing six seconds.
// A failed or cancelled connect surfaces as std::runtime_error carrying the
// message of the reported error code.
inline void TCPClient::connect(const std::string& address, const std::string& port) {
    using namespace boost::asio;
    auto failOnError = [](error_code ec, ip::tcp::resolver::iterator) {
        if (ec) {
            throw std::runtime_error(ec.message());
        }
    };
    async_connect(socket,
            ip::tcp::resolver(ioservice).resolve({address, port}),
            failOnError);
    await_operation(std::chrono::seconds(6));
}
inline void TCPClient::disconnect() {
    using namespace boost::asio;
    if (socket.is_open()) {
        try {
            socket.shutdown(ip::tcp::socket::shutdown_both);
            socket.close();
        }
        catch (const boost::system::system_error& e) {
            // ignore
            std::cerr << "ignored error " << e.what() << std::endl;
        }
    }
}
// Waits up to four seconds for one newline-terminated line from the peer and
// returns everything buffered in the response streambuf.
// A failed or cancelled read surfaces as std::runtime_error carrying the
// message of the reported error code.
// NOTE: `msg` is not used in this body; nothing is written to the socket.
inline std::string TCPClient::sendMessage(const std::string& msg) {
    using namespace boost::asio;
    streambuf response;
    // The delimiter is the newline character '\n', not the letter 'n'.
    async_read_until(socket, response, '\n', [&](error_code ec, size_t bytes_read) {
            if (ec) throw std::runtime_error(ec.message());
            std::cout << "read " << bytes_read << " bytes" << std::endl;
        });
    // The deadline must come from the clock used by high_resolution_timer; a
    // system_clock time_point only compiles where both clocks are the same type.
    await_operation(std::chrono::high_resolution_clock::now() + std::chrono::seconds(4));
    return {std::istreambuf_iterator<char>(&response), {}};
}
#endif
#include <iostream>
//#include "TCPClient.hpp"
// Demo driver: connect to the local test server, request one line and print it.
int main(/*int argc, char* argv[]*/) {
    TCPClient client;
    try {
        client.connect("127.0.0.1", "27015");
        std::cout << "Response: " << client.sendMessage("Hello!") << std::endl;
    }
    catch (const std::exception& failure) {
        // boost::system::system_error derives from std::exception, so this single
        // handler prints the message for both kinds of failure.
        std::cerr << failure.what() << std::endl;
    }
    return 0;
}

附加内容

如果你想要更方便的话,有一个通用的回调处理程序,它只会引发异常:

// Generic completion handler: ignores any extra handler arguments and throws
// std::runtime_error with the error message when ec reports a failure.
struct raise {
    template <typename... Rest> void operator()(error_code ec, Rest...) const {
        if (!ec) {
            return;
        }
        throw std::runtime_error(ec.message());
    }
};

现在,不需要lambda,函数体变得更加简单:

// Resolves address:port and connects the socket, allowing six seconds;
// errors are turned into exceptions by the raise handler.
inline void TCPClient::connect(const std::string& address, const std::string& port) {
    auto endpoints = ip::tcp::resolver(ioservice).resolve({address, port});
    async_connect(socket, endpoints, raise());
    await_operation(std::chrono::seconds(6));
}
// Waits up to four seconds for one newline-terminated line from the peer and
// returns everything buffered in the response streambuf; errors are turned
// into exceptions by the raise handler.
// NOTE: `msg` is not used in this body; nothing is written to the socket.
inline std::string TCPClient::sendMessage(const std::string& msg) {
    streambuf response;
    // The delimiter is the newline character '\n', not the letter 'n'.
    async_read_until(socket, response, '\n', raise());
    // The deadline must come from the clock used by high_resolution_timer; a
    // system_clock time_point only compiles where both clocks are the same type.
    await_operation(std::chrono::high_resolution_clock::now() + std::chrono::seconds(4));
    return {std::istreambuf_iterator<char>(&response), {}};
}

请参阅改编后的演示:LiveOnColiru