Protobuf导致ParseFromIstream出现分段错误

Protobuf causes segmentation fault on ParseFromIstream

本文关键字:分段 错误 导致 ParseFromIstream Protobuf      更新时间:2023-10-16

我正在努力扩展我的编程知识,并尝试进行一些多进程编程。

我想做以下操作:在同一台主机上,运行多个可执行文件。其中一个可执行文件负责扫描文件系统,其中一个执行文件处理数据等。

但是,有些数据必须从主机上转移。为了限制网络防火墙设置之类的事情,我希望有一个单独的守护进程(多线程)通过IPC接收数据,然后使用尚未确定的套接字实现将数据发送到外部主机。

经过大量搜索和研究,最明显的使用模式是消费者/生产者模式,其中包括多进程生产者(生成消息的守护进程)和多线程消费者(最好通过共享内存接收数据,并将其发送到外部主机)。

我希望我的应用程序能够尽可能多地跨平台运行。为此,我使用boost::interprocess:message_queue。因为这个Boost库只接受二进制序列化对象,所以我使用GoogleProtobuf来处理序列化和反序列化。

我创建了两个可执行文件,目前称为"消费者"answers"生产者"。生产者通过消息队列将消息发送给消费者,消费者反过来对其进行反序列化。下面的代码在传递简单的"int"对象时有效(在我看来,这意味着消息队列通信正在工作),但在使用SerializeToOstream()中的数据时不起作用。

正如你可能已经注意到的,我是IPC和多进程编程的新手,但我相信我已经做好了功课。

这是我的生产商。cpp:

#include <iostream>
#include <chrono>
#include <thread>
#include <fstream>
#include <boost/interprocess/managed_shared_memory.hpp>
#include <boost/interprocess/ipc/message_queue.hpp>
#include <boost/thread.hpp>
#include <internal/messages/testmessage.pb.h>
int main(int argc, char** argv) {
// Construct the object to be passed
GOOGLE_PROTOBUF_VERIFY_VERSION;
struct protoremove {
~protoremove(){ google::protobuf::ShutdownProtobufLibrary(); }
} remover;
ib::protobuf::testMessage myMessage;
myMessage.set_id(10);
myMessage.set_version(1);
std::cout << myMessage.DebugString() << std::endl;
// Initialize the Boost message queue
try{
//Open a message queue.
boost::interprocess::message_queue mq
(boost::interprocess::open_or_create
,"message_queue"           //name
,100                       //max message number
,1000               //max message size
);
// Send our message
std::ofstream buftosend;
myMessage.SerializeToOstream(&buftosend);
mq.send(&buftosend, sizeof(buftosend), 1);
}
catch(boost::interprocess::interprocess_exception &ex){
std::cout << ex.what() << std::endl;
return 1;
}
return 0;
}

消费者.cpp:

#include <iostream>
#include <fstream>
#include <boost/interprocess/managed_shared_memory.hpp>
#include <boost/interprocess/ipc/message_queue.hpp>
#include <boost/thread.hpp>
#include <internal/messages/testmessage.pb.h>
int main(int argc, char** argv) {
// Open the message queue
try {
//Erase previous message queue
boost::interprocess::message_queue::remove("message_queue");
ib::protobuf::testMessage recvdMessage;
//Create a message_queue.
boost::interprocess::message_queue mq
(boost::interprocess::open_or_create
,"message_queue"           //name
,100                       //max message number
,1000               //max message size
);
unsigned int priority;
boost::interprocess::message_queue::size_type recvd_size;
std::ifstream incomingbuf;
mq.receive(&incomingbuf, 1000, recvd_size, priority);
recvdMessage.ParseFromIstream(&incomingbuf);
recvdMessage.id();
recvdMessage.DebugString();
}
catch(boost::interprocess::interprocess_exception &ex){
boost::interprocess::message_queue::remove("message_queue");
std::cout << "IP error " << ex.what() << std::endl;
return 1;
}
boost::interprocess::message_queue::remove("message_queue");
return 0;
}

消息定义(.proto):

package ib.protobuf;
message testMessage {
required int32 version = 1;
optional int64 id = 2;
optional string data = 3;
optional int64 sequencenumber = 4;
}

当运行consumer时,它会等待数据(mq.rereceive()调用被阻塞)。当生产者启动时,消费者得到一个SIGSEGV。gdb在其回溯中指出,这发生在第44行,即ParseFromIstream()方法。Producer在DebugString()中输出正确的值。

(gdb) r
Starting program: /home/roel/bin/consumer 
[Thread debugging using libthread_db enabled]
Using host libthread_db library "/usr/lib/libthread_db.so.1".
Program received signal SIGSEGV, Segmentation fault.
std::istream::sentry::sentry (this=0x7fffffffe117, __in=..., __noskip=true)
at /build/gcc-multilib/src/gcc-build/x86_64-pc-linux-gnu/libstdc++-v3/include/bits/istream.tcc:50
50  /build/gcc-multilib/src/gcc-build/x86_64-pc-linux-gnu/libstdc++-v3/include/bits/istream.tcc: No such file or directory.
(gdb) bt
#0  std::istream::sentry::sentry (this=0x7fffffffe117, __in=..., __noskip=true)
at /build/gcc-multilib/src/gcc-build/x86_64-pc-linux-gnu/libstdc++-v3/include/bits/istream.tcc:50
#1  0x00007ffff679f7ab in std::istream::read (this=0x7fffffffe380, 
__s=0x637d20 "", __n=8192)
at /build/gcc-multilib/src/gcc-build/x86_64-pc-linux-gnu/libstdc++-v3/include/bits/istream.tcc:653
#2  0x00007ffff6b10030 in google::protobuf::io::IstreamInputStream::CopyingIstreamInputStream::Read(void*, int) () from /usr/lib/libprotobuf.so.9
#3  0x00007ffff6a99fe1 in google::protobuf::io::CopyingInputStreamAdaptor::Next(void const**, int*) () from /usr/lib/libprotobuf.so.9
#4  0x00007ffff6a97950 in google::protobuf::io::CodedInputStream::Refresh() ()
from /usr/lib/libprotobuf.so.9
#5  0x00007ffff6a94da3 in google::protobuf::MessageLite::ParseFromZeroCopyStream(google::protobuf::io::ZeroCopyInputStream*) () from /usr/lib/libprotobuf.so.9
#6  0x00007ffff6af5ad9 in google::protobuf::Message::ParseFromIstream(std::istream*) () from /usr/lib/libprotobuf.so.9
#7  0x0000000000407e35 in main (argc=1, argv=0x7fffffffe6d8)
at /home/roel/source/consumer.cpp:44
(gdb) 

这是在Linux上使用CMake和GCC 6.0.1编译的。关于我的程序,我有很多问题:


Q1.首先是什么原因导致了分段错误
我做错了什么?我已经看了这个代码好几个小时了,但没有发现问题


Q2.在boost::interprocess::message_queue中构造函数,我必须定义2个参数;最大数量消息以及大小。对于标准类型,此尺寸为固定的但是,对于消息(通常),消息的大小是可变的。那么,确定金额的最佳方法是什么要为消息保留的内存不足我应该简单地设置一个最大值吗每个消息的大小并创建一些多部分消息参数?


Q3.有更好的方法来实现我的目标吗序列化数据,把它放进队列似乎很复杂,尤其是看到这可能是一个非常常见的问题。肯定还有更多人们试图创建跨平台IPC。像ZeroMQ这样的库仅支持UNIX域套接字。使用环回的TCP套接字界面看起来很难看。难道没有一个图书馆让我将任意对象(大小和布局)作为消息放入共享内存段,消费者可以使用哪个pop()?我的意思是,在单线程,这可以通过堆栈上的push()pop()来修复。做所有这些额外的步骤似乎是一大笔开销。

提前感谢您的回复。


编辑

正如The Dark所指出的,上面的代码使用了std::string的实例,而不是实际的字符串(std::string.data())

producter.cpp的答案如下:

std::string str = myMessage.SerializeAsString();
mq.send(str.data(), str.size(), 1); 

然而,对于consumer.cpp来说,这并不能正常工作,因为字符串的初始化大小为0。

以下是我用于consumer.cpp的代码:

unsigned int priority;
boost::interprocess::message_queue::size_type recvd_size;
//Reserve 1000 bytes of memory for our message
char incomingBuffer[1000];
mq.receive(&incomingBuffer, 1000, recvd_size, priority);
ib::protobuf::testMessage recvdMessage;
//Only if string object is really required
std::basic_string<char> str = incomingBuffer;
std::cout << "Message: " << str.data() << ". Size is " << recvd_size << std::endl;
//ParseFromString() can also directly parse "incomingBuffer", avoiding the cast above
recvdMessage.ParseFromString(str.data());
std::cout << "Message ID " << recvdMessage.id() << std::endl;
std::cout << recvdMessage.DebugString();

这部分生产者似乎错了。

// Send our message
std::ofstream buftosend;
myMessage.SerializeToOstream(&buftosend);
mq.send(&buftosend, sizeof(buftosend), 1);

ofstream尚未打开,因此没有存储任何内容的文件,因此第一次调用将失败(而不是崩溃)。send调用正在通过线路发送原始ofstream类结构。这将不是可传输的格式。

我认为您想要的是序列化到一个ostringstream,然后传输该ostringsream的内容(而不是整个对象)。

类似于:

// Send our message
std::ostringstream buftosend;
myMessage.SerializeToOstream(&buftosend);
std::string str = buftosend.str();
mq.send(str.data(), str.size(), 1); 

或者更好:

// Send our message
std::string str = myMessage.SerializeAsString();
mq.send(str.data(), str.size(), 1); 

您也可以添加一个调试行来显示str的内容,不过请注意,它将是二进制的,因此不可读。

您的消费者可能也有类似的问题(ifstream需要在文件中打开)。

A3:

ZeroMQ

ZeroMQ而言,有许多不同的传输类可同时使用。因此,如果希望对本地线程间信令使用最低开销,那么我们使用inproc://传输类,如果使用本地进程间信令,则可以使用ipc://传送类的.bind()/.connect()。对于平台间的分布式处理,tcp://pgm://或者epgm://传输类使您可以根据与系统和网络功能相一致的通信需求轻松进行选择。

(不要犹豫,查看其他帖子,还有一个指向Pieter HINTKENS的书的直接URL,这是进入分布式系统设计的必读)


nanomsg

另一个聪明的&轻量级的无代理消息/信令框架nanomsg来自ZeroMQ的共同创始人MartinSUSTRIK。同样,INPROCIPCTCP,传输类已准备就绪。绝对值得花几分钟时间来阅读他对这个主题的深刻见解


然而,使用这些框架依赖于某种对象表示,这是因为合理的假设,即消息传递代理不知道什么系统位于消息传递/信令套接字的远程端,无论是串行化、容器化对象、对象包装器,简单地说,一个人有责任将自己想要的对象"准备"成可运输的方式,用于调度和远程恢复。

正如所要求的,使用通用共享内存设计可以进入其他体系结构,而ZeroMQinproc://nanomsgINPROC传输类几乎是零拷贝零延迟的示例。