在 Java 中读取 protobuf 消息时的异常

Exceptions when reading protobuf messages in Java

本文关键字:异常 消息 protobuf Java 读取      更新时间:2023-10-16
我现在使用 protobuf

已经有几周了,但是在 Java 中解析 protobuf 消息时,我仍然不断收到异常。

我使用 C++ 来创建我的 protobuf 消息,并将它们与 boost 套接字一起发送到 Java 客户端监听的服务器套接字。用于传输消息的C++代码如下:

boost::asio::streambuf b;
std::ostream os(&b);
ZeroCopyOutputStream *raw_output = new OstreamOutputStream(&os);
CodedOutputStream *coded_output = new CodedOutputStream(raw_output);
coded_output->WriteVarint32(agentMessage.ByteSize());
agentMessage.SerializeToCodedStream(coded_output);
delete coded_output;
delete raw_output;
boost::system::error_code ignored_error;
boost::asio::async_write(socket, b.data(), boost::bind(
        &MessageService::handle_write, this,
        boost::asio::placeholders::error));

如您所见,我以WriteVarint32消息的长度编写,因此 Java 端应该通过使用 parseDelimitedFrom 知道它应该读取多远:

AgentMessage agentMessage = AgentMessageProtos.AgentMessage    
                                .parseDelimitedFrom(socket.getInputStream());

但这无济于事,我不断收到这些例外:

Protocol message contained an invalid tag (zero).
Message missing required fields: ...
Protocol message tag had invalid wire type.
Protocol message end-group tag did not match expected tag.
While parsing a protocol message, the input ended unexpectedly in the middle of a field.  This could mean either than the input has been truncated or that an embedded message misreported its own length.

重要的是要知道,这些异常不会在每条消息上引发。这只是我收到的最多消息的一小部分 - 我仍然想解决这个问题,因为我不想省略这些消息。

如果有人能帮助我或花费他的想法,我将不胜感激。


另一个有趣的事实是我收到的消息数量。1.000 秒内 2 条消息的总消息通常适用于我的程序。在 20 秒内大约 100.000 等等。我减少了人工发送的消息,当只传输 6-8 条消息时,根本没有错误。那么这可能是 Java 客户端套接字端的缓冲问题吗?

假设有 60.000 条消息,其中 5 条平均已损坏。

[我不是真正的TCP专家,这可能很离谱]

问题是,[Java] TCP 套接字的read(byte[] buffer)将在读取 TCP 帧末尾后返回。如果这恰好是中间消息(我的意思是,protobuf 消息(,解析器将阻塞并抛出InvalidProtocolBufferException

任何 protobuf 解析调用都使用内部CodedInputStream(此处为 src(,如果源是InputStream,则依赖于read() - 因此,受 TCP 套接字问题的影响。

因此,当您通过套接字填充大量数据时,某些消息必然会被分成两帧 - 这就是它们被损坏的地方。

我猜,当你降低消息传输速率(如你所说,每秒 6-8 条消息(时,每一帧都会在下一个数据片段放入流之前发送,所以每条消息总是有自己的 TCP 帧,即没有被拆分并且不会出错。(或者也许只是错误很少见,而且率低意味着您需要更多时间才能看到它们(

至于解决方案,最好的办法是自己处理缓冲区,即从套接字读取byte[](可能使用 readFully() 而不是 read()因为前者会阻塞,直到有足够的数据填充缓冲区 [或遇到 EOF],所以它有点抵抗消息帧中间结束的东西(, 确保它有足够的数据来解析为整个消息,然后将缓冲区提供给解析器。

此外,在这个Google网上论坛主题中有一些关于这个主题的很好的阅读 - 这就是我得到readFully()部分的地方。

我不熟悉Java API,但我想知道Java如何处理表示消息长度的uint32值,因为Java只有有符号的32位整数。快速浏览Java API参考告诉我,无符号的32位值存储在有符号的32位变量中。那么如何处理无符号 32 位值表示消息长度的情况呢?此外,Java 实现中似乎支持变量有符号整数。它们被称为ZigZag32/64。AFAIK,C++版本不知道这样的编码。因此,也许您的问题的原因可能与这些事情有关?