asio .async_* 不会运行,除非主线程调用 io_service.run

asio .async_* won't run unless main thread calls io_service.run

本文关键字:线程 调用 service run io async asio 运行      更新时间:2023-10-16

我最近一直在使用Boost Asio和一些新的C 11构造。这是导致意外行为的代码示例部分(至少对我来说)。

void Server::startAccept()
{
    connections_.push_back(std::make_shared<Connection>(io_service_));
    acceptor_.async_accept(connections_.back()->socket(), std::bind(&Server::accept_handler, this, connections_.back(), std::placeholders::_1));
}
void Server::accept_handler(std::shared_ptr<Connection> con, const boost::system::error_code& ec)
{
    startAccept();
    if (!ec) {
        con->start();
    }
}

在拨打服务器:: startAccept之前,我创建了一个io_service :: Work的实例和一个称为io_service_.run()的STD :: thread池。在我拨打startAccept后,主线程只需等待命令行输入。

我希望我的线程池中的一个线程在连接启动时运行服务器:: accept_handler。这不会发生。相反,我必须从主线程调用io_service_.run()。

现在我玩了一段时间,发现我可以通过这样做来实现所需的行为:

void Server::startAccept()
{
    connections_.push_back(std::make_shared<Connection>(io_service_));
    io_service_.post([this]() { acceptor_.async_accept(connections_.back()->socket(), std::bind(&Server::accept_handler, this, connections_.back(), std::placeholders::_1)); });
}
void Server::accept_handler(std::shared_ptr<Connection> con, const boost::system::error_code& ec)
{
    startAccept();
    if (!ec) {
        con->start();
    }
}

.ASYNC_*操作与io_service.post?

之间有什么区别

编辑:定义boost_asio_enable_handler_tracking

当我编译和运行程序时,然后使用第一个代码块连接到服务器,这是输出:

@asio|1350656555.431359|0*1|socket@00A2F710.async_accept

当我运行第二个代码时,我包括并连接到服务器,我将获得此输出:

@asio|1350656734.789896|0*1|io_service@00ECEC78.post
@asio|1350656734.789896|>1|
@asio|1350656734.789896|1*2|socket@00D0FDE0.async_accept
@asio|1350656734.789896|<1|
@asio|1350656756.150051|>2|ec=system:0
@asio|1350656756.150051|2*3|io_service@00ECEC78.post
@asio|1350656756.150051|>3|
@asio|1350656756.150051|2*4|socket@00EE9090.async_send
@asio|1350656756.150051|3*5|socket@00D0FDE0.async_accept
@asio|1350656756.150051|2*6|socket@00EE9090.async_receive
@asio|1350656756.150051|<3|
@asio|1350656756.150051|>4|ec=system:0,bytes_transferred=54
@asio|1350656756.150051|<2|
@asio|1350656756.150051|<4|
@asio|1350656758.790803|>6|ec=system:10054,bytes_transferred=0
@asio|1350656758.790803|<6|

编辑2:线程创建Insight

for (int i = 0; i < NUM_WORKERS; i++) {
    thread_pool.push_back(std::shared_ptr<std::thread>(new std::thread([this]() { io_service_.run(); })));
}

如果您不忘记致电io_service ::为池中的每个线程运行,并且您使用io_service ::工作以避免从io_service :: run loop退出,则在第一个中您的代码案例是绝对正确的。

这是有效的示例(我忽略了程序的连接处理并正确关闭程序)

class Connection
{
public:
    Connection(boost::asio::io_service & io_serivce) : socket_(io_serivce) {}
    boost::asio::ip::tcp::socket & socket() { return socket_; }
private:
    boost::asio::ip::tcp::socket socket_;
};
class Server
{
public:
    Server(boost::asio::io_service & io_serivce, const std::string & addr, const std::string & port) : io_service_(io_serivce), acceptor_(io_service_) {
        boost::asio::ip::tcp::resolver resolver(io_service_);
        boost::asio::ip::tcp::resolver::query query(addr, port);
        boost::asio::ip::tcp::endpoint endpoint = *resolver.resolve(query);
        acceptor_.open(endpoint.protocol());
        acceptor_.set_option(boost::asio::ip::tcp::acceptor::reuse_address(true));
        acceptor_.bind(endpoint);
        acceptor_.listen();
        startAccept();
    }
    void startAccept(){
        connections_.push_back(boost::make_shared<Connection>(  boost::ref(io_service_) ));    
        acceptor_.async_accept(connections_.back()->socket(), boost::bind(&Server::accept_handler, this, connections_.back(), _1) );
    }
    void Server::accept_handler(boost::shared_ptr<Connection> con, const boost::system::error_code& ec)
    {
        std::cout << "start connection" << std::endl;
        startAccept();
    }
private:
    boost::asio::io_service & io_service_;
    boost::asio::ip::tcp::acceptor acceptor_; 
    std::vector< boost::shared_ptr<Connection> > connections_;
};
int main(int argc, char * argv[])
{
    // thread pool
    boost::thread_group threads_;   
    boost::asio::io_service io_service_;
    boost::asio::io_service::work work_(io_service_);
    const size_t kThreadsCount = 3;
    for (std::size_t i = 0; i < kThreadsCount; ++i) {
        threads_.create_thread(boost::bind(&boost::asio::io_service::run, &io_service_));
    }  
    Server s(io_service_, "127.0.0.1", "8089");
    char ch;
    std::cin >> ch;
    return 0;
}

io_service.run函数是实际事件循环,基本上是在循环中调用 io_service.post

编辑:来自io_service.post文档:

请求io_service调用给定处理程序并立即返回。

io_service保证只有在当前调用run()run_one()poll() or poll_one()成员功能的线程中调用处理程序。

您的应该做的是实现自己的事件循环,调用io_service.run_one,或致电io_service.run以让Boost处理事件循环。您从哪个线程中运行事件循环,所有事件处理程序将从您运行事件循环的线程中调用。