BOOST :: BEAST:带Websocket管道的服务器

Boost::Beast : server with websocket pipelining

本文关键字:服务器 管道 BEAST BOOST Websocket      更新时间:2023-10-16

我正在编写具有Boost Beast 1.70和MySQL 8 C连接器的C Websocket服务器。服务器将同时连接多个客户端。特殊性是,每个客户端将在服务器中连续执行100个Websocket请求。每个请求都是我的服务器的" CPU LIGHT",但服务器为每个请求执行" Time Regue" SQL请求。

我已经使用websocket_server_coro.cpp示例启动了我的服务器。服务器步骤是:

1(websocket读取

2(SQL请求

3(Websocket Write

问题在于,对于给定的用户,服务器在步骤2处被"锁定",直到此步骤3和步骤3完成。因此,依次解决了100个请求。对于我的用例,这太慢了。

我已经读到,Boost Beast不可能进行非阻止读/写。但是,我现在要做的是在coroutine中执行Async_read和async_write。

void ServerCoro::accept(websocket::stream<beast::tcp_stream> &ws) {
    beast::error_code ec;
    ws.set_option(websocket::stream_base::timeout::suggested(beast::role_type::server));
    ws.set_option(websocket::stream_base::decorator([](websocket::response_type &res) {
                res.set(http::field::server, std::string(BOOST_BEAST_VERSION_STRING) + " websocket-Server-coro");
            }));
    ws.async_accept(yield[ec]);
    if (ec) return fail(ec, "accept");
    while (!_bStop) {
        beast::flat_buffer buffer;
        ws.async_read(buffer, yield[ec]);
        if (ec == websocket::error::closed) {
            std::cout << "=> get closed" << std::endl;
            return;
        }
        if (ec) return fail(ec, "read");
        auto buffer_str = new std::string(boost::beast::buffers_to_string(buffer.cdata()));
        net::post([&, buffer_str] {
            // sql async request such as :
            // while (status == (mysql_real_query_nonblocking(this->con, sqlRequest.c_str(), sqlRequest.size()))) {
            //    ioc.poll_one(ec);
            // }
            // more sql ...
            ws.async_write(net::buffer(worker->getResponse()), yield[ec]); // this line is throwing void boost::coroutines::detail::pull_coroutine_impl<void>::pull(): Assertion `! is_running()' failed.
            if (ec) return fail(ec, "write");
        });
    }
}

问题是与async_write的行抛出了一个错误:

void boost :: coroutines ::详细:: pull_coroutine_impl :: pull((:断言`!is_running(('失败。

如果a用sync_write替换此行,则可以使用它,但服务器仍然是给定用户的顺序。我试图在单个螺纹服务器上执行此代码。我还试图将相同的链用于async_read和async_write。仍然有断言错误。

对于Websocket的Boost Beast,这是不可能的服务器吗?谢谢。

通过遵循Vinnie Falco的建议,我通过使用" WebSocket Chat"answers" Async Server"来重写代码。这是代码的最终工作结果:

void Session::on_read(beast::error_code ec, std::size_t bytes_transferred)
{
    boost::ignore_unused(bytes_transferred);
    if(ec == websocket::error::closed) return;  // This indicates that the Session was closed
    if(ec) return fail(ec, "read");
    net::post([&, that = shared_from_this(), ss = std::make_shared<std::string const>(std::move(boost::beast::buffers_to_string(_buffer.cdata())))] {
        /* Sql things that call ioc.poll_one(ec) HERE, for me the sql response go inside worker.getResponse() used below */
        net::dispatch(_wsStrand, [&, that = shared_from_this(), sss = std::make_shared < std::string const>(worker.getResponse())] {
            async_write(sss);
        });
    });
    _buffer.consume(_buffer.size()); // we remove from the buffer what we just read
    do_read(); // go for another read
}
void Session::async_write(const std::shared_ptr<std::string const> &message) {
    _writeMessages.push_back(message);
    if (_writeMessages.size() > 1) {
        BOOST_LOG_TRIVIAL(warning) << "WRITE IS LOCKED";
    } else {
        _ws.text(_ws.got_text());
            _ws.async_write(net::buffer(*_writeMessages.front()), boost::asio::bind_executor(_wsStrand, beast::bind_front_handler(
                    &Session::on_write, this)));
    }
}
void Session::on_write(beast::error_code ec, std::size_t)
{
    // Handle the error, if any
    if(ec) return fail(ec, "write");
    // Remove the string from the queue
    _writeMessages.erase(_writeMessages.begin());
    // Send the next message if any
    if(!_writeMessages.empty())
        _ws.async_write(net::buffer(*_writeMessages.front()), boost::asio::bind_executor(_wsStrand, beast::bind_front_handler(
                        &Session::on_write, this)));
}

谢谢。