持续调用 boost::asio::read_async:无数据的回调
Continuous invocations of boost::asio::read_async: callback without data
我是 boost::asio 的新手,目前正试图构建一个简单的服务器应用程序,侦听 TCP 服务器。 客户端和服务器代码都派生自我在网上找到的示例(这工作正常)。
我自己的服务器不断调用async_read回调,没有任何数据。我已经搜索了几天,但没有得到示例和我的代码之间的区别:(
所以希望任何人都可以告诉我我做错了什么。
这是我的服务器应用程序代码:
#define DMOD DebugIds::TEST
#define DSUBID 0x0002
#define CLASSNAME "CLIENT_TEST"
#include <debug/dbg.h>
#include <memory>
#include <boost/thread.hpp>
#include <boost/asio.hpp>
using boost::asio::ip::tcp;
void doRead ( tcp::socket& socket )
{
std::vector<char> buffer (0,1024);
INF ("call async_read");
boost::asio::async_read (
socket,
boost::asio::buffer(buffer.data(), buffer.size() ),
[&socket](boost::system::error_code ec, std::size_t length)
{
INF ("async_read callback");
if (!ec )
{
INF ( "accept %d bytes of data from client", length );
}
else
{
ERR ( "cannot read from client" );
}
INF ("call doRead");
doRead ( socket );
}
);
}
int main(int argc, char* argv[])
{
bool startClient = true;
bool startServer = true;
Debug::Instance()->setModuleLevel ( DebugIds::TEST, INFO );
Debug::Instance()->setModuleLevel ( DebugIds::SERVER, DETAIL );
Debug::Instance()->setModuleLevel ( DebugIds::CLIENT, DETAIL );
if ( argc > 1 )
{
INF ( "argv[1]=%s", argv[1] );
if ( strcmp ("s", argv[1]) == 0 )
{
startClient = false;
}
else if ( strcmp ("c", argv[1]) == 0 )
{
startServer = false;
}
}
boost::thread serverThread;
boost::thread clientThread;
boost::mutex mutex;
boost::condition_variable clientcond;
unsigned int port = 12342;
std::string _port = "12342";
std::string adr = "127.0.0.1";
INF ( "startServer=%d startClient=%d", startServer, startClient );
INF ( "adr=%s port=%d _port=%s", adr.c_str(), port, _port.c_str() );
boost::asio::io_service io_service_server;
if ( startServer )
{
// start threads
mutex.lock();
INF ( "start server thread" );
serverThread = boost::thread (
[&]() //lambda method
{
try
{
INF ( "create TCP endpoint" );
boost::asio::ip::tcp::endpoint ep(boost::asio::ip::address::from_string(adr), port );
//boost::asio::ip::tcp::endpoint ep(tcp::v4(), port );
INF ( "create TCP socket" );
tcp::socket m_socket(io_service_server);
INF ( "create TCP acceptor" );
tcp::acceptor m_acceptor(io_service_server,ep);
INF ( "call async_accept" );
m_acceptor.async_accept ( m_socket,
[&](boost::system::error_code ec)
{
INF ( "async_accept callback" );
if ( !ec )
{
INF ( "call doRead" );
doRead ( m_socket );
}
else
{
ERR ( "cannot accept client" );
}
}
);
INF ( "notify server is init" );
clientcond.notify_one();
INF ( "io_service_server.run()" );
io_service_server.run();
}
catch (std::exception& e)
{
ERR ( "Exception: %s", e.what() );
return (0);
}
INF ( "End server application" );
return (0);
});
}
boost::asio::io_service io_service_client;
if ( startClient )
{
clientThread = boost::thread (
//[&mutex,&adr,&_port,&spC,&io_service_client]()
[&]()
{
INF ( "wait for server to be initialized" );
boost::unique_lock<boost::mutex> lock(mutex);
clientcond.wait(lock);
INF ( "server initialization finished" );
try
{
tcp::resolver resolver(io_service_client);
auto endpoint_iterator = resolver.resolve({ adr, _port });
INF ( "create and init testclient" );
#if 0
uint8_t count = 0;
std::vector<char> v(0,256);
for ( auto c: v )
{
c = ++count;
}
spC->write(v.data(), v.size());
#endif
io_service_client.run();
}
catch (std::exception& e)
{
ERR ( "Exception: %s", e.what() );
}
INF ( "End client application" );
return (0);
});
}
char line[256];
while ( std::cin.getline(line, 256) )
{
INF ( "got %s", line );
if ( 0 == strcmp ("exit", line ) )
{
#if 0
if (spC)
{
spC->close();
io_service_client.stop();
if (spC) clientThread.join();
spC = std::shared_ptr<Client>();
}
if (spS)
{
spS->stop();
io_service_server.stop();
if (spC) serverThread.join();
spS = std::shared_ptr<Server>();
}
#endif
break;
}
}
#if 0
if (spC) clientThread.join();
if (spS) serverThread.join();
#endif
return (0);
}
这将在控制台上创建以下输出:
[ 0.000] test[0x0002]: INF: SERVER_TEST::main():56: argv[1]=s
[ 0.000] test[0x0002]: INF: SERVER_TEST::main():78: startServer=1 startClient=0
[ 0.000] test[0x0002]: INF: SERVER_TEST::main():79: adr=127.0.0.1 port=12342 _port=12342
[ 0.000] test[0x0002]: INF: SERVER_TEST::main():86: start server thread
[ 0.000] test[0x0002]: INF: SERVER_TEST::operator()():92: create TCP endpoint
[ 0.000] test[0x0002]: INF: SERVER_TEST::operator()():95: create TCP socket
[ 0.000] test[0x0002]: INF: SERVER_TEST::operator()():97: create TCP acceptor
[ 0.000] test[0x0002]: INF: SERVER_TEST::operator()():100: call async_accept
[ 0.000] test[0x0002]: INF: SERVER_TEST::operator()():117: notify server is init
[ 0.000] test[0x0002]: INF: SERVER_TEST::operator()():119: io_service_server.run()
[ 26.599] test[0x0002]: INF: SERVER_TEST::operator()():104: async_accept callback
[ 26.599] test[0x0002]: INF: SERVER_TEST::operator()():107: call doRead
[ 26.599] test[0x0002]: INF: SERVER_TEST::doRead():23: call async_read
[ 26.599] test[0x0002]: INF: SERVER_TEST::operator()():29: async_read callback
[ 26.599] test[0x0002]: INF: SERVER_TEST::operator()():32: accept 0 bytes of data from client
[ 26.599] test[0x0002]: INF: SERVER_TEST::operator()():38: call doRead
[ 26.599] test[0x0002]: INF: SERVER_TEST::doRead():23: call async_read
[ 26.599] test[0x0002]: INF: SERVER_TEST::operator()():29: async_read callback
[ 26.599] test[0x0002]: INF: SERVER_TEST::operator()():32: accept 0 bytes of data from client
[ 26.599] test[0x0002]: INF: SERVER_TEST::operator()():38: call doRead
[ 26.599] test[0x0002]: INF: SERVER_TEST::doRead():23: call async_read
[ 26.599] test[0x0002]: INF: SERVER_TEST::operator()():29: async_read callback
[ 26.599] test[0x0002]: INF: SERVER_TEST::operator()():32: accept 0 bytes of data from client
[ 26.599] test[0x0002]: INF: SERVER_TEST::operator()():38: call doRead
[...] a.s.o.
为了完整起见,我将添加我在 I-Net 中找到的示例代码(略有改动):示例客户端:此处async_read调用阻塞,直到数据从服务器到达。
//
// chat_client.cpp
// ~~~~~~~~~~~~~~~
//
// Copyright (c) 2003-2013 Christopher M. Kohlhoff (chris at kohlhoff dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#include <cstdlib>
#include <deque>
#include <iostream>
#include <thread>
#include <boost/asio.hpp>
#include "chat_message.hpp"
using boost::asio::ip::tcp;
#define DMOD DebugIds::TEST
#define DSUBID 0x0002
#define CLASSNAME "CLIENT_TEST"
#include <debug/dbg.h>
typedef std::deque<chat_message> chat_message_queue;
class chat_client
{
public:
chat_client ( boost::asio::io_service& io_service,
tcp::resolver::iterator endpoint_iterator)
: m_io_service(io_service),
m_socket(io_service)
{
INF ("");
do_connect ( endpoint_iterator );
}
void write ( const chat_message& msg )
{
INF ( "post io service" );
m_io_service.post(
[this, msg]()
{
bool write_in_progress = !m_write_msgs.empty();
m_write_msgs.push_back(msg);
if (!write_in_progress)
{
INF ( "call do_write" );
do_write();
}
});
}
void close()
{
INF ( "close socket async" );
m_io_service.post([this]() { INF ("close socket"); m_socket.close(); });
}
private:
void do_connect(tcp::resolver::iterator endpoint_iterator)
{
INF ( "async_connect" );
boost::asio::async_connect ( m_socket, endpoint_iterator,
[this](boost::system::error_code ec, tcp::resolver::iterator)
{
INF ( "on async_connect" );
if (!ec)
{
INF ( "do_read_header" );
do_read_header();
}
});
}
void do_read_header()
{
INF ( "call async_read" );
boost::asio::async_read ( m_socket,
boost::asio::buffer(m_read_msg.data(), chat_message::header_length),
[this](boost::system::error_code ec, std::size_t /*length*/)
{
INF ( "on async_read" );
if (!ec && m_read_msg.decode_header())
{
INF ( "call do_read_body" );
do_read_body();
}
else
{
ERR ( "connect failed" );
m_socket.close();
}
});
}
void do_read_body()
{
INF ( "call async_read" );
boost::asio::async_read(m_socket,
boost::asio::buffer(m_read_msg.body(), m_read_msg.body_length()),
[this](boost::system::error_code ec, std::size_t /*length*/)
{
INF ( "body: on async_read" );
if (!ec)
{
std::cout.write(m_read_msg.body(), m_read_msg.body_length());
std::cout << "n";
INF ( "body: call do_read_header" );
do_read_header();
}
else
{
INF ( "body: read failed" );
m_socket.close();
}
});
}
void do_write()
{
INF ( "call async_write" );
boost::asio::async_write(m_socket,
boost::asio::buffer(m_write_msgs.front().data(),
m_write_msgs.front().length()),
[this] ( boost::system::error_code ec, std::size_t length )
{
INF ( "on async_write" );
if (!ec)
{
m_write_msgs.pop_front();
if (!m_write_msgs.empty())
{
INF ( "call do_write" );
do_write();
}
else
{
}
}
else
{
INF ( "async_write failed" );
m_socket.close();
}
});
}
private:
boost::asio::io_service& m_io_service;
tcp::socket m_socket;
chat_message m_read_msg;
chat_message_queue m_write_msgs;
};
int main(int argc, char* argv[])
{
Debug::Instance()->setModuleLevel ( DebugIds::TEST, INFO );
Debug::Instance()->setModuleLevel ( DebugIds::SERVER, DETAIL );
Debug::Instance()->setModuleLevel ( DebugIds::CLIENT, DETAIL );
try
{
if (argc != 3)
{
INF ( "Usage: chat_client <host> <port>" );
return(1);
}
INF ( "Create IO service" );
boost::asio::io_service io_service;
INF ( "Create TCP resolver" );
tcp::resolver resolver(io_service);
INF ( "Create endpoint iterator" );
auto endpoint_iterator = resolver.resolve({ argv[1], argv[2] });
INF ( "Create client" );
chat_client c(io_service, endpoint_iterator);
INF ( "Start thread that waits on io_service" );
std::thread t([&io_service](){ io_service.run(); });
// run io_service in an own thread, so we can still handle the
// incomming data from commandline
char line[chat_message::max_body_length + 1];
while ( std::cin.getline(line, chat_message::max_body_length + 1) )
{
chat_message msg;
msg.body_length(std::strlen(line));
std::memcpy(msg.body(), line, msg.body_length());
msg.encode_header();
INF ( "write message %s", msg.data() );
c.write(msg);
}
c.close();
INF ( "Waits on io_service" );
t.join();
}
catch (std::exception& e)
{
std::cerr << "Exception: " << e.what() << "n";
}
return (0);
}
在这里,服务器:async_read调用阻塞,直到从客户端接收数据
//
// chat_server.cpp
// ~~~~~~~~~~~~~~~
//
// Copyright (c) 2003-2013 Christopher M. Kohlhoff (chris at kohlhoff dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#include <cstdlib>
#include <deque>
#include <iostream>
#include <list>
#include <memory>
#include <set>
#include <utility>
#include <boost/asio.hpp>
#include "chat_message.hpp"
#define DMOD DebugIds::TEST
#define DSUBID 0x0002
#define CLASSNAME "SERVER_TEST"
#include <debug/dbg.h>
using boost::asio::ip::tcp;
#define DBG(X) std::cout << __FUNCTION__ << X << __LINE__ << std::endl;
//----------------------------------------------------------------------
typedef std::deque<chat_message> chat_message_queue;
//----------------------------------------------------------------------
class chat_participant
{
public:
chat_participant() : m_id(++m_counter) {}
virtual ~chat_participant() {}
virtual void deliver(const chat_message& msg) = 0;
static unsigned int m_counter;
unsigned int m_id;
unsigned int getId(){return (m_id);}
};
unsigned int chat_participant::m_counter = 0;
typedef std::shared_ptr<chat_participant> chat_participant_ptr;
//----------------------------------------------------------------------
class chat_room
{
public:
void join(chat_participant_ptr participant)
{
m_participants.insert ( participant );
for ( auto msg: m_recent_msgs )
{
INF ( "send available messages to session id=%d", participant->getId() );
participant->deliver(msg);
}
}
void leave(chat_participant_ptr participant)
{
INF ( "session id=%d", participant->getId() );
m_participants.erase(participant);
}
void deliver(const chat_message& msg, unsigned int session_id )
{
INF ("deliver message to all participiants" );
m_recent_msgs.push_back(msg);
while (m_recent_msgs.size() > max_recent_msgs)
m_recent_msgs.pop_front();
for (auto participant: m_participants)
{
if( session_id != participant->getId() )
{
INF (" -> deliver message to p=%d", participant->getId() );
participant->deliver(msg);
}
}
}
private:
std::set<chat_participant_ptr> m_participants;
enum { max_recent_msgs = 100 };
chat_message_queue m_recent_msgs;
};
//----------------------------------------------------------------------
class chat_session
: public chat_participant,
public std::enable_shared_from_this<chat_session>
{
public:
chat_session(tcp::socket socket, chat_room& room)
: m_socket ( std::move(socket) ),
m_room ( room )
{
INF ( "id=%d", getId() );
}
void start()
{
INF ( "id=%d", getId() );
m_room.join ( shared_from_this() );
do_read_header();
}
void deliver(const chat_message& msg)
{
INF ( "id=%d", getId() );
bool write_in_progress = !m_write_msgs.empty();
m_write_msgs.push_back(msg);
if (!write_in_progress)
{
do_write();
}
}
private:
void do_read_header()
{
auto self(shared_from_this());
INF ( "id=%d call async_read", getId() );
boost::asio::async_read(m_socket,
boost::asio::buffer(m_read_msg.data(), chat_message::header_length),
[this, self](boost::system::error_code ec, std::size_t /*length*/)
{
INF ( "id=%d read callback!", getId() );
if (!ec )
{
INF ( "wait for next message to receive" );
do_read_header();
}
else
{
m_room.leave(shared_from_this());
}
});
}
void do_write()
{
/**
* auto self(shared_from_this()); in combination with the function pointer
* [this, self](boost::system::error_code ec, std::size_t length )
* ensures, that the chat_session is alive, as long as the asynchronous
* operation is ongoing
*/
auto self(shared_from_this());
INF ( "id=%d start async_write", getId() );
boost::asio::async_write(m_socket,
boost::asio::buffer(m_write_msgs.front().data(),
m_write_msgs.front().length()),
[this, self](boost::system::error_code ec, std::size_t /*length*/)
{
INF ( "id=%d write callback", getId() );
if (!ec)
{
m_write_msgs.pop_front();
if (!m_write_msgs.empty())
{
INF ( "messages avaliable -> call do_write" );
do_write();
}
}
else
{
INF ( "error on async write" );
m_room.leave(shared_from_this());
}
});
}
tcp::socket m_socket;
chat_room& m_room;
chat_message m_read_msg;
chat_message_queue m_write_msgs;
};
//----------------------------------------------------------------------
class chat_server
{
public:
chat_server ( boost::asio::io_service& io_service,
const tcp::endpoint& endpoint)
: m_acceptor ( io_service, endpoint ),
m_socket ( io_service ),
m_room () // call for m_room is not explicitly necessary, but
// makes the code more readable
{
INF ( "port=%d", endpoint.port() );
do_accept();
}
private:
void do_accept()
{
INF (" called");
m_acceptor.async_accept ( m_socket,
[this](boost::system::error_code ec)
{
if (!ec)
{
// std::make_shared<T>(args)
// creates a shared pointer of T and provides the arguments to
// the new object!
INF ( " clients connection: start a new chat session" );
std::make_shared<chat_session> ( std::move ( m_socket ), m_room )->start();
// http://stackoverflow.com/questions/3413470/what-is-stdmove-and-when-should-it-be-used
// move allows to swap the ressources instead of copying them around!
}
INF ( "wait for the next client to connect" );
do_accept();
});
}
tcp::acceptor m_acceptor;
tcp::socket m_socket;
chat_room m_room;
};
//----------------------------------------------------------------------
int main(int argc, char* argv[])
{
Debug::Instance()->setModuleLevel ( DebugIds::TEST, INFO );
Debug::Instance()->setModuleLevel ( DebugIds::SERVER, DETAIL );
Debug::Instance()->setModuleLevel ( DebugIds::CLIENT, DETAIL );
try
{
if (argc < 2)
{
std::cerr << "Usage: chat_server <port> [<port> ...]n";
return (1);
}
INF ( "argc=", argc );
INF ("define io_service");
boost::asio::io_service io_service;
std::list<chat_server> servers;
for (int i = 1; i < argc; ++i)
{
INF ("define endpoint");
tcp::endpoint endpoint(tcp::v4(), std::atoi(argv[i]));
// create new chat_server by calling its constructor and add
// it to the list. emplace_back takes care off the correct
// constructor selection
INF ( "Start chat server at %s:%d", endpoint.address().to_string().c_str(), endpoint.port() );
servers.emplace_back(io_service, endpoint);
}
std::cout << "io_service.stopped()=" << io_service.stopped() << std::endl;
INF ("io_service.run()");
boost::system::error_code ec;
int size = io_service.run(ec);
INF ( "io_service.run() [%d] returned with %s", size, ec.message().c_str() );
}
catch (std::exception& e)
{
ERR("Exception: %s", e.what());
}
INF ( "End server application" );
return (0);
}
该错误是在创建此帖子 5 分钟后发现的。
我应该更详细地要求提升::asio 规范:
异步操作将继续,直到满足以下条件之一:
- 提供的缓冲区已满。也就是说,传输的字节数等于缓冲区大小的总和。
- 发生错误。
因此,在我更正了数据缓冲区初始化中的错误后,一切正常: 标准::矢量缓冲区 (0,1024); -> 标准::矢量缓冲区 (1024,0);
std::vector 缓冲区(在 doRead 中)在数据接收之前被销毁。
来自提升 asio 缓冲区文档:
调用异步读取或写入时,需要确保 操作的缓冲区在完成处理程序之前有效 被称为。在上面的示例中,缓冲区是 std::string 可变味精。这个变量在堆栈上,所以它从 异步操作完成之前的作用域。如果你幸运的话 然后应用程序将崩溃,但更有可能出现随机故障。
您可以在类中传输缓冲区,也可以使用shared_ptr机制来确保缓冲区仅在使用后销毁。例如像这样:
void do_read()
{
std::shared_ptr<std::vector<char>> buf = std::make_shared<std::vector<char>>(1024, 0);
m_socket.async_read_some
(
boost::asio::buffer(*buf, 1024),
// copy buf. The buffer will be destroyed
// only after leaving the lambda function
[buf](boost::system::error_code err, std::size_t length)
{
if(!err)
{
// use buf
}
else
{
// abort
}
do_read();
}
);
}
- 防止主数据类型C++的隐式转换
- 用于访问容器<T>数据成员的正确 API
- 嵌套在类中时无法设置成员数据
- 使用流处理接收到的数据
- 静态数据成员的问题-修复链接错误会导致编译器错误
- 处理小于cpu数据总线的数据类型.(c++转换为机器代码)
- 在cuda线程之间共享大量常量数据
- C++将文本文件中的数据读取到结构数组中
- 如何在C++中序列化结构数据
- 在C++中打印指向不同基元数据类型的指针的内存地址
- 通过套接字[TCP]传输数据 如何在C / C ++中打包多个整数并使用send() recv()传输数据
- 在c代码之间共享数据的最佳方式
- 链表,反向函数,数据结构
- 数据成员SFINAE的C++17测试:gcc vs clang
- C++浮点数据类型和字符串数据类型无法子到模板函数中
- 如何对点云数据进行排序
- 从矢量<无符号字符>转换为字符* 包括垃圾数据
- 尝试通过OCI例程从Oracle获取blob数据,但出现错误:ORA-01008:并非所有变量都绑定
- Cuda C++:设备上的Malloc类,并用来自主机的数据填充它
- 是什么导致了 std::async 中的数据竞争