是否可以使用 Varint32 大小前缀的协议缓冲区消息实现类似"FileInputStream::BackUp()"的功能?

Is it possible to implement 'FileInputStream::BackUp()'-like functionality with Varint32 size-prefixed Protocol Buffer messages?

本文关键字:FileInputStream BackUp 功能 消息 Varint32 可以使 前缀 是否 缓冲区 协议 实现      更新时间:2023-10-16

我正在尝试使用以下readDelimitedFrom()的实现在c++中解析分隔的protobuf消息(从文件)-也复制如下:

bool readDelimitedFrom(
    google::protobuf::io::ZeroCopyInputStream* rawInput,
    google::protobuf::MessageLite* message) {
  // We create a new coded stream for each message.  Don't worry, this is fast,
  // and it makes sure the 64MB total size limit is imposed per-message rather
  // than on the whole stream.  (See the CodedInputStream interface for more
  // info on this limit.)
  google::protobuf::io::CodedInputStream input(rawInput);
  // Read the size.
  uint32_t size;
  if (!input.ReadVarint32(&size)) return false;
  // Tell the stream not to read beyond that size.
  google::protobuf::io::CodedInputStream::Limit limit =
      input.PushLimit(size);
  // Parse the message.
  if (!message->MergeFromCodedStream(&input)) return false;
  if (!input.ConsumedEntireMessage()) return false;
  // Release the limit.
  input.PopLimit(limit);
  return true;
}

我的问题是,我需要对消息进行分组,并根据消息中包含的uint32_t字段批量处理它们-让我们称之为id

目前,我的主循环中有以下代码:

...
int infd = -1;
_sopen_s(&infd, argv[1], _O_RDONLY | _O_BINARY, _SH_DENYWR, _S_IREAD);
google::protobuf::io::ZeroCopyInputStream *input = 
    new google::protobuf::io::FileInputStream(infd);
std::vector<ProtoMessage> msgList;
bool readMore = true;
do {
    ProtoMessage msg;
    readMore = readNextMessage(input, msg, msgList);
    if (!msgList.empty()) {
        std::cout << "Processing Message Batch - ID: " << msgList[0].id();
        /* some processing done here */
    }
} while (readMore);

readNextMessage()的实现如下:

bool readNextMessage(
    google::protobuf::io::ZeroCopyInputStream* rawInput,
    ProtoMessage& nextMsg,
    std::vector<ProtoMessage>& batchList) {
    bool sameBatch = false;
    uint32_t msgID = 0;
    do {
        if (readDelimitedFrom(rawInput, &scan) == -1)
            return false;
        if (nextMsg.id() == 0)
            msgID = nextMsg.id();    // guaranteed to be non-zero
        if (sameBatch = (msgID == nextMsg.id()))
            batchList.push_back(nextMsg); 
    } while (sameBatch); 
    // need a way to roll-back here as nextMsg is now the first new
    // ProtoMessage belonging to a new batch.
    return true;
}

此函数的逻辑相当简单:取ZeroCopyInputStream并使用readDelimitedFrom()对其进行解析,以根据id字段将ProtoMessage消息分组为向量。如果它遇到具有新id的消息,则停止并将控制权返回给main,以便对消息批处理进行处理。

这会导致不希望的需求,即必须消费/读取不属于前一批的消息(包括其varint32编码的大小),而没有办法"备份"流。我希望能够将ZeroCopyInputStream指向最后一个readDelimitedFrom()之前的位置。

我是否有办法修改readDelimitedFrom(),以返回其调用期间消耗的字节数,然后在ZeroCopyInputStream上使用指针算术来实现所需的功能?

提供的函数ZeroCopyInputStream::Backup()有一个先决条件,即ZeroCopyInputStream::Next()是最后一个方法调用。显然,当使用CodedInputStream包装器解析带分隔符的消息时,情况并非如此。

ZeroCopyInputStream::Backup()只能备份最近收到的缓冲区。单个消息可能跨越多个缓冲区,因此在给定ZeroCopyInputStream接口的情况下,没有通用的方法来做您想做的事情。

一些选择:

  • 在解析每条消息之前调用rawInput->ByteCount(),以便准确地确定消息开始的字节位置。如果需要回滚,请向后查找底层文件并在其上重新创建ZeroCopyInputStream。当然,这只适用于从文件中读取。
  • 当您在新批处理中遇到消息时,将其存储在一边,然后在调用者要求开始读取下一批处理时将其带回来。