如何用C/C++和Boost Asio优化客户端/服务器

How to Optimize Client/Server with C/C++ and Boost Asio

本文关键字:优化 Asio 客户端 服务器 Boost 何用 C++      更新时间:2023-10-16

我有两个类似TCP客户端/服务器的应用程序。

第一个应用程序是客户端,它使用OpenCV检测命令,并通过TCP向控制移动机器人的服务器发送命令。

如果我在开发计算机,我的应用程序运行良好,但当我用机器人在现实世界中测试它时,我意识到我在客户端和服务器之间交换的数据有一些延迟。发生这种情况是因为iItest应用程序的计算机与我正在开发的计算机相比有点慢,因为我的计算机速度更快,没有问题。在现实世界中,服务器并没有实时接收到来自客户端的数据包,所以它会延迟执行操作。

因此,问题是当客户端失去检测并向服务器发送命令以停止它时。服务器接收到有延迟的数据包,因此当客户端发送停止(heading=0,distance=0,nodedetection)时,服务器不会立即接收到命令,因为它正在接收以前的命令数据包,所以它只在几米后停止。

我想找到一个解决方案,以便立即停止服务器并丢弃所有关于移动信息的包,因为如果机器人必须停止,这些包是无用的。为了阻止机器人,我发送了一个节点检测包,但不幸的是,该包没有实时收到,所以机器人继续移动了一段时间。(我在同一台机器上做这个测试,所以我在localhost上连接)

目前,客户端使用以下代码:

while (key_mode!='q')
    {
        //wait and error processing
        context.WaitAnyUpdateAll();
        // obtain al the metadata image,depthmap and scene
        Mat frame = getImageFromKinect();
        // do detection and tracking
        switch(mode)
        {
..
                               case ROBOT_CONTROL:
                 {
                 // Connect to the server
                 using boost::asio::ip::tcp;
                             boost::asio::io_service io_service;
                                 tcp::resolver resolver(io_service);
                     tcp::resolver::query query(tcp::v4(), server, boost::lexical_cast<string>(porta));
                                 tcp::resolver::iterator iterator = resolver.resolve(query);
                                 tcp::socket s(io_service); 
                                 try
                 {
                                 s.connect(*iterator);
                 }
                                 catch (boost::system::system_error const& e)
                 {
                                 std::cout << "Warning: could not connect to the servern" << e.what() << "nPossible Solution: try to check is Server is UPn" << std::endl;
                 }
..
..
 float delta = heading - last_heading;
                     if (!is_equal(delta, 0.0)){ 
                    // heading_data = send_heading + token + boost::lexical_cast<std::string>(delta);
                    // heading_length = strlen(heading_data.c_str());
                     try
                     {
                        // boost::asio::write(s, boost::asio::buffer(heading_data, heading_length));  
                     }
                     catch (boost::system::system_error const& e)
                     {
                         std::cout << "Warning: could not send commands : " << e.what() << std::endl;
                     }

                     }
                                         last_heading = heading; // store current for next subtraction
                     #endif
                     #if 1
                                         heading_scalato = heading / 3.0;
                     heading_data = send_heading + token + boost::lexical_cast<std::string>(heading_scalato);
                     heading_length = strlen(heading_data.c_str());
                         try
                     {
                         boost::asio::write(s, boost::asio::buffer(heading_data, heading_length));  
                     }
                     catch (boost::system::system_error const& e)
                     {
                         std::cout << "Warning: could not send commands : " << e.what() << std::endl;
                     }
                                         #endif

                                        distance_data = send_distance + token + boost::lexical_cast<std::string>(distance);


                    distance_length = strlen(distance_data.c_str());

                    try
                    {
                    boost::asio::write(s, boost::asio::buffer(distance_data, distance_length));
                    }
                    catch (boost::system::system_error const& e)
                    {
                    std::cout << "Warning: could not connect : " << e.what() << std::endl;
                    }
..
..
// if it has to stop:
else
                {
                    // stop rover
                    //control.setHeading(0.0);
                    //control.setDistance(0.0);
                                        float heading = 0.0;
                                        float distance = 0.0;
                                        heading_data = send_heading + token + boost::lexical_cast<std::string>(heading);
                                        distance_data = send_distance + token + boost::lexical_cast<std::string>(distance);
                                        heading_length = heading_data.size();//strlen(heading_data.c_str());
                    distance_length = strlen(distance_data.c_str());

                        try
                    {
                                        boost::asio::write(s, boost::asio::buffer(heading_data, heading_length));
                                        boost::asio::write(s, boost::asio::buffer(distance_data, distance_length));
                    }
                    catch (boost::system::system_error const& e)
                    {
                        std::cout << "Warning: could not send commands : " << e.what() << std::endl;
                    }


                    // write info on image
                    char text[100];
                    sprintf(text,"ROBOT CONTROL: No detection");
                    putText(hogResultFrame,text,Point(4,89),FONT_HERSHEY_PLAIN,1,Scalar(0,0,0));
                    putText(hogResultFrame,text,Point(5,90),FONT_HERSHEY_PLAIN,1,Scalar(100,100,255));
                                        nodetection_length = nodetection.size();
                    try
                    {
                                        boost::asio::write(s, boost::asio::buffer(nodetection, nodetection_length));
                    }
                    catch (boost::system::system_error const& e)
                    {
                        std::cout << "Warning: could not send commands : " << e.what() << std::endl;
                    }

在服务器中,我使用:

void* runThread(void*)
  {
        while(Aria::getRunning())
        {
            if(start_routine){
                if(temp_heading < 0.0){
                printf("nnStarting Discovering routine, then sleeping 3 seconds.ann");
                robot.setRotVel(5.0);
                ArUtil::sleep(3000);
        temp_heading = -1;
        }           
            else if(temp_heading >= 0.0) {
        printf("nnStarting Clockwise Discovering routine, then sleeping 3 seconds.ann");
                robot.setRotVel(-5.0);
                ArUtil::sleep(3000);
        temp_heading = 1;
                } 
                } 

                if( !flag_heading && !flag_distance)
            {               
                myMutex.lock();
                temp_heading=m_heading;
                temp_distance=m_distance;
                myMutex.unlock();
                if (is_equal(temp_heading, 0.0)){
                    robot.setRotVel(0.0);
                    }
                else robot.setRotVel(-ArMath::radToDeg(temp_heading));
                if(temp_distance <= distanza_minima || is_equal(temp_distance, 0.0))
                    robot.setVel(0.0);
                else
                    robot.setVel(float(temp_distance/20));
                printf("runThread:: heading= %f distance = %f rob_vel = %f rob_rot_vel = %fn",ArMath::radToDeg(temp_heading),temp_distance, robot.getVel(),robot.getRotVel());
                flag_heading = true;
                flag_distance = true;
                start_routine = false;
            }
                ArUtil::sleep(100);
        }
  }
DataLine GetValueFromLine(const std::string& sData) {
  std::string sName, sInteger;
  std::stringstream ss;
  DataLine Result;
  size_t sz = sData.find('@');
  sName = sData.substr(0,sz); // Just in case you need it later
  Result.sName = sName;
  sInteger = sData.substr(sz + 1,sData.length() - sz);
  ss.str(sInteger);
  ss >> Result.nNumber;
  if (ss.fail()) {
    // something went wrong, probably not an integer
  }
  return Result;
}
void session(socket_ptr sock)
{
  try
  {
    for (;;)
    {
      char data[max_length];
      boost::system::error_code error;
      size_t length = sock->read_some(boost::asio::buffer(data), error);
      data[length] = 0;
      if (error == boost::asio::error::eof)
        break; // Connection closed cleanly by peer.
      else if (error)
        throw boost::system::system_error(error); // Some other error.
      output = GetValueFromLine(data);

      std::cout << "*******************n";
      comando = output.sName;
      valore = output.nNumber;
      if (output.sName == "nodetection"){
      start_routine = true;
      std::cout << "nSto ricevendo: " << output.sName;
      }
      else if (output.sName == "heading"){
      start_routine = false;
      control.setHeading(output.nNumber);
      std::cout << "nSto ricevendo: " << output.sName << "e heading: " << output.nNumber;
      }                                
      else if (output.sName == "distance"){
      start_routine = false;
      control.setDistance(output.nNumber);
      std::cout << "nSto ricevendo: " << output.sName << "e distance: " << output.nNumber;
      }                             


     // boost::asio::write(*sock, boost::asio::buffer(data, length));
    }
  }
  catch (std::exception& e)
  {
    std::cerr << "Exception in thread: " << e.what() << "n";
  }
}
void server(boost::asio::io_service& io_service, short port)
{
  tcp::acceptor a(io_service, tcp::endpoint(tcp::v4(), port));
  for (;;)
  {
    socket_ptr sock(new tcp::socket(io_service));
    a.accept(*sock);
    boost::thread t(boost::bind(session, sock));
  }
}
int main(int argc, char **argv)
{
         // control server initialitation..
         ....
          boost::asio::io_service io_service;  
          server(io_service, porta);
  return 0;                                    
}

我想在客户端达到无检测条件时强制关闭TCP连接,以强制服务器拒绝挂起的数据包,但我该怎么做呢?如何在boost中销毁s指针?

还有其他解决方案吗?如果我关闭连接,服务器会拒绝挂起的数据包吗?

根据我对您的问题的理解,您收到一条消息,您希望您的应用程序放弃所有当前处理并刷新输入队列。但是,由于应用程序忙于接收和处理以前的消息,所以在处理完所有以前的消息之前,它不会接收放弃和刷新消息,这使得放弃和刷新成为一个无操作

IMHO您需要设计和编写一个多线程应用程序。

一个线程,重量尽可能轻,以尽可能快的速度读取传入消息,并快速检查放弃和刷新消息。如果消息是OK,则会将其添加到队列中,并检查下一条消息。

第二个线程从第一个线程存储消息的队列中提取消息并进行处理,这可能需要很长时间。从那时起,它会检查来自第一个线程的放弃和刷新信号。

另一种需要考虑的方法是:发送消息的应用程序维护队列。当接收消息的应用程序完成对消息的处理时,它会发送下一条消息的请求。发件人仅在收到请求时发送邮件。如果出现放弃和刷新条件,则发送消息的应用程序会对此进行处理。接收消息的应用程序一次只需要处理一个消息。这种方法极大地简化了消息接收器,代价是发送应用程序的复杂性、更复杂的通信协议,并且可能降低了最大吞吐量。