Memory leak from boost::asio socket async_read_some operation

Updated: 2023-10-16

The following piece of code has a very troublesome memory leak that I cannot pin down, even with Valgrind.

void connect_handler(const boost::system::error_code& error)
{
  if (!error)
    std::cout << "Connected to server successfully." << std::endl;
}
void read_handler(const boost::system::error_code& error, 
                  std::size_t bytes_transferred)
{
  if (!error) {
    std::cout << "Transferred " << bytes_transferred 
              << "bytes." << std::endl;
  }
}
int main(int argc, char* argv[])
{
  try
  {
    if (argc != 3)
    {
      std::cerr << "Usage: client <host> <port>" << std::endl;
      return 1;
    }
    boost::asio::io_service io_service;
    boost::asio::ip::tcp::resolver resolver(io_service);
    boost::asio::ip::tcp::resolver::query query(argv[1], argv[2],
      boost::asio::ip::resolver_query_base::numeric_service);
    boost::asio::ip::tcp::resolver::iterator endpoint_iterator = 
      resolver.resolve(query);
    boost::asio::ip::tcp::socket socket(io_service);
    boost::asio::async_connect(socket, endpoint_iterator,
      boost::bind(&connect_handler, boost::asio::placeholders::error));
    std::string ctxt_message = "";
    std::stringstream SS2;
    // std::vector<char> message_vector;
    for (;;)
    {
      boost::array<char, 1024> buf;
      boost::system::error_code error;
      size_t len = 0;
      /* WHAT I BELIEVE TO BE THE MEAT OF THE PROBLEM: */
      socket.async_read_some(boost::asio::buffer(buf, 1024),
        boost::bind(&read_handler, boost::asio::placeholders::error,len));
      if (error == boost::asio::error::eof)
        break; // Connection closed cleanly by peer.
      else if (error)
        throw boost::system::system_error(error); // Some other error.
      SS2.write(buf.data(), len);
    }
  }
  catch (std::exception& e)
  {
    std::cerr << e.what() << std::endl;
  }
  return 0;
}

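I could not let Valgrind run the program to completion because it crashes my system, but after letting it run for a few seconds and cancelling it, I got the following: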

==2661== HEAP SUMMARY:
==2661==     in use at exit: 1,010,029,476 bytes in 9,619,333 blocks
==2661==   total heap usage: 9,619,375 allocs, 42 frees, 1,010,034,865 bytes allocated
...
==2661== 1,010,028,180 bytes in 9,619,316 blocks are still reachable in loss record 18 of 18
==2661==    at 0x4C2A879: operator new(unsigned long) (in /usr/lib/valgrind/vgpreload_memcheck-amd64-linux.so)
==2661==    by 0x402E01: main (thread_info_base.hpp:60)
==2661== 
==2661== LEAK SUMMARY:
==2661==    definitely lost: 0 bytes in 0 blocks
==2661==    indirectly lost: 0 bytes in 0 blocks
==2661==      possibly lost: 63 bytes in 2 blocks
==2661==    still reachable: 1,010,029,413 bytes in 9,619,331 blocks
==2661==         suppressed: 0 bytes in 0 blocks

Any ideas?

It seems you have not fully grasped the idea of the actor-based concurrency that Asio models.

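The io_service is never actually run in the code snippet. So yes, pending tasks are allowed to accumulate: every pass through the for (;;) loop starts another async_read_some, and each of those operations stays queued without ever completing.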
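To make that accumulation concrete, here is a minimal sketch using only the standard library. ToyService, queue_without_running and self_check are hypothetical names invented for this illustration; this is a toy model of "queue work now, execute it in run()", not how Boost.Asio is implemented.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <utility>

// Hypothetical stand-in for an event loop; NOT Boost.Asio's implementation.
class ToyService {
public:
    // Queue a handler. Nothing executes yet, much like starting an async operation.
    void post(std::function<void()> handler) {
        pending_.push_back(std::move(handler));
    }

    // Execute queued handlers until the queue is empty; returns how many ran.
    std::size_t run() {
        std::size_t executed = 0;
        while (!pending_.empty()) {
            std::function<void()> handler = std::move(pending_.front());
            pending_.pop_front();
            handler();
            ++executed;
        }
        return executed;
    }

    std::size_t pending() const { return pending_.size(); }

private:
    std::deque<std::function<void()>> pending_;
};

// Mimics the question's loop: queue `iterations` operations and never call run().
std::size_t queue_without_running(ToyService& service, std::size_t iterations) {
    for (std::size_t i = 0; i < iterations; ++i) {
        service.post([] { /* a real handler would process received bytes */ });
    }
    return service.pending();
}

// Small usage example: memory is held by the queue until run() drains it.
void self_check() {
    ToyService service;
    assert(queue_without_running(service, 1000) == 1000);  // all still queued
    assert(service.run() == 1000);                         // drained only by run()
    assert(service.pending() == 0);
}
```

In this model the queued handlers are still owned by the service, which is in line with Valgrind reporting the blocks as "still reachable" rather than "definitely lost".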

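If you really intend /not/ to execute any of the asynchronous tasks you posted (?!?), you need to cancel()/reset() the io_service so that the pending tasks are not leaked.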

    resolver.cancel();
    socket.cancel();
    io_service.reset();

In any case, I think you missed that asynchronous calls are... asynchronous. For example, this

  boost::system::error_code error;
  size_t len = 0;
  /* WHAT I BELIEVE TO BE THE MEAT OF THE PROBLEM: */
  socket.async_read_some(boost::asio::buffer(buf, 1024),
    boost::bind(&read_handler, boost::asio::placeholders::error,len));
  if (error == boost::asio::error::eof)
    break; // Connection closed cleanly by peer.
  else if (error)
    throw boost::system::system_error(error); // Some other error.

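makes no sense, because nothing ever assigns to error. The async_read_some operation never executes either, because you never call .run() (or .poll(), or .{run,poll}_one()) on the service object.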
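A related detail in that call: passing len to bind stores a copy of its current value (0), so the handler can never see the real byte count, whereas a placeholder forwards whatever is supplied when the handler is invoked. A minimal sketch of the difference, assuming std::bind from the standard library as a stand-in for boost::bind; Recorder and the function names are made up for illustration, and error codes are modelled as a plain int.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>

// Records the byte count that a completion handler was invoked with.
struct Recorder {
    std::size_t last_bytes = 0;
    void on_read(int error_code, std::size_t bytes_transferred) {
        if (error_code == 0) last_bytes = bytes_transferred;
    }
};

// Binds a *copy* of `len` taken at bind time; the count supplied later is ignored.
std::size_t bytes_seen_with_bound_value(std::size_t actual_bytes) {
    using namespace std::placeholders;
    Recorder rec;
    std::size_t len = 0;
    auto handler = std::bind(&Recorder::on_read, &rec, _1, len);
    handler(0, actual_bytes);  // the second call argument is silently discarded
    return rec.last_bytes;
}

// Binds a placeholder, so the count supplied at invocation time is forwarded.
std::size_t bytes_seen_with_placeholder(std::size_t actual_bytes) {
    using namespace std::placeholders;
    Recorder rec;
    auto handler = std::bind(&Recorder::on_read, &rec, _1, _2);
    handler(0, actual_bytes);
    return rec.last_bytes;
}

// Small usage example comparing the two bindings.
void self_check() {
    assert(bytes_seen_with_bound_value(41) == 0);   // handler only ever sees 0
    assert(bytes_seen_with_placeholder(41) == 41);  // handler sees the real count
}
```

With Boost.Asio the corresponding placeholder is boost::asio::placeholders::bytes_transferred.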

Here is a slightly fixed version of the program (note that the code sample was updated in response to the comments):

#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/array.hpp>
#include <iostream>
#include <sstream>
#include <string>
struct Program
{
    boost::array<char, 1024> _buf;
    boost::asio::io_service _io_service;
    boost::asio::ip::tcp::socket _socket;
    std::stringstream _received;
    void read_handler(const boost::system::error_code& error, std::size_t bytes_transferred)
    {
        if (!error) {
            std::cout << "Transferred " << bytes_transferred << "bytes." << std::endl;
            _received.write(_buf.data(), bytes_transferred);
            _socket.async_receive(boost::asio::buffer(_buf),
                    boost::bind(&Program::read_handler, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred));
        } else
        {
            std::cout << "End of transfer reached: " << error.message() << "\n";
            std::cout << "------------------------------------------------------------\n";
            std::cout << "Data: '" << _received.str() << "'\n";
        }
    }
    void connect_handler(const boost::system::error_code& error)
    {
        if (!error)
        {
            std::cout << "Connected to server successfully." << std::endl;
            _socket.async_receive(boost::asio::buffer(_buf),
                    boost::bind(&Program::read_handler, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred));
            // this is synchronous, but it could be done using async_* as well:
            _socket.send(boost::asio::buffer("GET / HTTP/1.0\r\nHost: www.google.com\r\n\r\n"));
        }
    }
    Program(std::string const& host, std::string const& service)
        : _buf(), _io_service(), _socket(_io_service), _host(host), _service(service) 
    {
    }
    int run()
    {
        boost::asio::ip::tcp::resolver resolver(_io_service);
        boost::asio::ip::tcp::resolver::query query(_host, _service, boost::asio::ip::resolver_query_base::numeric_service);
        boost::asio::ip::tcp::resolver::iterator endpoint_iterator = resolver.resolve(query);
        boost::asio::async_connect(_socket, endpoint_iterator, boost::bind(&Program::connect_handler, this, boost::asio::placeholders::error));
        _io_service.run();
        return 0;
    }
    std::string const _host;
    std::string const _service;
};
int main(int argc, char* argv[])
{
    try
    {
        if (argc != 3)
        {
            std::cerr << "Usage: client <host> <port>" << std::endl;
            return 1;
        }
        Program program(argv[1], argv[2]);
        return program.run();
    }
    catch (std::exception& e)
    {
        std::cerr << e.what() << std::endl;
    }
}

Below is a test run under valgrind (note that this output predates the updated code sample):

sehe@desktop:/tmp$ valgrind ./test localhost 22
==14627== Memcheck, a memory error detector
==14627== Copyright (C) 2002-2012, and GNU GPL'd, by Julian Seward et al.
==14627== Using Valgrind-3.8.1 and LibVEX; rerun with -h for copyright info
==14627== Command: ./test localhost 22
==14627== 
Connected to server successfully.
Transferred 41bytes.
SSH-2.0-OpenSSH_6.2p2 Ubuntu-6ubuntu0.2
==14627== 
==14627== HEAP SUMMARY:
==14627==     in use at exit: 0 bytes in 0 blocks
==14627==   total heap usage: 61 allocs, 61 frees, 7,319 bytes allocated
==14627== 
==14627== All heap blocks were freed -- no leaks are possible
==14627== 
==14627== For counts of detected and suppressed errors, rerun with: -v
==14627== ERROR SUMMARY: 0 errors from 0 contexts (suppressed: 2 from 2)
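The fixed program keeps its memory use flat because each read_handler starts the next read itself, so only one operation is outstanding at any time. A minimal standard-library sketch of that chaining pattern follows; ToyService, ChainedReader and self_check are hypothetical names for illustration, the "network" is just a vector of prepared chunks, and none of this is Boost.Asio's actual implementation.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for an event loop; NOT Boost.Asio's implementation.
class ToyService {
public:
    void post(std::function<void()> handler) { pending_.push_back(std::move(handler)); }

    // Execute queued handlers (including ones queued by handlers) until empty.
    std::size_t run() {
        std::size_t executed = 0;
        while (!pending_.empty()) {
            std::function<void()> handler = std::move(pending_.front());
            pending_.pop_front();
            handler();
            ++executed;
        }
        return executed;
    }

    std::size_t pending() const { return pending_.size(); }

private:
    std::deque<std::function<void()>> pending_;
};

// Delivers prepared chunks one at a time; each handler arms the next read.
// The reader must outlive the call to ToyService::run().
class ChainedReader {
public:
    ChainedReader(ToyService& service, std::vector<std::string> chunks)
        : service_(service), chunks_(std::move(chunks)) {}

    void start() { arm_next_read(); }
    const std::string& received() const { return received_; }
    std::size_t max_pending() const { return max_pending_; }

private:
    void arm_next_read() {
        service_.post([this] { on_read(); });
        if (service_.pending() > max_pending_) max_pending_ = service_.pending();
    }

    void on_read() {
        if (next_ == chunks_.size()) return;  // "end of transfer": stop re-arming
        received_ += chunks_[next_++];
        arm_next_read();                      // chain the next read from the handler
    }

    ToyService& service_;
    std::vector<std::string> chunks_;
    std::size_t next_ = 0;
    std::size_t max_pending_ = 0;
    std::string received_;
};

// Small usage example: three chunks, four handler invocations, one pending at most.
void self_check() {
    ToyService service;
    std::vector<std::string> chunks = {"SSH-", "2.0-", "demo"};
    ChainedReader reader(service, chunks);
    reader.start();
    assert(service.run() == 4);  // three data reads plus the final "end" read
    assert(reader.received() == "SSH-2.0-demo");
    assert(reader.max_pending() == 1);
}
```

Compare this with a loop that starts a new operation on every iteration: there the number of pending operations grows without bound, here it never exceeds one.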