Parse sequences of protobuf messages from contiguous chunks of a fixed-size byte buffer

Keywords: contiguous, protobuf, messages, buffer, bytes      Updated: 2023-10-16

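I've been struggling with this for two days straight, because my knowledge of C++ is poor. What I need to do is use the protobuf C++ API to parse a sequence of messages from a big file, one that may contain millions of such messages. Reading straight from the file is easy, since I can always get the size with ReadVarint32 and then call ParseFromCodedStream with a limit pushed on the CodedInputStream, as described in this post. However, the I/O-level API I am using (libuv, actually) requires a fixed-size buffer to be allocated for every read callback. Obviously that chunk size has nothing to do with the size of the messages I am reading.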

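This makes my life hard. Basically, every time I read from the file and fill the fixed-size buffer (say 16K), that buffer will probably contain hundreds of complete protobuf messages, but its last piece is likely to be an incomplete message. So I thought, OK, what I should do is read as many messages as I can, then extract the last piece and attach it to the beginning of the next 16K buffer I read, and keep going until I reach the EOF of the file. I use ReadVarint32() to get the size, compare that number with the size of the rest of the buffer, and keep reading as long as the message size is the smaller of the two.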

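There is an API called GetDirectBufferPointer, so I tried to use it to record the pointer position before I even read the next message's size. However, I suspect that because of some endianness weirdness, if I just extract the rest of the byte array from where the pointer starts and attach it to the next chunk, Parse won't succeed. In fact the first few bytes (8, I think) are completely messed up.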

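Alternatively, if I call ReadRaw() on the coded stream, write the remaining stream into a buffer, and attach that to the head of the new chunk, the data isn't corrupted. The problem is that this time I lose the "size" bytes, because they have already been "read" by ReadVarint32! Even if I just go ahead, remember the size I read last time, and call message.ParseFromCodedStream() directly in the next iteration, it ends up reading one byte too few, some parts even get corrupted, and the object can't be restored successfully.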

std::vector<char> mCheckBuffer;
std::vector<char> mResidueBuffer;
char bResidueBuffer[READ_BUFFER_SIZE];
char temp[READ_BUFFER_SIZE];
google::protobuf::uint32 size;
//"in" is the file input stream
while (in.good()) {
    in.read(mReadBuffer.data(), READ_BUFFER_SIZE);
    mCheckBuffer.clear();
    //merge the last remaining chunk that contains incomplete message with
    //the new data chunk I got out from buffer. Excuse my terrible C++ foo
    std::merge(mResidueBuffer.begin(), mResidueBuffer.end(),  
    mReadBuffer.begin(), mReadBuffer.end(), std::back_inserter(mCheckBuffer));
    //Treat the new merged buffer array as the new CIS
    google::protobuf::io::ArrayInputStream ais(&mCheckBuffer[0], 
    mCheckBuffer.size());
    google::protobuf::io::CodedInputStream cis(&ais);
    //Record the pointer location on CIS in bResidueBuffer
    cis.GetDirectBufferPointer((const void**)&bResidueBuffer,
    &bResidueBufSize);
    //No size information, probably first time or last iteration  
    //coincidentally read a complete message out. Otherwise I simply 
    //skip reading size again as I've already populated that from last 
    //iteration when I got an incomplete message
    if(size == 0) {
         cis.ReadVarint32(&size);
    }
    //Have to read this again to get remaining buffer size
    cis.GetDirectBufferPointer((const void**)&temp, &mResidueBufSize);
    //Compare the next message size with how much left in the buffer, if      
    //message size is smaller, I know I can read at least one more message 
    //out, keep reading until I run out of buffer, or, it's the end of message 
    //and my buffer just allocated larger so size should be 0
    while (size <= mResidueBufSize && size != 0) {
        //If this cis I constructed didn't have the size info at the beginning, 
        //and I just read straight from it hoping to get the message out from 
        //the "size" I got from last iteration, it simply doesn't work
        //(read one less byte in fact, and some part of the message corrupted)
        //push the size constraint to the input stream;
        int limit = cis.PushLimit(size);
        //parse message from the input stream
        message.ParseFromCodedStream(&cis);  
        cis.PopLimit(limit);
        google::protobuf::TextFormat::PrintToString(message, &str);
        printf("%s", str.c_str());
        //do something with the parsed object
        //Now I have to record the new pointer location again
        cis.GetDirectBufferPointer((const void**)&bResidueBuffer, 
        &bResidueBufSize);
        //Read another time the next message's size and go back to while loop check
        cis.ReadVarint32(&size);
    }
    //If I do the next line, bResidueBuffer will have the correct CIS information 
    //copied over, but not having the "already read" size info
    cis.ReadRaw(bResidueBuffer, bResidueBufSize);
    mResidueBuffer.clear();
    //I am constructing a new vector that receives the residual chunk of the 
    //current buffer that isn't enough to restore a message
    //If I don't do ReadRaw, this copy completely messes up at least the first 8 
    //bytes of the copied buffer's value, due to I suspect endianness
    mResidueBuffer.insert(mResidueBuffer.end(), &bResidueBuffer[0], 
    &bResidueBuffer[bResidueBufSize]);
}

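I'm really out of ideas now. Is it possible to use protobuf gracefully with APIs that require a fixed-size intermediate buffer? Any input is much appreciated, thanks!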

I see two major problems with your code:

std::merge(mResidueBuffer.begin(), mResidueBuffer.end(),  
mReadBuffer.begin(), mReadBuffer.end(), std::back_inserter(mCheckBuffer));

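It looks like you are expecting std::merge to concatenate your buffers, but in fact this function merges two sorted arrays into a single sorted array, in the MergeSort sense. That makes no sense in this context; mCheckBuffer will end up containing nonsense.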
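To make the difference concrete, here is a minimal sketch that runs both operations on the same two small buffers. The function names `concat` and `merged` are made up for this illustration and are not taken from the question's code.

```cpp
#include <algorithm>
#include <cassert>
#include <iterator>
#include <vector>

// Concatenation: every byte of `residue`, then every byte of `chunk`,
// in their original order. This is what a byte stream needs.
std::vector<char> concat(const std::vector<char>& residue,
                         const std::vector<char>& chunk) {
    std::vector<char> out;
    out.reserve(residue.size() + chunk.size());
    out.insert(out.end(), residue.begin(), residue.end());
    out.insert(out.end(), chunk.begin(), chunk.end());
    return out;
}

// What the question's code does: std::merge expects two SORTED ranges and
// interleaves them by value, so the original byte order is not preserved.
std::vector<char> merged(const std::vector<char>& residue,
                         const std::vector<char>& chunk) {
    std::vector<char> out;
    std::merge(residue.begin(), residue.end(), chunk.begin(), chunk.end(),
               std::back_inserter(out));
    return out;
}
```

With residue {'d','e'} and chunk {'a','b','c'}, `concat` yields d e a b c while `merged` yields a b c d e. For real message bytes, which are not sorted, std::merge's precondition is not even met.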

cis.GetDirectBufferPointer((const void**)&bResidueBuffer,
&bResidueBufSize);

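Here you are casting &bResidueBuffer to an incompatible pointer type. bResidueBuffer is a char array, so &bResidueBuffer is a pointer to a char array, which is not a pointer to a pointer. This is admittedly confusing, because an array can be implicitly converted to a pointer (one that points to the array's first element), but that really is a conversion: bResidueBuffer is not itself a pointer, it can merely be converted to one.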
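The type distinction can be checked at compile time. The sketch below is only an illustration: `kBufSize`, `buffer`, and `peekBuffer` are hypothetical names, and `peekBuffer` is a stand-in for any API with a `const void**` out-parameter, not a protobuf function.

```cpp
#include <cassert>
#include <type_traits>

constexpr int kBufSize = 16;  // hypothetical size, for illustration only
char buffer[kBufSize];

// &buffer is a pointer to the whole array, not a pointer to a pointer.
static_assert(std::is_same<decltype(&buffer), char (*)[kBufSize]>::value,
              "&buffer has type char (*)[kBufSize]");
// The array only *converts* (decays) to char*; it is not a char* itself.
static_assert(std::is_same<std::decay<decltype(buffer)>::type, char*>::value,
              "buffer decays to char*");

// Hypothetical stand-in for an API with a `const void**` out-parameter:
// it stores a pointer in *data and copies no bytes.
bool peekBuffer(const char* base, int available,
                const void** data, int* size) {
    if (available <= 0) return false;
    *data = base;       // writes sizeof(void*) bytes at the address in `data`
    *size = available;
    return true;
}
```

An out-parameter like this writes a pointer value to whatever address it is given. If that address is really the start of a char array, as in the cast above, the first sizeof(void*) bytes of the array get overwritten, which would explain the roughly 8 garbled bytes on a 64-bit platform without any endianness involved.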

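I think you have also misunderstood what GetDirectBufferPointer() does. It looks like you expect it to copy the rest of the buffer into bResidueBuffer, but the method never copies any data. It returns a pointer into the original buffer.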

The correct way to call it is:

const void* ptr;
int size;
cis.GetDirectBufferPointer(&ptr, &size);

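Now ptr points into the original buffer. You could compare it against a pointer to the beginning of the buffer to find out where you are in the stream, like this: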

size_t pos = (const char*)ptr - &mCheckBuffer[0];

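However, you should not do that, because CodedInputStream already has a method for exactly this purpose: CurrentPosition(). It returns the current byte offset in the buffer, so use that instead.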

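OK, thanks to Kenton's help in pointing out the major issues in my question, I have now revised the code and tested that it works. I will post my solution here. That said, I am not happy about all the complexity and edge-case checks I needed to get there; I think it is error-prone. Even with this working, what I will probably really do is put a straightforward blocking "read from stream" call in a thread other than my libuv main thread, so that I don't have to use the libuv API at all. But for the sake of completeness, here is my code: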

std::vector<char> mCheckBuffer;
std::vector<char> mResidueBuffer;
std::vector<char> mReadBuffer(READ_BUFFER_SIZE);
google::protobuf::uint32 size = 0; //start at 0 so the first iteration reads a size prefix
//"in" is the file input stream
while (in.good()) {
    //This part is tricky as you're not guaranteed that what end up in 
    //mReadBuffer is everything you read out from the file. The same 
    //happens with libuv's assigned buffer, after EOF, what's rest in 
    //the buffer could be anything
    in.read(mReadBuffer.data(), READ_BUFFER_SIZE);
    //merge the last remaining chunk that contains incomplete message with
    //the new data chunk I got out from buffer. I couldn't find a more 
    //efficient way doing that
    mCheckBuffer.clear();
    mCheckBuffer.reserve(mResidueBuffer.size() + mReadBuffer.size());
    mCheckBuffer.insert(mCheckBuffer.end(), mResidueBuffer.begin(),
    mResidueBuffer.end());
    mCheckBuffer.insert(mCheckBuffer.end(), mReadBuffer.begin(),
    mReadBuffer.end());
    //Treat the new merged buffer array as the new CIS
    google::protobuf::io::ArrayInputStream ais(&mCheckBuffer[0], 
    mCheckBuffer.size());
    google::protobuf::io::CodedInputStream cis(&ais);
    //No size information, probably first time or last iteration  
    //coincidentally read a complete message out. Otherwise I simply 
    //skip reading size again as I've already populated that from last 
    //iteration when I got an incomplete message
    if(size == 0) {
        cis.ReadVarint32(&size);
    }
    bResidueBufSize = mCheckBuffer.size() - cis.CurrentPosition();
    //Compare the next message size with how much left in the buffer, if      
    //message size is smaller, I know I can read at least one more message 
    //out, keep reading until I run out of buffer. If, it's the end of message 
    //and size (next byte I read from stream) happens to be 0, that
    //will trip me up, cos when I push size 0 into PushLimit and then try 
    //parsing, it will actually return true even if it reads nothing. 
    //So I can get into an infinite loop, if I don't do the check here
    while (size <= bResidueBufSize && size != 0) {
        //If this cis I constructed didn't have the size info at the 
        //beginning, and I just read straight from it hoping to get the  
        //message out from the "size" I got from last iteration
        //push the size constraint to the input stream
        int limit = cis.PushLimit(size); 
        //parse the message from the input stream
        bool result = message.ParseFromCodedStream(&cis);  
        //Parse fail, it could be because last iteration already took care
        //of the last message and that size I read last time is just junk
        //I choose to only check EOF here when result is not true, (which
        //leads me to having to check for size=0 case above), cos it will
        //be too many checks if I check it everytime I finish reading a 
        //message out
        if(!result) {
            if(in.eof()) {
                log.info("Reached EOF, stop processing!");
                break;
            }
            else {
                log.error("Read error or input mal-formatted! Log error!");
                exit(EXIT_FAILURE); //needs <cstdlib>; a bare "exit;" is a no-op expression
            }
        }
        cis.PopLimit(limit);
        google::protobuf::TextFormat::PrintToString(message, &str);
        //Do something with the message
        //This is when the last message read out exactly reach the end of 
        //the buffer and there is no size information available on the 
        //stream any more, in which case size will need to be reset to zero
        //so that the beginning of next iteration will read size info first
        if(!cis.ReadVarint32(&size)) {
            size = 0;
        }
        bResidueBufSize = mCheckBuffer.size() - cis.CurrentPosition();
    }
    if(in.eof()) {
        break;
    }
    //Now I am copying the residual buffer into the intermediate
    //mResidueBuffer, which will be merged with newly read data in next iteration
    mResidueBuffer.clear();
    mResidueBuffer.reserve(bResidueBufSize);
    mResidueBuffer.insert(mResidueBuffer.end(), 
    &mCheckBuffer[cis.CurrentPosition()],&mCheckBuffer[mCheckBuffer.size()]);
}
if(!in.eof()) {
    log.error("Something else other than EOF happened to the file, log error!");
    exit(EXIT_FAILURE); //needs <cstdlib>; a bare "exit;" is a no-op expression
}
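One way to cut down on the edge cases above is to keep the size prefix in the leftover bytes until the whole message has arrived, so there is never a "size already read" state to carry between chunks. The following is a minimal sketch of that idea using only the standard library. `FrameSplitter` and its methods are hypothetical names, not part of protobuf or libuv, and the prefix decoding is a hand-rolled base-128 varint reader that assumes the same framing the question uses.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>
#include <vector>

// Hypothetical helper: accumulates arbitrary chunks and hands back each
// complete varint-length-prefixed payload.
class FrameSplitter {
public:
    // Append `len` bytes. Pass the number of bytes actually read,
    // not the capacity of the read buffer.
    void feed(const char* data, std::size_t len) {
        // Drop bytes consumed by earlier next() calls, then append.
        pending_.erase(pending_.begin(),
                       pending_.begin() + static_cast<std::ptrdiff_t>(start_));
        start_ = 0;
        pending_.insert(pending_.end(), data, data + len);
    }

    // If a complete frame is buffered, copy its payload into *out and
    // return true. Otherwise consume nothing (a partial size prefix
    // stays buffered too) and return false.
    bool next(std::string* out) {
        std::uint32_t size = 0;
        const std::size_t prefix = decodePrefix(&size);
        if (prefix == 0) return false;                       // prefix incomplete
        if (pending_.size() - start_ - prefix < size) return false;  // payload incomplete
        out->assign(pending_.data() + start_ + prefix, size);
        start_ += prefix + size;
        return true;
    }

    // Bytes received but not yet returned as part of a frame.
    std::size_t buffered() const { return pending_.size() - start_; }

private:
    // Decode a base-128 varint at start_. Returns the number of prefix
    // bytes, or 0 if the prefix is incomplete. Simplification: a prefix
    // longer than 5 bytes also yields 0; real code should treat that
    // case as a format error instead of waiting for more data.
    std::size_t decodePrefix(std::uint32_t* value) const {
        std::uint32_t result = 0;
        for (std::size_t i = 0; start_ + i < pending_.size() && i < 5; ++i) {
            const unsigned char byte =
                static_cast<unsigned char>(pending_[start_ + i]);
            result |= static_cast<std::uint32_t>(byte & 0x7F) << (7 * i);
            if ((byte & 0x80) == 0) {
                *value = result;
                return i + 1;
            }
        }
        return 0;
    }

    std::vector<char> pending_;
    std::size_t start_ = 0;
};
```

In a read callback you would call feed() with the chunk and the byte count reported by the I/O layer, then loop on next() and hand each payload to something like message.ParseFromArray(payload.data(), static_cast<int>(payload.size())). At EOF, a non-zero buffered() means the file ended in the middle of a message. A zero-length message is returned as an empty payload and still advances the position, so it cannot cause an infinite loop.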