How to use boost::asio::spawn with io_service-per-CPU?

Updated: 2023-10-16

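My server is based on the Boost spawn echo server example.

The server runs fine on a single-core machine and has not crashed once in several months. Even when it uses 100% CPU it keeps working.

But I need to handle more client requests, so I now use a multi-core machine. To use all the CPUs, I run the io_service on several threads, like this: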

#include <boost/asio/io_service.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/spawn.hpp>
#include <boost/asio/steady_timer.hpp>
#include <boost/asio/write.hpp>
#include <boost/bind.hpp>
#include <boost/thread/thread.hpp>
#include <chrono>
#include <cstdlib>
#include <iostream>
#include <memory>
#include <thread>
using namespace std;
using boost::asio::ip::tcp;
class session : public std::enable_shared_from_this<session>{
public:
    explicit session(tcp::socket socket)
        : socket_(std::move(socket)),
        timer_(socket_.get_io_service()),
        strand_(socket_.get_io_service())
    {}
    void go()
    {
        auto self(shared_from_this());
        boost::asio::spawn(strand_, [this, self](boost::asio::yield_context yield)
        {
           try {
               char data[1024] = {'3'};
               for( ; ;) {
                   timer_.expires_from_now(std::chrono::seconds(10));
                   std::size_t n = socket_.async_read_some(boost::asio::buffer(data, sizeof(data)), yield);
                   // do something with data
                   // write back something
                   boost::asio::async_write(socket_, boost::asio::buffer(data, sizeof(data)), yield);
               }
           } catch(...)   {
               socket_.close();
               timer_.cancel();
           }
        });
        boost::asio::spawn(strand_, [this, self](boost::asio::yield_context yield)
        {
           while(socket_.is_open()) {
               boost::system::error_code ignored_ec;
               timer_.async_wait(yield[ignored_ec]);
               if(timer_.expires_from_now() <= std::chrono::seconds(0))
                   socket_.close();
           }
        });
    }
private:
    tcp::socket socket_;
    boost::asio::steady_timer timer_;
    boost::asio::io_service::strand strand_;
};
int main(int argc, char* argv[]) {
    try {
        boost::asio::io_service io_service;
        boost::asio::spawn(io_service, [&](boost::asio::yield_context yield)
        {
#define PORT "7788"
            tcp::acceptor acceptor(io_service,
            tcp::endpoint(tcp::v4(), std::atoi(PORT)));
            for( ; ;) {
                boost::system::error_code ec;
                tcp::socket socket(io_service);
                acceptor.async_accept(socket, yield[ec]);
                if(!ec)
                    // std::make_shared<session>(std::move(socket))->go();
                    io_service.post(boost::bind(&session::go, std::make_shared<session>(std::move(socket))));
            }
        });
        // ----------- this works fine on single-core machine ------------
        {
            // io_service.run();
        }
        // ----------- this crashes (with multi core) ----------
        {
            auto thread_count = std::thread::hardware_concurrency(); // for multi core
            boost::thread_group threads;
            for(unsigned i = 0; i < thread_count; ++i)
                threads.create_thread(boost::bind(&boost::asio::io_service::run, &io_service));
            threads.join_all();
        }
    } catch(std::exception& e) {
        std::cerr << "Exception: " << e.what() << "\n";
    }
    return 0;
}
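A side note on the thread count used above: std::thread::hardware_concurrency() is only a hint and is allowed to return 0, in which case the loop would start no threads and io_service::run() would never be called. The helper below is a minimal sketch of a guard; the name worker_thread_count is hypothetical and not part of the original code.

```cpp
#include <algorithm>
#include <thread>

// Hypothetical helper (not in the original listing): how many worker
// threads to start. hardware_concurrency() may return 0 when the value
// cannot be determined, so fall back to a single thread in that case.
inline unsigned worker_thread_count(
    unsigned hint = std::thread::hardware_concurrency())
{
    return std::max(1u, hint);
}
```

In the listing above this would replace the direct call, e.g. auto thread_count = worker_thread_count();. It does not address the crash itself.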

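This code works fine on a single-core machine but crashes all the time on 2-core/4-core/8-core machines. From the crash dump I don't see anything related to my code, only something about boost::spawn and some randomly named lambdas.

So I want to try this: run one io_service per CPU.

I found some demos, but they use callback-style asynchronous functions: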

void server::start_accept()
{
  new_connection_.reset(new connection(
        io_service_pool_.get_io_service(), request_handler_));
  acceptor_.async_accept(new_connection_->socket(),
      boost::bind(&server::handle_accept, this,
        boost::asio::placeholders::error));
}
void server::handle_accept(const boost::system::error_code& e)
{
  if (!e)
  {
    new_connection_->start();
  }
  start_accept();
}

io_service_pool_.get_io_service() picks an io_service at random, but my code uses spawn:

boost::asio::spawn(io_service, ...

How do I spawn with a randomly picked io_service?
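For reference, the selection step can be sketched without Boost. As far as I can tell, the io_service_pool in Boost's example hands out its io_service objects in round-robin order rather than at random. The class below is a hypothetical stand-in for that pool: round_robin_pool and fake_context are names made up for this sketch, and Context would be boost::asio::io_service in the real server.

```cpp
#include <cstddef>
#include <memory>
#include <stdexcept>
#include <vector>

// Hypothetical stand-in for io_service_pool: owns N contexts and hands
// them out in round-robin order. Context is a template parameter so the
// sketch builds without Boost; it must be default-constructible.
template <typename Context>
class round_robin_pool {
public:
    explicit round_robin_pool(std::size_t size) {
        if (size == 0)
            throw std::invalid_argument("pool size must be > 0");
        for (std::size_t i = 0; i < size; ++i)
            contexts_.push_back(std::make_shared<Context>());
    }

    // Return the next context, wrapping around at the end of the pool.
    // Not thread-safe: call it from one thread only (e.g. the acceptor).
    Context& get() {
        Context& ctx = *contexts_[next_];
        next_ = (next_ + 1) % contexts_.size();
        return ctx;
    }

    std::size_t size() const { return contexts_.size(); }

private:
    std::vector<std::shared_ptr<Context>> contexts_;
    std::size_t next_ = 0;
};

// Dummy context used only to demonstrate the selection order.
struct fake_context {
    int jobs = 0;
};
```

Since boost::asio::spawn takes an io_service& as its first argument, a coroutine could presumably be started on whichever io_service the pool returns, along the lines of boost::asio::spawn(pool.get_io_service(), ...). Each spawn call is tied to one io_service, but separate calls can target different ones.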

It seems I asked the wrong question: spawn can't use multiple io_services, but a socket can. I changed the code to:

int main(int argc, char* argv[]) {
    try {
        boost::asio::io_service io_service;
        boost::asio::io_service::work work(io_service);
        auto core_count = std::thread::hardware_concurrency();
        // io_service_pool.hpp and io_service_pool.cpp from boost's example
        io_service_pool pool(core_count);
        boost::asio::spawn(io_service, [&](boost::asio::yield_context yield)
        {
#define PORT "7788"
            tcp::acceptor acceptor(io_service, tcp::endpoint(tcp::v4(), std::atoi(PORT)));
            for( ; ;) {
                boost::system::error_code ec;
                boost::asio::io_service& ios = pool.get_io_service();
                tcp::socket socket(ios);
                acceptor.async_accept(socket, yield[ec]);
                if(!ec)
                    ios.post(boost::bind(&session::go, std::make_shared<session>(std::move(socket))));
            }
        });
        { // run all io_service
            thread t([&] { pool.run(); });
            t.detach();
            io_service.run();
        }
    } catch(std::exception& e) {
        std::cerr << "Exception: " << e.what() << "\n";
    }
    return 0;
}

Now the server no longer crashes. But I still don't know why using a single io_service for all threads causes the crash.