boost::asio 和多个客户端连接使用异步

boost::asio and multiple client connections using asynch

本文关键字:连接 异步 客户端 asio boost      更新时间:2023-10-16

我需要与不同的服务器建立最多三个不同的TCP连接。所有三个连接都需要不同的协议、不同的握手和不同的心跳。学习 http://www.boost.org/doc/libs/1_61_0/doc/html/boost_asio/example/cpp11/chat/chat_client.cpp,在这里阅读东西并遵循Chris Kohlhoff的建议,我试图实现如下。

问题是,使用这种架构,无论我在做什么,我在doConnect((中调用shared_from_this((时都会遇到bad_weak_pointer异常。

导入 这些只是未运行代码的片段,可能包含错误!进口

我有一个包含一些基本方法的基类。

连接.h

class Connection : public std::enable_shared_from_this<Connection>
{
public:
  //! Ctor
  inline Connection();
  //! Dtor
  inline virtual ~Connection();
  inline void setReconnectTime(const long &reconnectAfterMilisec)
  {
    m_reconnectTime = boost::posix_time::milliseconds(reconnectAfterMilisec);
  }
  inline void setHandshakePeriod(const long &periodInMilisec)
  {
    m_handshakePeriod = boost::posix_time::milliseconds(periodInMilisec);
  }
  virtual void doConnect() = 0;
  virtual void stop() = 0;
  //... and some view more...
}

然后,我有三个类,它们派生自基类。这里只有一个(也是核心部分(来描述这种方法。

连接A.h

//queues which containing also the age of the messages
typedef std::deque<std::pair<handshakeMsg, boost::posix_time::ptime>> handskMsg_queue;
typedef std::deque<std::pair<errorcodeMsg, boost::posix_time::ptime>> ecMsg_queue;
typedef std::deque<std::pair<A_Msg, boost::posix_time::ptime>> A_Msg_queue;
class ConnectionA : public Connection
{
public:
  ConnectionA();
  ConnectionA(const std::string& IP, const int &port);
  ConnectionA& operator=(const ConnectionA &other);
  virtual ~ConnectionA();
  virtual void stop() override;
  virtual void doConnect() override;
  void doPost(std::string &message);
  void doHandshake();
  void sendErrorCode(const int &ec);
  std::shared_ptr<boost::asio::io_service>m_ioS;
private:
  std::shared_ptr<tcp::socket> m_socket;
  std::shared_ptr<boost::asio::deadline_timer> m_deadlineTimer; // for reconnetions
  std::shared_ptr<boost::asio::deadline_timer> m_handshakeTimer; // for heartbeats
  void deadlineTimer_handler(const boost::system::error_code& error);
  void handshakeTimer_handler(const boost::system::error_code& error);
  void doRead();
  void doWrite();
  std::string m_IP;
  int m_port;
  handskMsg_queue m_handskMsgQueue;
  ecMsg_queue m_ecMsgQueue;
  A_Msg_queue m_AMsgQueue;
}

连接A.cpp

ConnectionA::ConnectionA(const std::string &IP, const int &port)
: m_ioS()
, m_socket()
, m_deadlineTimer()
, m_handshakeTimer()
, m_IP(IP)
, m_port(port)
, m_handskMsgQueue(10)
, m_ecMsgQueue(10)
, m_AMsgQueue(10)   
{
  m_ioS = std::make_shared<boost::asio::io_service>();
  m_socket = std::make_shared<tcp::socket>(*m_ioS);
  m_deadlineTimer = std::make_shared<boost::asio::deadline_timer>(*m_ioS);
  m_handshakeTimer = std::make_shared<boost::asio::deadline_timer> (*m_ioS);
  m_deadlineTimer->async_wait(boost::bind(&ConnectionA::deadlineTimer_handler, this, boost::asio::placeholders::error));
  m_handshakeTimer->async_wait(boost::bind(&ConnectionA::handshakeTimer_handler, this, boost::asio::placeholders::error));
}
ConnectionA::~ConnectionA()
{}
void ConnectionA::stop()
{
  m_ioS->post([this]() { m_socket->close(); });
  m_deadlineTimer->cancel();
  m_handshakeTimer->cancel();
}
void ConnectionA::doConnect()
{
  if (m_socket->is_open()){
    return;
  }
  tcp::resolver resolver(*m_ioS);
  std::string portAsString = std::to_string(m_port);
  auto endpoint_iter = resolver.resolve({ m_IP.c_str(), portAsString.c_str() });
  m_deadlineTimer->expires_from_now(m_reconnectTime);
  // this gives me a bad_weak_pointer exception!!!
  auto self = std::static_pointer_cast<ConnectionA>(static_cast<ConnectionA*>(this)->shared_from_this()); 
  boost::asio::async_connect(*m_socket, endpoint_iter, [this, self](boost::system::error_code ec, tcp::resolver::iterator){
    if (!ec)
    {
      doHandshake();
      doRead();
    } 
    else {
      // don't know if async_connect can fail but set the socket to open
      if (m_socket->is_open()){
        m_socket->close();
      }
    }
  });
}
void ConnectionA::doRead()
{
  auto self(shared_from_this());
  boost::asio::async_read(*m_socket, 
    boost::asio::buffer(m_readBuf, m_readBufSize), 
    [this, self](boost::system::error_code ec, std::size_t){
    if(!ec){
        // check server answer for errors
        }
        doRead();
    }
    else {
        stop();
    }
  });
}
void ConnectionA::doPost(std::string &message)
{
    A_Msg newMsg (message);
    auto self(shared_from_this());
    m_ioS->post([this, self, newMsg](){
    bool writeInProgress = false;
    if (!m_A_MsgQueue.empty()){
        writeInProgress = true;
    }
    boost::posix_time::ptime currentTime = time_traits_t::now();
    m_AMsgQueue.push_back(std::make_pair(newMsg,currentTime));
    if (!writeInProgress)
    { 
        doWrite();
    }       
  });   
}
void ConnectionA::doWrite()
{
  while (!m_AMsgQueue.empty())
  {
    if (m_AMsgQueue.front().second + m_maxMsgAge < time_traits_t::now()){
            m_AMsgQueue.pop_front();
            continue;
    }
    if (!m_socket->is_open()){
        continue;
    }
    auto self(shared_from_this());
    boost::asio::async_write(*m_socket,
      boost::asio::buffer(m_AMsgQueue.front().first.data(),
      m_AMsgQueue.front().first.A_lenght),
      [this, self](boost::system::error_code ec, std::size_t /*length*/)
    {
        if (!ec) // successful
      {
        m_handshakeTimer->expires_from_now(m_handshakePeriod); // reset timer
        m_AMsgQueue.pop_front();
        doWrite();
      }
      else {
        if (m_socket->is_open()){
          m_socket->close();
        }
      }
    });
  }
} 
void ConnectionA::deadlineTimer_handler(const boost::system::error_code& error){
  if (m_stopped){
    return;
  }
  m_deadlineTimer->async_wait(boost::bind(&ConnectionA::deadlineTimer_handler, this, boost::asio::placeholders::error));
  if (!error && !m_socket->is_open()) // timer expired and no connection was established
  {     
    doConnect();
  }
  else if (!error && m_socket->is_open()){ // timer expired and connection was established
    m_deadlineTimer->expires_at(boost::posix_time::pos_infin); // to reactivate timer call doConnect()
  }
}

最后还有另一个类封装了这些类,使其使用起来更舒适:

TcpConnect.h

class CTcpConnect
{
public:
  /*! Ctor
  */
  CTcpConnect();
  //! Dtor
  ~CTcpConnect();
  void initConnectionA(std::string &IP, const int &port);
  void initConnectionB(std::string &IP, const int &port);
  void initConnectionC(std::string &IP, const int &port);
  void postMessageA(std::string &message);
  void run();
  void stop();
private:
  ConnectionA m_AConnection;
  ConnectionB m_BConnection;
  ConnectionC m_CConnection;
}

TcpConnect.cpp

CTcpConnect::CTcpConnect()
: m_AConnection()
, m_BConnection()
, m_CConnection()
{}
CTcpConnect::~CTcpConnect()
{}
void CTcpConnect::run(){
  [this](){ m_AConnection.m_ioS->run(); };
  [this](){ m_BConnection.m_ioS->run(); };
  [this](){ m_CConnection.m_ioS->run(); };
}
void CTcpConnect::stop(){
  m_AConnection.stop();
  m_BConnection.stop();
  m_CConnection.stop();
}
void CTcpConnect::initConnectionA(std::string &IP, const int &port)
{
  m_AConnection = ConnectionA(IP, port);
  m_AConnection.setMaxMsgAge(30000);
  //... set some view parameter more
  m_AConnection.doConnect();
}
// initConnectionB & initConnectionC are quite the same
void CTcpConnect::postMessageA(std::string &message)
{
  m_AConnection.doWrite(message);
}

一开始我也尝试只有一个io_service(对于我的方法来说这很好(,但是将服务作为参考会让人头疼,因为我的实现还需要连接的默认构造函数。现在,每个连接都有自己的 io-service。

任何想法如何运行此代码?请随时为其他体系结构提出建议。如果你能想出这个,一些片段会更好。我已经为此实现而苦苦挣扎了数周。我很感激每一个提示。

顺便说一句,我正在使用 VS1.61 的提升 12。

这是问题所在:

m_AConnection = ConnectionA(IP, port);

也就是说,ConnectionA派生自Connection,而派生自enable_shared_from_this。这意味着ConnectionA必须实例化为共享指针,shared_from_this才能正常工作。

试试这个:

void CTcpConnect::initConnectionA(std::string &IP, const int &port)
{
  m_AConnection = std::make_shared<ConnectionA>(IP, port);
  m_AConnection->setMaxMsgAge(30000);
  //... set some view parameter more
  m_AConnection->doConnect();
}

编辑1:

你是对的。这就是问题所在。现在我意识到我调用io-service.run((的方式完全是废话。

使用多个io_service是非常罕见的,并且每个连接使用一个:)

但是,您知道我是否需要演员表然后调用shared_from_this((吗?我注意到 asynch_connect(( 在有和没有演员阵容的情况下都能正常工作。

许多 Asio 示例为了方便起见而使用 shared_from_this(),例如,我根本不在我的项目中使用它。使用 Asio 时,您需要小心某些规则。例如,一个是读取和写入缓冲区在执行相应的回调之前不得被破坏,如果 lambda 函数捕获指向保存缓冲区的对象的共享指针,则此条件基本成立。

例如,您也可以执行以下操作:

auto data = std::make_shared<std::vector<uint8_t>>(10);
async_read(socket,
           boost::asio::const_buffer(*data),
           [data](boost::system::error_code, size_t) {});

这将是有效的,但具有性能缺点,即每次读取时都会在std::vector内部分配一个新数据。

当您查看一些 lambda 时,可以看到shared_from_this()有用的另一个原因,它们通常具有以下形式:

[this, self,...](...) {...}

也就是说,您经常希望在它们内部使用this。如果您没有捕获self,则需要使用其他措施来确保在调用处理程序时this未被销毁。