持续调用 boost::asio::read_async:无数据的回调

Continuous invocations of boost::asio::read_async: callback without data

本文关键字:async 数据 回调 read 调用 boost asio      更新时间:2023-10-16

我是 boost::asio 的新手,目前正试图构建一个简单的服务器应用程序,侦听 TCP 服务器。 客户端和服务器代码都派生自我在网上找到的示例(这工作正常)。
我自己的服务器不断调用async_read回调,没有任何数据。我已经搜索了几天,但没有得到示例和我的代码之间的区别:(
所以希望任何人都可以告诉我我做错了什么。

这是我的服务器应用程序代码:

#define DMOD DebugIds::TEST
#define DSUBID 0x0002
#define CLASSNAME "CLIENT_TEST"
#include <debug/dbg.h>
#include <memory>
#include <boost/thread.hpp>
#include <boost/asio.hpp>
using boost::asio::ip::tcp;
void doRead ( tcp::socket& socket )
{
    std::vector<char> buffer (0,1024);
    INF ("call async_read");
    boost::asio::async_read (
        socket,
        boost::asio::buffer(buffer.data(), buffer.size() ),
        [&socket](boost::system::error_code ec, std::size_t length)
        {
            INF ("async_read callback");
            if (!ec )
            {
                INF ( "accept %d bytes of data from client", length );
            }
            else
            {
                ERR ( "cannot read from client" );
            }
            INF ("call doRead");
            doRead ( socket );
        }
    );
}

int main(int argc, char* argv[])
{
    bool startClient = true;
    bool startServer = true;
    Debug::Instance()->setModuleLevel ( DebugIds::TEST, INFO );
    Debug::Instance()->setModuleLevel ( DebugIds::SERVER, DETAIL );
    Debug::Instance()->setModuleLevel ( DebugIds::CLIENT, DETAIL );
    if ( argc > 1 )
    {
        INF ( "argv[1]=%s", argv[1] );
        if ( strcmp ("s", argv[1]) == 0 )
        {
            startClient = false;
        }
        else if ( strcmp ("c", argv[1]) == 0 )
        {
            startServer = false;
        }
    }

    boost::thread serverThread;
    boost::thread clientThread;
    boost::mutex mutex;
    boost::condition_variable clientcond;
    unsigned int port = 12342;
    std::string _port = "12342";
    std::string adr = "127.0.0.1";
    INF ( "startServer=%d startClient=%d", startServer, startClient );
    INF ( "adr=%s port=%d _port=%s", adr.c_str(), port, _port.c_str() );
    boost::asio::io_service io_service_server;
    if ( startServer )
    {
        // start threads
        mutex.lock();
        INF ( "start server thread" );
        serverThread = boost::thread (
            [&]()  //lambda method
            {
                try
                {
                    INF ( "create TCP endpoint" );
                    boost::asio::ip::tcp::endpoint ep(boost::asio::ip::address::from_string(adr), port );
                    //boost::asio::ip::tcp::endpoint ep(tcp::v4(), port );
                    INF ( "create TCP socket" );
                    tcp::socket m_socket(io_service_server);
                    INF ( "create TCP acceptor" );
                    tcp::acceptor m_acceptor(io_service_server,ep);
                    INF ( "call async_accept" );
                    m_acceptor.async_accept ( m_socket,
                        [&](boost::system::error_code ec)
                        {
                            INF ( "async_accept callback" );
                            if ( !ec )
                            {
                                INF ( "call doRead" );
                                doRead ( m_socket );
                            }
                            else
                            {
                                ERR ( "cannot accept client" );
                            }
                        }
                    );
                    INF ( "notify server is init" );
                    clientcond.notify_one();
                    INF ( "io_service_server.run()" );
                    io_service_server.run();
                }
                catch (std::exception& e)
                {
                    ERR ( "Exception: %s", e.what() );
                    return (0);
                }
                INF ( "End server application" );
                return (0);
            });
    }
    boost::asio::io_service io_service_client;
    if ( startClient )
    {
        clientThread = boost::thread (
            //[&mutex,&adr,&_port,&spC,&io_service_client]()
            [&]()
            {
                INF ( "wait for server to be initialized" );
                boost::unique_lock<boost::mutex> lock(mutex);
                clientcond.wait(lock);
                INF ( "server initialization finished" );
                try
                {
                    tcp::resolver resolver(io_service_client);
                    auto endpoint_iterator = resolver.resolve({ adr, _port });
                    INF ( "create and init testclient" );
                    #if 0
                    uint8_t count = 0;
                    std::vector<char> v(0,256);
                    for ( auto c: v )
                    {
                        c = ++count;
                    }
                    spC->write(v.data(), v.size());
                    #endif
                    io_service_client.run();
                }
                catch (std::exception& e)
                {
                    ERR ( "Exception: %s", e.what() );
                }
                INF ( "End client application" );
                return (0);
            });
    }
    char line[256];
    while ( std::cin.getline(line, 256) )
    {
        INF ( "got %s", line );
        if ( 0 == strcmp ("exit", line ) )
        {
            #if 0
            if (spC)
            {
                spC->close();
                io_service_client.stop();
                if (spC) clientThread.join();
                spC = std::shared_ptr<Client>();
            }
            if (spS)
            {
                spS->stop();
                io_service_server.stop();
                if (spC) serverThread.join();
                spS = std::shared_ptr<Server>();
            }
            #endif
            break;
        }
    }
    #if 0
    if (spC) clientThread.join();
    if (spS) serverThread.join();
    #endif
    return (0);
}

这将在控制台上创建以下输出:

[  0.000]     test[0x0002]:  INF:   SERVER_TEST::main():56:  argv[1]=s  
[  0.000]     test[0x0002]:  INF:   SERVER_TEST::main():78:  startServer=1 startClient=0  
[  0.000]     test[0x0002]:  INF:   SERVER_TEST::main():79:  adr=127.0.0.1 port=12342 _port=12342  
[  0.000]     test[0x0002]:  INF:   SERVER_TEST::main():86:  start server thread  
[  0.000]     test[0x0002]:  INF:   SERVER_TEST::operator()():92:  create TCP endpoint  
[  0.000]     test[0x0002]:  INF:   SERVER_TEST::operator()():95:  create TCP socket  
[  0.000]     test[0x0002]:  INF:   SERVER_TEST::operator()():97:  create TCP acceptor  
[  0.000]     test[0x0002]:  INF:   SERVER_TEST::operator()():100:  call async_accept  
[  0.000]     test[0x0002]:  INF:   SERVER_TEST::operator()():117:  notify server is init  
[  0.000]     test[0x0002]:  INF:   SERVER_TEST::operator()():119:  io_service_server.run()  
[ 26.599]     test[0x0002]:  INF:   SERVER_TEST::operator()():104:  async_accept callback  
[ 26.599]     test[0x0002]:  INF:   SERVER_TEST::operator()():107:  call doRead  
[ 26.599]     test[0x0002]:  INF:   SERVER_TEST::doRead():23:  call async_read  
[ 26.599]     test[0x0002]:  INF:   SERVER_TEST::operator()():29:  async_read callback  
[ 26.599]     test[0x0002]:  INF:   SERVER_TEST::operator()():32:  accept 0 bytes of data from client  
[ 26.599]     test[0x0002]:  INF:   SERVER_TEST::operator()():38:  call doRead  
[ 26.599]     test[0x0002]:  INF:   SERVER_TEST::doRead():23:  call async_read  
[ 26.599]     test[0x0002]:  INF:   SERVER_TEST::operator()():29:  async_read callback  
[ 26.599]     test[0x0002]:  INF:   SERVER_TEST::operator()():32:  accept 0 bytes of data from client  
[ 26.599]     test[0x0002]:  INF:   SERVER_TEST::operator()():38:  call doRead  
[ 26.599]     test[0x0002]:  INF:   SERVER_TEST::doRead():23:  call async_read  
[ 26.599]     test[0x0002]:  INF:   SERVER_TEST::operator()():29:  async_read callback  
[ 26.599]     test[0x0002]:  INF:   SERVER_TEST::operator()():32:  accept 0 bytes of data from client  
[ 26.599]     test[0x0002]:  INF:   SERVER_TEST::operator()():38:  call doRead  
[...] a.s.o.

为了完整起见,我将添加我在 I-Net 中找到的示例代码(略有改动):示例客户端:此处async_read调用阻塞,直到数据从服务器到达。

//
// chat_client.cpp
// ~~~~~~~~~~~~~~~
//
// Copyright (c) 2003-2013 Christopher M. Kohlhoff (chris at kohlhoff dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#include <cstdlib>
#include <deque>
#include <iostream>
#include <thread>
#include <boost/asio.hpp>
#include "chat_message.hpp"
using boost::asio::ip::tcp;
#define DMOD DebugIds::TEST
#define DSUBID 0x0002
#define CLASSNAME "CLIENT_TEST"
#include <debug/dbg.h>
typedef std::deque<chat_message> chat_message_queue;
class chat_client
{
public:
    chat_client ( boost::asio::io_service& io_service,
                    tcp::resolver::iterator endpoint_iterator)
                    :   m_io_service(io_service),
                        m_socket(io_service)
    {
        INF ("");
        do_connect ( endpoint_iterator );
    }
    void write ( const chat_message& msg )
    {
        INF ( "post io service" );
        m_io_service.post(
                [this, msg]()
                {
                    bool write_in_progress = !m_write_msgs.empty();
                    m_write_msgs.push_back(msg);
                    if (!write_in_progress)
                    {
                        INF ( "call do_write" );
                        do_write();
                    }
                });
    }
    void close()
    {
        INF ( "close socket async" );
        m_io_service.post([this]() { INF ("close socket"); m_socket.close(); });
    }
private:
    void do_connect(tcp::resolver::iterator endpoint_iterator)
    {
        INF ( "async_connect" );
        boost::asio::async_connect ( m_socket, endpoint_iterator,
                [this](boost::system::error_code ec, tcp::resolver::iterator)
                {
                    INF ( "on async_connect" );
                    if (!ec)
                    {
                        INF ( "do_read_header" );
                        do_read_header();
                    }
                });
    }
    void do_read_header()
    {
        INF ( "call async_read" );
        boost::asio::async_read ( m_socket,
                                  boost::asio::buffer(m_read_msg.data(), chat_message::header_length),
                                  [this](boost::system::error_code ec, std::size_t /*length*/)
                                  {
                                      INF ( "on async_read" );
                                     if (!ec && m_read_msg.decode_header())
                                     {
                                         INF ( "call do_read_body" );
                                         do_read_body();
                                     }
                                     else
                                     {
                                         ERR ( "connect failed" );
                                         m_socket.close();
                                     }
                                  });
    }
    void do_read_body()
    {
        INF ( "call async_read" );
        boost::asio::async_read(m_socket,
                boost::asio::buffer(m_read_msg.body(), m_read_msg.body_length()),
                [this](boost::system::error_code ec, std::size_t /*length*/)
                {
                    INF ( "body: on async_read" );
                    if (!ec)
                    {
                        std::cout.write(m_read_msg.body(), m_read_msg.body_length());
                        std::cout << "n";
                        INF ( "body: call do_read_header" );
                        do_read_header();
                    }
                    else
                    {
                        INF ( "body: read failed" );
                        m_socket.close();
                    }
                });
    }
    void do_write()
    {
        INF ( "call async_write" );
        boost::asio::async_write(m_socket,
            boost::asio::buffer(m_write_msgs.front().data(),
                    m_write_msgs.front().length()),
                    [this] ( boost::system::error_code ec, std::size_t length )
                    {
                        INF ( "on async_write" );
                        if (!ec)
                        {
                            m_write_msgs.pop_front();
                            if (!m_write_msgs.empty())
                            {
                                INF ( "call do_write" );
                                do_write();
                            }
                            else
                            {
                            }
                        }
                        else
                        {
                            INF ( "async_write failed" );
                            m_socket.close();
                        }
                    });
    }
private:
    boost::asio::io_service& m_io_service;
    tcp::socket m_socket;
    chat_message m_read_msg;
    chat_message_queue m_write_msgs;
};
int main(int argc, char* argv[])
{
    Debug::Instance()->setModuleLevel ( DebugIds::TEST, INFO );
    Debug::Instance()->setModuleLevel ( DebugIds::SERVER, DETAIL );
    Debug::Instance()->setModuleLevel ( DebugIds::CLIENT, DETAIL );
    try
    {
        if (argc != 3)
        {
            INF ( "Usage: chat_client <host> <port>" );
            return(1);
        }
        INF ( "Create IO service" );
        boost::asio::io_service io_service;
        INF ( "Create TCP resolver" );
        tcp::resolver resolver(io_service);
        INF ( "Create endpoint iterator" );
        auto endpoint_iterator = resolver.resolve({ argv[1], argv[2] });
        INF ( "Create client" );
        chat_client c(io_service, endpoint_iterator);
        INF ( "Start thread that waits on io_service" );
        std::thread t([&io_service](){ io_service.run(); });
        // run io_service in an own thread, so we can still handle the
        // incomming data from commandline
        char line[chat_message::max_body_length + 1];
        while ( std::cin.getline(line, chat_message::max_body_length + 1) )
        {
            chat_message msg;
            msg.body_length(std::strlen(line));
            std::memcpy(msg.body(), line, msg.body_length());
            msg.encode_header();
            INF ( "write message %s", msg.data() );
            c.write(msg);
        }
        c.close();
        INF ( "Waits on io_service" );
        t.join();
    }
    catch (std::exception& e)
    {
        std::cerr << "Exception: " << e.what() << "n";
    }
    return (0);
}

在这里,服务器:async_read调用阻塞,直到从客户端接收数据

//
// chat_server.cpp
// ~~~~~~~~~~~~~~~
//
// Copyright (c) 2003-2013 Christopher M. Kohlhoff (chris at kohlhoff dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#include <cstdlib>
#include <deque>
#include <iostream>
#include <list>
#include <memory>
#include <set>
#include <utility>
#include <boost/asio.hpp>
#include "chat_message.hpp"
#define DMOD DebugIds::TEST
#define DSUBID 0x0002
#define CLASSNAME "SERVER_TEST"
#include <debug/dbg.h>
using boost::asio::ip::tcp;
#define DBG(X) std::cout << __FUNCTION__ << X << __LINE__ << std::endl;
//----------------------------------------------------------------------
typedef std::deque<chat_message> chat_message_queue;
//----------------------------------------------------------------------
class chat_participant
{
public:
  chat_participant() : m_id(++m_counter) {}
  virtual ~chat_participant() {}
  virtual void deliver(const chat_message& msg) = 0;
  static unsigned int m_counter;
  unsigned int m_id;
  unsigned int getId(){return (m_id);}
};
unsigned int chat_participant::m_counter = 0;
typedef std::shared_ptr<chat_participant> chat_participant_ptr;
//----------------------------------------------------------------------
class chat_room
{
public:
    void join(chat_participant_ptr participant)
    {
        m_participants.insert ( participant );
        for (   auto msg: m_recent_msgs )
        {
            INF ( "send available messages to session id=%d", participant->getId() );
            participant->deliver(msg);
        }
    }
    void leave(chat_participant_ptr participant)
    {
        INF ( "session id=%d", participant->getId() );
        m_participants.erase(participant);
    }
    void deliver(const chat_message& msg, unsigned int session_id )
    {
        INF ("deliver message to all participiants" );
        m_recent_msgs.push_back(msg);
        while (m_recent_msgs.size() > max_recent_msgs)
           m_recent_msgs.pop_front();
        for (auto participant: m_participants)
        {
            if(  session_id != participant->getId() )
            {
                INF ("   -> deliver message to p=%d", participant->getId() );
                participant->deliver(msg);
            }
        }
    }
private:
    std::set<chat_participant_ptr> m_participants;
    enum { max_recent_msgs = 100 };
    chat_message_queue m_recent_msgs;
};
//----------------------------------------------------------------------
class chat_session
  : public chat_participant,
    public std::enable_shared_from_this<chat_session>
{
public:
    chat_session(tcp::socket socket, chat_room& room)
        :   m_socket ( std::move(socket) ),
            m_room ( room )
    {
        INF ( "id=%d", getId() );
    }
    void start()
    {
        INF ( "id=%d", getId() );
        m_room.join ( shared_from_this() );
        do_read_header();
    }
    void deliver(const chat_message& msg)
    {
        INF ( "id=%d", getId() );
        bool write_in_progress = !m_write_msgs.empty();
        m_write_msgs.push_back(msg);
        if (!write_in_progress)
        {
            do_write();
        }
    }
private:
    void do_read_header()
    {
        auto self(shared_from_this());
        INF ( "id=%d call async_read", getId() );
        boost::asio::async_read(m_socket,
        boost::asio::buffer(m_read_msg.data(), chat_message::header_length),
            [this, self](boost::system::error_code ec, std::size_t /*length*/)
            {
                INF ( "id=%d    read callback!", getId() );
                if (!ec )
                {
                    INF ( "wait for next message to receive" );
                    do_read_header();
                }
                else
                {
                    m_room.leave(shared_from_this());
                }
            });
    }
    void do_write()
    {
        /**
         * auto self(shared_from_this()); in combination with the function pointer
         *    [this, self](boost::system::error_code ec, std::size_t length )
         *  ensures, that the chat_session is alive, as long as the asynchronous
         *  operation is ongoing
         */
        auto self(shared_from_this());
        INF ( "id=%d  start async_write", getId() );
        boost::asio::async_write(m_socket,
            boost::asio::buffer(m_write_msgs.front().data(),
              m_write_msgs.front().length()),
                [this, self](boost::system::error_code ec, std::size_t /*length*/)
                {
                    INF ( "id=%d  write callback", getId() );
                    if (!ec)
                    {
                        m_write_msgs.pop_front();
                        if (!m_write_msgs.empty())
                        {
                            INF ( "messages avaliable -> call do_write" );
                            do_write();
                        }
                    }
                    else
                    {
                        INF ( "error on async write" );
                        m_room.leave(shared_from_this());
                    }
                });
    }
    tcp::socket m_socket;
    chat_room& m_room;
    chat_message m_read_msg;
    chat_message_queue m_write_msgs;
};
//----------------------------------------------------------------------
class chat_server
{
public:
    chat_server ( boost::asio::io_service& io_service,
                    const tcp::endpoint& endpoint)
    :   m_acceptor ( io_service, endpoint ),
        m_socket ( io_service ),
        m_room () // call for m_room is not explicitly necessary, but
                  // makes the code more readable
    {
        INF ( "port=%d", endpoint.port() );
        do_accept();
    }
private:
    void do_accept()
    {
        INF ("   called");
        m_acceptor.async_accept ( m_socket,
        [this](boost::system::error_code ec)
        {
            if (!ec)
            {
                // std::make_shared<T>(args)
                // creates a shared pointer of T and provides the arguments to
                // the new object!
                INF ( "   clients connection: start a new chat session" );
                std::make_shared<chat_session> ( std::move ( m_socket ), m_room )->start();
                // http://stackoverflow.com/questions/3413470/what-is-stdmove-and-when-should-it-be-used
                // move allows to swap the ressources instead of copying them around!
            }
            INF ( "wait for the next client to connect" );
            do_accept();
        });
    }
    tcp::acceptor m_acceptor;
    tcp::socket m_socket;
    chat_room m_room;
};
//----------------------------------------------------------------------
int main(int argc, char* argv[])
{
    Debug::Instance()->setModuleLevel ( DebugIds::TEST, INFO );
    Debug::Instance()->setModuleLevel ( DebugIds::SERVER, DETAIL );
    Debug::Instance()->setModuleLevel ( DebugIds::CLIENT, DETAIL );
    try
    {
        if (argc < 2)
        {
            std::cerr << "Usage: chat_server <port> [<port> ...]n";
            return (1);
        }
        INF ( "argc=", argc );
        INF ("define io_service");
        boost::asio::io_service io_service;
        std::list<chat_server> servers;
        for (int i = 1; i < argc; ++i)
        {
            INF ("define endpoint");
            tcp::endpoint endpoint(tcp::v4(), std::atoi(argv[i]));
            // create new chat_server by calling its constructor and add
            // it to the list. emplace_back takes care off the correct
            // constructor selection
            INF ( "Start chat server at %s:%d", endpoint.address().to_string().c_str(), endpoint.port() );
            servers.emplace_back(io_service, endpoint);
        }
        std::cout << "io_service.stopped()=" << io_service.stopped() << std::endl;
        INF ("io_service.run()");
        boost::system::error_code ec;
        int size = io_service.run(ec);
        INF ( "io_service.run() [%d] returned with %s", size, ec.message().c_str() );
    }
    catch (std::exception& e)
    {
        ERR("Exception: %s", e.what());
    }
    INF ( "End server application" );
    return (0);
}

该错误是在创建此帖子 5 分钟后发现的。

我应该更详细地要求提升::asio 规范:

异步操作将继续,直到满足以下条件之一:
- 提供的缓冲区已满。也就是说,传输的字节数等于缓冲区大小的总和。
- 发生错误。

因此,在我更正了数据缓冲区初始化中的错误后,一切正常: 标准::矢量缓冲区 (0,1024); -> 标准::矢量缓冲区 (1024,0);

std::vector 缓冲区(在 doRead 中)在数据接收之前被销毁。

来自提升 asio 缓冲区文档:

调用异步读取或写入时,需要确保 操作的缓冲区在完成处理程序之前有效 被称为。在上面的示例中,缓冲区是 std::string 可变味精。这个变量在堆栈上,所以它从 异步操作完成之前的作用域。如果你幸运的话 然后应用程序将崩溃,但更有可能出现随机故障。

您可以在类中传输缓冲区,也可以使用shared_ptr机制来确保缓冲区仅在使用后销毁。例如像这样:

 void do_read()
    {
        std::shared_ptr<std::vector<char>> buf = std::make_shared<std::vector<char>>(1024, 0);
        m_socket.async_read_some
        (
            boost::asio::buffer(*buf, 1024),
            // copy buf. The buffer will be destroyed 
            // only after leaving the lambda function
            [buf](boost::system::error_code err, std::size_t length)
            {
                if(!err)
                {
                    // use buf
                }
                else
                {
                    // abort
                }
                do_read();
            }
        );
    }