Protobuf,CodedInputStream正在解析部分消息
Protobuf, CodedInputStream parsing partial messages
我正在尝试实现与java版本兼容的protobuf发送/接收,该版本首先包含varint32前缀。
我几乎已经让它工作了,但由于某种原因,有些消息变得不完整,并使assert()失败。
/receiver.cpp:69: void tcp_connection::handle_read_message(const boost::system::error_code&, size_t): Assertion `line.ParseFromCodedStream(&input)' failed.
semder.pp
boost::asio::streambuf buffer;
std::ostream writer(&buffer);
bool packet_full = false;
uint32_t sent_lines = 0;
{ //new scope for protobuf streams, these flush in dtor
google::protobuf::io::OstreamOutputStream osostream(&writer);
google::protobuf::io::CodedOutputStream output(&osostream);
std::string lines;
while(std::getline(reader, line)) {
lines += line + "n";
++sent_lines;
if(sent_lines > 100) {
packet_full = true;
break;
}
}
if(!lines.empty()) {
msg->set_text(lines);
const uint32_t size = msg->ByteSize();
output.WriteVarint32(size);
uint8_t* buffer = output.GetDirectBufferForNBytesAndAdvance(size);
if(buffer != 0) {
msg->SerializeWithCachedSizesToArray(buffer);
} else {
msg->SerializeWithCachedSizes(&output);
}
}
if(sent_lines > 0) {
sock.send(buffer.data());
if(!packet_full && !reader.eof()) { //Read ended, and not due to end of file
std::cout << "An error occured" << std::endl;
break;
}
reader.clear(); //clear EOF flag
}
接收器.cpp
这是一个助推回调。
成员变量:
boost::asio::ip::tcp::socket socket_;
boost::asio::streambuf buffer_;
代码
void handle_read_message(const boost::system::error_code& error,
size_t bytes_transferred) {
if(!error) {
buffer_.commit(bytes_transferred);
std::istream reader(&buffer_);
google::protobuf::io::IstreamInputStream isistream(&reader);
google::protobuf::io::CodedInputStream input(&isistream);
uint32_t size = 0;
assert(input.ReadVarint32(&size));
auto limit = input.PushLimit(size);
msgs::Line line;
assert(line.ParseFromCodedStream(&input));
assert(input.ConsumedEntireMessage());
input.PopLimit(limit);
start();
} else {
std::cout <<"error during handle_read_message: " << error << std::endl;
}
}
这主要基于https://stackoverflow.com/a/22899712
编辑:新的接收器版本,reader_现在是一个成员变量:
void handle_read_message(const boost::system::error_code& error,
size_t bytes_transferred) {
std::cout << "handle_read_message(" << bytes_transferred << ")" <<std::endl;
if(!error) {
buffer_.commit(bytes_transferred);
uint32_t size = 0;
google::protobuf::io::IstreamInputStream isistream_(&reader_);
{
google::protobuf::io::CodedInputStream input(&isistream_);
if(!input.ReadVarint32(&size)) {
std::cout << "Failed to read size, waiting for more data" << std::endl;
start();
return;
}
}
std::size_t varint_size = isistream_.ByteCount();
std::cout <<"varintsize: " << varint_size << ", size: " << size << ", have bytes: " << buffer_.size() << std::endl;
if(varint_size + size > buffer_.size()) {
std::cout << "Not enough data received, waiting for more" << std::endl;
start();
return;
}
google::protobuf::io::CodedInputStream input(&isistream_);
auto limit = input.PushLimit(size);
msgs::Line line;
assert(line.ParseFromCodedStream(&input));
std::cout << line.text() << std::endl;
assert(input.ConsumedEntireMessage());
input.PopLimit(limit);
start();
} else {
std::cout <<"error during handle_read_message: " << error << std::endl;
}
}
如果在接收端使用异步I/O,则需要确保在开始解析之前确实收到了整个消息。请记住,TCP连接是一个流。只要有可用的数据,异步回调就会运行——即使数据不完整。你可能只收到一条部分消息,也可能收到一条完整的消息加上下一条消息。这就是为什么首先需要readDelimitedFrom()
:计算出在解析之前需要等待的确切字节数。
因此,当使用异步I/O时,您需要以不同的方式进行编码。你可以使用这样的策略:
- 维护一个缓冲区,其中包含您迄今为止收到的所有字节
- 每次接收到更多字节时,请将它们添加到缓冲区中。然后,开始尝试按如下方式解析它们——您必须始终从头开始,使用全新的
ZeroCopyInputStream
和CodedInputStream
- 然后,尝试使用
ReadVarint32()
读取大小。如果ReadVarint32失败,那么您还没有收到完整的大小,所以请停止并等待更多的字节 - 如果
ReadVarint32()
成功,则销毁CodedInputStream
,然后在底层ZeroCopyInputStream
上调用ByteCount()
,以了解变量消耗了多少字节 - 您现在知道了消息的大小和可变前缀的大小。把这些加在一起。如果缓冲区中的字节数少于此数量,请停止并等待更多字节
- 您现在拥有了邮件的所有字节。继续,将它们从缓冲区中取出并进行解析。请注意,如果缓冲区中的字节数超过了消息的大小,则应将多余的字节留在缓冲区中,因为它们是下一条消息的一部分
(另外:您的sender.cpp代码中似乎缺少右大括号。如果原始文件有同样的错误,可能是您在CodedOutputStream刷新之前发送了数据。但我猜错误不在原始文件中。)
相关文章:
- 什么时候调用组成单元对象的析构函数
- boost::进程间消息队列引发错误
- 如果C++类在类方法中具有动态分配,但没有构造函数/析构函数或任何非静态成员,那么它仍然是POD类型吗
- 内联映射初始化的动态atexit析构函数崩溃
- 在线编译器中的分段C++没有打印消息
- C++错误消息*成员参考.**初学者*
- 什么时候调用析构函数
- 优先顺序:智能指针和类析构函数
- 在createdialog创建的窗口中捕获用于编辑控件的OnMouseMove消息
- C++-明确何时以及如何调用析构函数
- 使用基类指针创建对象时,缺少派生类析构函数
- 在c++中使用向量时,如何调用构造函数和析构函数
- 要与"if constexpr"一起使用的编译时消息(在预处理器之后)
- 重载运算符new[]的行为取决于析构函数
- 我需要知道编译器如何在cpp中使用析构函数
- 如何通过参数抛出错误消息
- 从服务器传输到客户端的消息不会出现
- 为什么在使用转换构造函数赋值后调用C++类的析构函数?
- 单例:为什么不需要删除并且看不到析构函数调试消息
- g++在虚拟析构函数中给出了删除数组的警告消息,这意味着什么