正在将asio read_some转换为异步版本

Converting asio read_some to async version

本文关键字:转换 异步 版本 some asio read      更新时间:2023-10-16

我有以下代码,它使用 boost asio 的 read_some 函数从 TCP 套接字读取数据。目前代码是同步的,我需要将其转换为异步版本。问题在于:一开始要先读取若干字节,以识别数据包类型并获得数据包的长度;然后再用一个循环读取数据。要异步完成这项工作,我需要使用两个回调吗?还是可以只用一个回调完成(这样更可取)?

   /// Synchronously reads one framed packet from the SSL socket.
   ///
   /// Wire format as used below: a 7-byte header made of a 5-byte tag
   /// (compared against g_TcpPacketTag) followed by a 2-byte little-endian
   /// payload length, then exactly that many payload bytes.
   void Transport::OnReadFromTcp()
    {
        // read 7 bytes from TCP into mTcpBuffer.
        // read_some may return fewer bytes than requested, so keep reading
        // until the whole header has arrived.
        size_t headerRead = 0;
        while (headerRead < 7)
        {
            headerRead += m_sslsock->read_some(asio::buffer(&mTcpBuffer[headerRead], 7 - headerRead));
        }
        bool tag = true;
        for (unsigned char i = 0; i < 5; i++)
        {
            tag = tag && (mTcpBuffer[i] == g_TcpPacketTag[i]);
        }
        // get the length from the last two bytes (little-endian).
        // Go through unsigned char so that a byte >= 0x80 is not sign-extended
        // when the buffer's element type is plain (possibly signed) char.
        const unsigned short dataLen = static_cast<unsigned short>(
            static_cast<unsigned char>(mTcpBuffer[5]) |
            (static_cast<unsigned char>(mTcpBuffer[6]) << 8));
        // NOTE(review): any buffer previously held by mBuff is not released
        // here - confirm who owns it.
        mBuff = new char[dataLen];
        size_t readTotal = 0;
        while (readTotal < dataLen)
        {
            // read the remaining payload bytes from the tcp pipe into the buffer;
            // never ask for more than the space left after readTotal, otherwise a
            // short read would let the next read_some run past the end of mBuff.
            readTotal += m_sslsock->read_some(asio::buffer(&mBuff[readTotal], dataLen - readTotal));
        }
        // Process data .....
    }

第一步是要认识到:可以使用自由函数 read 来去掉 read_some 循环:

/// Synchronously reads one framed packet: a 7-byte header (5-byte tag plus a
/// 2-byte little-endian payload length) followed by the payload itself.
///
/// Uses the free function asio::read, so no manual read_some loop is needed.
void Transport::OnReadFromTcp() {
    // read 7 bytes from TCP into mTcpBuffer
    size_t bytes = asio::read(*m_sslsock, asio::buffer(mTcpBuffer, 7), asio::transfer_all());
    assert(bytes == 7);
    // true when the first bytes of the header equal the whole packet tag;
    // left for the packet-processing code that follows the read
    bool tag = g_TcpPacketTag.end() == std::mismatch(
                    g_TcpPacketTag.begin(), g_TcpPacketTag.end(),
                    mTcpBuffer.begin(), mTcpBuffer.end())
                .first;
    // get the length from the last two bytes (little-endian); convert through
    // unsigned char so a byte >= 0x80 is not sign-extended into the high bits
    uint16_t const dataLen = static_cast<unsigned char>(mTcpBuffer[5])
                           | (static_cast<unsigned char>(mTcpBuffer[6]) << 8);
    mBuff.resize(dataLen);
    size_t readTotal = asio::read(*m_sslsock, asio::buffer(mBuff), asio::transfer_exactly(dataLen));
    assert(mBuff.size() == readTotal);
    assert(dataLen == readTotal);
}

这甚至与执行是否异步无关。

把它改成异步的则要稍微复杂一些,因为这需要对缓冲区/Transport 实例的生存期以及潜在的多线程情况做出假设。我会在早上喝完咖啡后提供一个示例 :)


不涉及线程/生存期复杂性的演示:

在 Coliru 上在线运行(Live On Coliru)

#include <boost/asio.hpp>
#include <boost/asio/ssl.hpp>
#include <boost/bind.hpp>
#include <iostream>
#include <array>
#include <cassert>
namespace asio = boost::asio;
namespace ssl  = asio::ssl;

namespace {
    // Five-byte marker expected at the start of every packet header.
    // The unnamed namespace already gives this object internal linkage.
    std::array<char, 5> g_TcpPacketTag = {{'A', 'B', 'C', 'D', 'E'}};
}
/// Reads one length-prefixed packet from an SSL stream.
///
/// Header layout as decoded by this type: bytes 0-4 carry a packet tag and
/// bytes 5-6 carry the payload length, least-significant byte first.
///
/// The member functions defined in this file register completion handlers
/// bound to a raw `this`, so an instance has to stay alive until those
/// handlers have run.
struct Transport {
    using tcp = asio::ip::tcp;
    using SslSocket = std::shared_ptr<asio::ssl::stream<tcp::socket> >;
    /// Stores a shared reference to the stream @p s.
    Transport(SslSocket s) : m_sslsock(s) { }
    /// Starts the read of the 7-byte header.
    void OnReadFromTcp();
    /// Completion handler for the header read.
    void OnHeaderReceived(boost::system::error_code ec, size_t transferred);
    /// Completion handler for the payload read.
    void OnContentReceived(boost::system::error_code ec, size_t transferred);
  private:
    /// Payload length taken from header bytes 5 (low) and 6 (high).
    uint16_t datalen() const {
        // Convert through unsigned char first: with a signed plain char a byte
        // >= 0x80 would be sign-extended and corrupt the combined value.
        const unsigned lo = static_cast<unsigned char>(mTcpBuffer[5]);
        const unsigned hi = static_cast<unsigned char>(mTcpBuffer[6]);
        return static_cast<uint16_t>(lo | (hi << 8));
    }
    SslSocket m_sslsock;
    std::array<char, 7> mTcpBuffer{}; // header bytes, zero-initialised
    std::vector<char> mBuff;          // payload, resized once the header is parsed
};
void Transport::OnReadFromTcp() {
    // read 7 bytes from TCP into mTcpBuffer
    asio::async_read(*m_sslsock, asio::buffer(mTcpBuffer, 7), asio::transfer_all(),
                boost::bind(&Transport::OnHeaderReceived, this, asio::placeholders::error, asio::placeholders::bytes_transferred)
            );
}
#include <boost/range/algorithm/mismatch.hpp> // I love sugar
/// Completion handler for the header read.
///
/// @param ec    result of the read; on failure the error is reported and
///              nothing further is scheduled.
/// @param bytes number of bytes transferred (7 for a complete header).
///
/// When the header starts with the packet tag, mBuff is sized to the announced
/// payload length and an asynchronous read of the payload is started.
void Transport::OnHeaderReceived(boost::system::error_code ec, size_t bytes) {
    if (ec) {
        std::cout << "Error: " << ec.message() << "\n";
        return; // the header may be incomplete, so it must not be parsed
    }
    assert(bytes == 7);
    bool tag = (g_TcpPacketTag.end() == boost::mismatch(g_TcpPacketTag, mTcpBuffer).first);
    if (tag) {
        // get the length from the last two bytes
        mBuff.resize(datalen());
        asio::async_read(*m_sslsock, asio::buffer(mBuff), asio::transfer_exactly(datalen()),
                boost::bind(&Transport::OnContentReceived, this, asio::placeholders::error, asio::placeholders::bytes_transferred)
            );
    } else {
        std::cout << "TAG MISMATCH\n"; // TODO handle error
    }
}
/// Completion handler for the payload read.
///
/// @param ec        result of the read; on failure the error is reported and
///                  the payload is not treated as received.
/// @param readTotal number of payload bytes transferred.
void Transport::OnContentReceived(boost::system::error_code ec, size_t readTotal) {
    if (ec) {
        std::cout << "Error: " << ec.message() << "\n";
        return; // a failed read may be short, so the size checks below do not apply
    }
    assert(mBuff.size() == readTotal);
    assert(datalen() == readTotal);
    std::cout << "Successfully completed receive of " << datalen() << " bytes\n";
}
/// Demo driver: connects to port 6767 over SSL, receives a single packet
/// through Transport and runs the io_service until all handlers have finished.
int main() {
    asio::io_service svc;
    using Socket = Transport::SslSocket::element_type;
    // connect to localhost:6767 with SSL
    ssl::context ctx(ssl::context::sslv23);
    auto s = std::make_shared<Socket>(svc, ctx);
    s->lowest_layer().connect({ {}, 6767 });
    s->handshake(Socket::handshake_type::client);
    // do transport
    Transport tx(s); // must outlive svc.run(): the handlers are bound to &tx
    tx.OnReadFromTcp();
    svc.run();
    // all done
    std::cout << "All done\n";
}

针对一个在端口 6767 上接受 SSL 连接的示例服务器运行时:

# Header: 5-byte tag "ABCDE", then the payload length 0x0101 (257) as two
# bytes, low byte first; "F" and the contents of main.cpp form the payload.
(printf "ABCDE\x01\x01F"; cat main.cpp) |
     openssl s_server -accept 6767 -cert so.crt -pass pass:test

打印:

Successfully completed receive of 257 bytes
All done