boost::asio 和多个客户端连接使用异步
boost::asio and multiple client connections using asynch
我需要与不同的服务器建立最多三个不同的TCP连接。所有三个连接都需要不同的协议、不同的握手和不同的心跳。学习 http://www.boost.org/doc/libs/1_61_0/doc/html/boost_asio/example/cpp11/chat/chat_client.cpp,在这里阅读东西并遵循Chris Kohlhoff的建议,我试图实现如下。
问题是,使用这种架构,无论我在做什么,我在doConnect((中调用shared_from_this((时都会遇到bad_weak_pointer异常。
导入 这些只是未运行代码的片段,可能包含错误!进口
我有一个包含一些基本方法的基类。
连接.h
class Connection : public std::enable_shared_from_this<Connection>
{
public:
//! Ctor
inline Connection();
//! Dtor
inline virtual ~Connection();
inline void setReconnectTime(const long &reconnectAfterMilisec)
{
m_reconnectTime = boost::posix_time::milliseconds(reconnectAfterMilisec);
}
inline void setHandshakePeriod(const long &periodInMilisec)
{
m_handshakePeriod = boost::posix_time::milliseconds(periodInMilisec);
}
virtual void doConnect() = 0;
virtual void stop() = 0;
//... and some view more...
}
然后,我有三个类,它们派生自基类。这里只有一个(也是核心部分(来描述这种方法。
连接A.h
//queues which containing also the age of the messages
typedef std::deque<std::pair<handshakeMsg, boost::posix_time::ptime>> handskMsg_queue;
typedef std::deque<std::pair<errorcodeMsg, boost::posix_time::ptime>> ecMsg_queue;
typedef std::deque<std::pair<A_Msg, boost::posix_time::ptime>> A_Msg_queue;
class ConnectionA : public Connection
{
public:
ConnectionA();
ConnectionA(const std::string& IP, const int &port);
ConnectionA& operator=(const ConnectionA &other);
virtual ~ConnectionA();
virtual void stop() override;
virtual void doConnect() override;
void doPost(std::string &message);
void doHandshake();
void sendErrorCode(const int &ec);
std::shared_ptr<boost::asio::io_service>m_ioS;
private:
std::shared_ptr<tcp::socket> m_socket;
std::shared_ptr<boost::asio::deadline_timer> m_deadlineTimer; // for reconnetions
std::shared_ptr<boost::asio::deadline_timer> m_handshakeTimer; // for heartbeats
void deadlineTimer_handler(const boost::system::error_code& error);
void handshakeTimer_handler(const boost::system::error_code& error);
void doRead();
void doWrite();
std::string m_IP;
int m_port;
handskMsg_queue m_handskMsgQueue;
ecMsg_queue m_ecMsgQueue;
A_Msg_queue m_AMsgQueue;
}
连接A.cpp
ConnectionA::ConnectionA(const std::string &IP, const int &port)
: m_ioS()
, m_socket()
, m_deadlineTimer()
, m_handshakeTimer()
, m_IP(IP)
, m_port(port)
, m_handskMsgQueue(10)
, m_ecMsgQueue(10)
, m_AMsgQueue(10)
{
m_ioS = std::make_shared<boost::asio::io_service>();
m_socket = std::make_shared<tcp::socket>(*m_ioS);
m_deadlineTimer = std::make_shared<boost::asio::deadline_timer>(*m_ioS);
m_handshakeTimer = std::make_shared<boost::asio::deadline_timer> (*m_ioS);
m_deadlineTimer->async_wait(boost::bind(&ConnectionA::deadlineTimer_handler, this, boost::asio::placeholders::error));
m_handshakeTimer->async_wait(boost::bind(&ConnectionA::handshakeTimer_handler, this, boost::asio::placeholders::error));
}
ConnectionA::~ConnectionA()
{}
void ConnectionA::stop()
{
m_ioS->post([this]() { m_socket->close(); });
m_deadlineTimer->cancel();
m_handshakeTimer->cancel();
}
void ConnectionA::doConnect()
{
if (m_socket->is_open()){
return;
}
tcp::resolver resolver(*m_ioS);
std::string portAsString = std::to_string(m_port);
auto endpoint_iter = resolver.resolve({ m_IP.c_str(), portAsString.c_str() });
m_deadlineTimer->expires_from_now(m_reconnectTime);
// this gives me a bad_weak_pointer exception!!!
auto self = std::static_pointer_cast<ConnectionA>(static_cast<ConnectionA*>(this)->shared_from_this());
boost::asio::async_connect(*m_socket, endpoint_iter, [this, self](boost::system::error_code ec, tcp::resolver::iterator){
if (!ec)
{
doHandshake();
doRead();
}
else {
// don't know if async_connect can fail but set the socket to open
if (m_socket->is_open()){
m_socket->close();
}
}
});
}
void ConnectionA::doRead()
{
auto self(shared_from_this());
boost::asio::async_read(*m_socket,
boost::asio::buffer(m_readBuf, m_readBufSize),
[this, self](boost::system::error_code ec, std::size_t){
if(!ec){
// check server answer for errors
}
doRead();
}
else {
stop();
}
});
}
void ConnectionA::doPost(std::string &message)
{
A_Msg newMsg (message);
auto self(shared_from_this());
m_ioS->post([this, self, newMsg](){
bool writeInProgress = false;
if (!m_A_MsgQueue.empty()){
writeInProgress = true;
}
boost::posix_time::ptime currentTime = time_traits_t::now();
m_AMsgQueue.push_back(std::make_pair(newMsg,currentTime));
if (!writeInProgress)
{
doWrite();
}
});
}
void ConnectionA::doWrite()
{
while (!m_AMsgQueue.empty())
{
if (m_AMsgQueue.front().second + m_maxMsgAge < time_traits_t::now()){
m_AMsgQueue.pop_front();
continue;
}
if (!m_socket->is_open()){
continue;
}
auto self(shared_from_this());
boost::asio::async_write(*m_socket,
boost::asio::buffer(m_AMsgQueue.front().first.data(),
m_AMsgQueue.front().first.A_lenght),
[this, self](boost::system::error_code ec, std::size_t /*length*/)
{
if (!ec) // successful
{
m_handshakeTimer->expires_from_now(m_handshakePeriod); // reset timer
m_AMsgQueue.pop_front();
doWrite();
}
else {
if (m_socket->is_open()){
m_socket->close();
}
}
});
}
}
void ConnectionA::deadlineTimer_handler(const boost::system::error_code& error){
if (m_stopped){
return;
}
m_deadlineTimer->async_wait(boost::bind(&ConnectionA::deadlineTimer_handler, this, boost::asio::placeholders::error));
if (!error && !m_socket->is_open()) // timer expired and no connection was established
{
doConnect();
}
else if (!error && m_socket->is_open()){ // timer expired and connection was established
m_deadlineTimer->expires_at(boost::posix_time::pos_infin); // to reactivate timer call doConnect()
}
}
最后还有另一个类封装了这些类,使其使用起来更舒适:
TcpConnect.h
class CTcpConnect
{
public:
/*! Ctor
*/
CTcpConnect();
//! Dtor
~CTcpConnect();
void initConnectionA(std::string &IP, const int &port);
void initConnectionB(std::string &IP, const int &port);
void initConnectionC(std::string &IP, const int &port);
void postMessageA(std::string &message);
void run();
void stop();
private:
ConnectionA m_AConnection;
ConnectionB m_BConnection;
ConnectionC m_CConnection;
}
TcpConnect.cpp
CTcpConnect::CTcpConnect()
: m_AConnection()
, m_BConnection()
, m_CConnection()
{}
CTcpConnect::~CTcpConnect()
{}
void CTcpConnect::run(){
[this](){ m_AConnection.m_ioS->run(); };
[this](){ m_BConnection.m_ioS->run(); };
[this](){ m_CConnection.m_ioS->run(); };
}
void CTcpConnect::stop(){
m_AConnection.stop();
m_BConnection.stop();
m_CConnection.stop();
}
void CTcpConnect::initConnectionA(std::string &IP, const int &port)
{
m_AConnection = ConnectionA(IP, port);
m_AConnection.setMaxMsgAge(30000);
//... set some view parameter more
m_AConnection.doConnect();
}
// initConnectionB & initConnectionC are quite the same
void CTcpConnect::postMessageA(std::string &message)
{
m_AConnection.doWrite(message);
}
一开始我也尝试只有一个io_service(对于我的方法来说这很好(,但是将服务作为参考会让人头疼,因为我的实现还需要连接的默认构造函数。现在,每个连接都有自己的 io-service。
任何想法如何运行此代码?请随时为其他体系结构提出建议。如果你能想出这个,一些片段会更好。我已经为此实现而苦苦挣扎了数周。我很感激每一个提示。
顺便说一句,我正在使用 VS1.61 的提升 12。
这是问题所在:
m_AConnection = ConnectionA(IP, port);
也就是说,ConnectionA
派生自Connection
,而派生自enable_shared_from_this
。这意味着ConnectionA
必须实例化为共享指针,shared_from_this
才能正常工作。
试试这个:
void CTcpConnect::initConnectionA(std::string &IP, const int &port)
{
m_AConnection = std::make_shared<ConnectionA>(IP, port);
m_AConnection->setMaxMsgAge(30000);
//... set some view parameter more
m_AConnection->doConnect();
}
编辑1:
你是对的。这就是问题所在。现在我意识到我调用io-service.run((的方式完全是废话。
使用多个io_service
是非常罕见的,并且每个连接使用一个:)
但是,您知道我是否需要演员表然后调用shared_from_this((吗?我注意到 asynch_connect(( 在有和没有演员阵容的情况下都能正常工作。
许多 Asio 示例为了方便起见而使用 shared_from_this()
,例如,我根本不在我的项目中使用它。使用 Asio 时,您需要小心某些规则。例如,一个是读取和写入缓冲区在执行相应的回调之前不得被破坏,如果 lambda 函数捕获指向保存缓冲区的对象的共享指针,则此条件基本成立。
例如,您也可以执行以下操作:
auto data = std::make_shared<std::vector<uint8_t>>(10);
async_read(socket,
boost::asio::const_buffer(*data),
[data](boost::system::error_code, size_t) {});
这将是有效的,但具有性能缺点,即每次读取时都会在std::vector
内部分配一个新数据。
当您查看一些 lambda 时,可以看到shared_from_this()
有用的另一个原因,它们通常具有以下形式:
[this, self,...](...) {...}
也就是说,您经常希望在它们内部使用this
。如果您没有捕获self
,则需要使用其他措施来确保在调用处理程序时this
未被销毁。
- 获取日期异步信号安全吗?如果在信号处理程序中使用,它会导致死锁吗
- 当套接字连接断开时检测C/C++Unix
- 无法在windows上使用mingw将sqlite3与c连接
- 到连接组件算法的问题(递归)
- QTcpSocket在不阻塞GUI的情况下重新连接到服务器
- 无法在C++中建立与MySQL数据库的连接
- PC中的程序和PHONE中的本机描述应用程序之间的数据连接
- 在Qt Creator中,如何在连接到正在运行的进程后查看控制台输出
- 连接 dockerized 模型和 dockerized 数据库时出现"无法 SQLConnect"错误
- 某些 boost::asio 异步函数是否将处理程序连接到操作,以便处理程序被触发一次?
- 多个网络连接的线程与异步 I/O
- Boost.Asio 异步服务器.限制为一个连接
- 异步模式下使用的WinHttp-ERROR_INTERNET_CANNOT_CONNECT如何干净地关闭连接
- 提升异步 TCP 客户端在没有物理连接的情况下写入时不返回错误
- 异步套接字连接,当服务器断开TCP通信时,客户端GUI挂起
- 在没有连接的情况下调用C++Boost异步服务器处理程序
- 异步服务器不接受连接
- 使用gSoap的异步、确认、点对点连接
- Boost异步服务器接受两次连接
- boost::asio 和多个客户端连接使用异步