Boost asio 发送消息因管道破裂(broken pipe)而失败

Boost asio send message fail due to broken pipe

本文关键字:asio 提升 失败 消息 于管道 管道      更新时间:2023-10-16

我编写了以下类来通过SSL连接将数据发送到服务器。数据打包在 protobuf 消息中。当仅发送和接收一条消息(例如获取和发送心跳)时,它可以完美运行。但是当涉及发送多个消息(如心跳和另一个数据请求消息)时,由于管道损坏而失败。

示例代码:

// Case 1 (reported as working): one heartbeat message is written per iteration.
SSLHandler conn;//working
while(true){
std::vector<char> msg = buildHearbeatMsg()
conn.writeMessage(msg);
//conn.handleReadMessage(...) get called 
}
// Case 2 (reported as failing with "broken pipe"): a second message is built
// and a second write is issued in the same iteration.
SSLHandler conn;//broken pipe
while(true){
std::vector<char> msg = buildHearbeatMsg()
conn.writeMessage(msg);
std::vector<char> msg2 = buildRequestDataMsg()
// NOTE(review): this passes msg rather than msg2 and calls write() rather
// than writeMessage() -- confirm which was intended.
conn.write(msg);//broken pipe

}

该类是

#ifndef SSLHANDLER_H
#define SSLHANDLER_H
#include <cstdint>
#include <iostream>
#include <istream>
#include <memory>
#include <ostream>
#include <string>
#include <vector>
#include <boost/asio.hpp>
#include <boost/asio/ssl.hpp>
#include <boost/bind.hpp>
#include <boost/lockfree/spsc_queue.hpp>
#include "msg.pb.h"
// Values stored in SSLHandler::mode.
// READ_SIZE: after a write completes, handle_write reads a 4-byte size prefix.
const int READ_SIZE =0;
// READ_MSG: after a write completes, handle_write reads `size` bytes of body.
const int READ_MSG=1;
class SSLHandler
{
public:


SSLHandler(boost::asio::io_service& io_service, boost::asio::ssl::context& context, boost::asio::ip::tcp::resolver::iterator endpoint_iterator)
: socket_(io_service, context) , mEndpointIterator (endpoint_iterator) 
{
socket_.set_verify_mode(boost::asio::ssl::context::verify_none);
socket_.set_verify_callback(boost::bind(&SSLHandler::verify_certificate, this, _1, _2));

mode = READ_SIZE;
}
bool verify_certificate(bool preverified, boost::asio::ssl::verify_context& ctx);
void handle_connect(const boost::system::error_code& error);
void handle_handshake(const boost::system::error_code& error);
void handle_write(const boost::system::error_code& error, size_t bytes_transferred);
void handle_read(const boost::system::error_code& error, size_t bytes_transferred);
void handle_read_message(const boost::system::error_code& error, size_t bytes_transferred);
void connectToServer();
void writeMessage(std::vector<char> &array);


void setRequestMsg(std::vector<char> &&array);

private:
boost::asio::ssl::stream<boost::asio::ip::tcp::socket> socket_;
boost::asio::ip::tcp::resolver::iterator mEndpointIterator;
char reply_[0x1 << 16]; //=65356 bytes
int mode;
uint32_t size;
std::vector<char> requestMsg;
std::vector<char> replyMsg;

};
#endif // SSLHANDLER_H

#include "sslhandler.h"
/// Certificate verify callback installed by the constructor.
///
/// Prints the subject name of the certificate currently being checked and
/// passes the pre-verification result through unchanged.
///
/// @param preverified Result of the preceding verification step.
/// @param ctx         Verification context wrapping the X509_STORE_CTX.
/// @return `preverified`, unmodified.
bool SSLHandler::verify_certificate(bool preverified, boost::asio::ssl::verify_context &ctx)
{
    char subject_name[256];
    X509* cert = X509_STORE_CTX_get_current_cert(ctx.native_handle());
    // Size the call from the buffer itself so the two cannot drift apart.
    X509_NAME_oneline(X509_get_subject_name(cert), subject_name, static_cast<int>(sizeof(subject_name)));
    // The original literal was "Verifying:n": the backslash of "\n" had been lost.
    std::cout << "Verifying:\n" << subject_name << std::endl;
    return preverified;
}
/// Completion handler for the asynchronous connect.
/// On failure the error is logged; on success the client-side SSL handshake
/// is started, with handle_handshake as its completion handler.
void SSLHandler::handle_connect(const boost::system::error_code &error)
{
    if (error) {
        std::cout << "Connect failed: " << error.message() << std::endl;
        return;
    }

    std::cout << "Connection OK!" << std::endl;
    socket_.async_handshake(
        boost::asio::ssl::stream_base::client,
        boost::bind(&SSLHandler::handle_handshake, this, boost::asio::placeholders::error));
}
/// Completion handler for the SSL handshake.
/// On failure the error is logged; on success the stored requestMsg is
/// written to the stream, with handle_write as the completion handler.
void SSLHandler::handle_handshake(const boost::system::error_code &error)
{
    if (error) {
        std::cout << "Handshake failed: " << error.message() << std::endl;
        return;
    }

    std::cout << "Sending request: " << std::endl;

    // requestMsg is a member, so the bytes stay valid while the write is pending.
    const auto outgoing = boost::asio::buffer(requestMsg.data(), requestMsg.size());
    boost::asio::async_write(
        socket_,
        outgoing,
        boost::bind(&SSLHandler::handle_write, this,
                    boost::asio::placeholders::error,
                    boost::asio::placeholders::bytes_transferred));
}
/// Completion handler for writes on the SSL stream.
/// On failure the error is logged. On success the next read is chosen by
/// `mode`: a 4-byte size prefix (READ_SIZE) or `size` bytes of body
/// (READ_MSG). Any other mode value starts nothing.
void SSLHandler::handle_write(const boost::system::error_code &error, size_t bytes_transferred)
{
    if (error) {
        std::cout << "Write failed: " << error.message() << std::endl;
        return;
    }

    switch (mode) {
    case READ_SIZE:
        std::cout << "Sending request OK!" << std::endl;
        // Read the 4-byte length prefix into the start of reply_.
        boost::asio::async_read(
            socket_,
            boost::asio::buffer(reply_, 4),
            boost::bind(&SSLHandler::handle_read, this,
                        boost::asio::placeholders::error,
                        boost::asio::placeholders::bytes_transferred));
        std::cerr << "respond is " ;
        break;

    case READ_MSG:
        std::cout << "Sending request OK!" << std::endl;
        // Read a body of the previously decoded length.
        boost::asio::async_read(
            socket_,
            boost::asio::buffer(reply_, size),
            boost::bind(&SSLHandler::handle_read_message, this,
                        boost::asio::placeholders::error,
                        boost::asio::placeholders::bytes_transferred));
        std::cerr << "respond is " ;
        break;

    default:
        break;
    }
}
/// Completion handler for the 4-byte size prefix read.
///
/// Decodes the prefix (most significant byte first on the wire) into `size`,
/// switches `mode` to READ_MSG and starts reading the message body into
/// reply_. A prefix larger than reply_ is rejected instead of being passed
/// to async_read, which would otherwise write past the end of the buffer.
///
/// @param error             Result of the read.
/// @param bytes_transferred Number of bytes placed in reply_.
void SSLHandler::handle_read(const boost::system::error_code &error, size_t bytes_transferred)
{
    if (error) {
        std::cout << "Read failed: " << error.message() << std::endl;
        return;
    }

    std::cout << "Reply: ";
    std::cout.write(reply_, bytes_transferred);
    // The original literal was "n": the backslash of "\n" had been lost.
    std::cout << "\n";

    // Assemble the length with shifts so the result does not depend on the
    // host byte order (the original byte swap + memcpy assumed little-endian).
    const uint32_t announced =
        (static_cast<uint32_t>(static_cast<unsigned char>(reply_[0])) << 24) |
        (static_cast<uint32_t>(static_cast<unsigned char>(reply_[1])) << 16) |
        (static_cast<uint32_t>(static_cast<unsigned char>(reply_[2])) << 8) |
        static_cast<uint32_t>(static_cast<unsigned char>(reply_[3]));

    // The length comes from the peer; never read more than reply_ can hold.
    if (announced > sizeof(reply_)) {
        std::cout << "Read failed: announced message size " << announced
                  << " exceeds receive buffer of " << sizeof(reply_) << " bytes" << std::endl;
        return;
    }

    size = announced;
    std::cerr << "size of msg is "  << size;
    mode = READ_MSG;
    boost::asio::async_read(socket_, boost::asio::buffer(reply_, size),
                            boost::bind(&SSLHandler::handle_read_message,
                                        this,
                                        boost::asio::placeholders::error,
                                        boost::asio::placeholders::bytes_transferred));
}
/// Completion handler for the message body read.
///
/// On success, echoes the received bytes, copies the first `size` bytes of
/// reply_ into replyMsg, hands replyMsg to mpConnector and resets `mode` to
/// READ_SIZE. On failure the error is logged.
///
/// NOTE(review): mpConnector is not declared in the SSLHandler class shown
/// alongside this definition -- confirm where it comes from.
///
/// @param error             Result of the read.
/// @param bytes_transferred Number of bytes placed in reply_.
void SSLHandler::handle_read_message(const boost::system::error_code &error, size_t bytes_transferred)
{
    if (error) {
        std::cout << "Read failed: " << error.message() << std::endl;
        return;
    }

    std::cout << "Reply: ";
    std::cout.write(reply_, bytes_transferred);
    // The original literal was "n": the backslash of "\n" had been lost.
    std::cout << "\n";
    replyMsg.assign(reply_, reply_ + size);
    mpConnector->setReadMsg(replyMsg); // hand the reply to the upper layer
    mode = READ_SIZE;
}
void SSLHandler::connectToServer()
{
boost::asio::async_connect(socket_.lowest_layer(), mEndpointIterator, boost::bind(&SSLHandler::handle_connect, this, boost::asio::placeholders::error));
}
void SSLHandler::writeMessage(std::vector<char> &array)
{
boost::asio::async_write(socket_,
boost::asio::buffer(array.data(), array.size()),
boost::bind(&SSLHandler::handle_write, this,
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred));
}
/// Takes ownership of `array` and stores it as the request that
/// handle_handshake sends once the handshake succeeds.
void SSLHandler::setRequestMsg(std::vector<char> &&array)
{
    std::vector<char> incoming(std::move(array));
    requestMsg = std::move(incoming);
}

请查看 async_write 的参考文档:

该函数调用总是立即返回。

所以如果你有代码

// Illustrative pseudo-code: foo issues an asynchronous write and returns
// without waiting for it to finish.
void foo (vector<> data) {
socket.async_write(); // this function returns immediately
}

并且您在同一套接字上调用多个foo函数可能会遇到麻烦,因为

程序必须确保流在此操作完成之前不执行其他写入操作(如async_write、流的async_write_some函数或执行写入的任何其他组合操作)。

在您的代码中,您在同一个连接对象上多次调用 SSLHandler 的 writeMessage 方法,而没有等待前一次写入完成,这种用法是不正确的。

第二个问题:您必须确保要通过 async_write 发送的数据在 async_write 完成之前一直有效。下面这段代码对 async_write 的用法是不正确的:
// msg is local to each loop iteration and is passed to writeMessage by reference.
while(true){
std::vector<char> msg = buildHearbeatMsg()
conn.writeMessage(msg);
}

您在 while 循环的作用域内将 msg 创建为局部变量,然后调用 writeMessage;但该函数会立即返回,因此 msg 可能在 async_write 完成之前就已被销毁。

也许您应该使用同步操作而不是异步来执行任务。