使用 MsgPack 通过 ZeroMQ (zmqpp) 发送数据时会出现"msgpack::v1::insufficient_bytes"错误

Sending data through ZeroMQ (zmqpp) using MsgPack gives 'msgpack::v1::insufficient_bytes' error

本文关键字:msgpack v1 错误 bytes insufficient ZeroMQ 通过 MsgPack zmqpp 数据 使用      更新时间:2023-10-16

我使用zmqpp建立了一个PUB/SUB连接,现在我只想使用头,即msgpack-C的C++11版本,将数据从发布服务器发送到订阅服务器。

发布者必须发送两个int64_t号码——header_1header_2——然后是std::vector<T>-data-,其中T(header_1, header_2)组合确定。

由于没有太多关于如何组合msgpack和zmqpp的例子,所以我想到的想法是使用zmqpp::message::add/add_raw发送一条三部分消息。每个部件都将使用msgpack进行打包/解包。

发布者将单个数据部分打包如下:

zmqpp::message msg;
int64_t header_1 = 1234567;
msgpack::sbuffer buffer;
msgpack::pack(buffer, header_1);
msg.add(buffer.data(), buffer.size());

接收器像这样打开包装:

zmqpp::message msg;
subscriberSock.receive(msg);
int64_t header_1;
msgpack::unpacked unpackedData;
// crash !
msgpack::unpack(unpackedData,
                static_cast<const char*>(msg.raw_data(0)),
                msg.size(0));
unpackedData.get().convert(&header_1);

当我运行代码时,我在用户端收到以下错误:

terminate called after throwing an instance of 'msgpack::v1::insufficient_bytes'
  what():  insufficient bytes
Aborted

此外,尽管我只调用了add() 3次,但zmqpp似乎已经生成了一条由5部分组成的消息。

Q1:我是否正确打包/拆包了数据

Q2:这是使用zmqpp发送消息包缓冲区的正确方法吗

以下是代码的重要部分:

发布服务器

zmqpp::socket publisherSock;
/* connection setup stuff ...*/
// forever send data to the subscribers
while(true)
{
    zmqpp::message msg;
    // meta info about the data
    int64_t header_1 = 1234567;
    int64_t header_2 = 89;
    // sample data
    std::vector<double> data;
    data.push_back(1.2);
    data.push_back(3.4);
    data.push_back(5.6);

    {
        msgpack::sbuffer buffer;
        msgpack::pack(buffer, header_1);
        msg.add(buffer.data(), buffer.size());
        cout << "header_1:" << header_1 << endl;  // header_1:1234567
    }
    {
        msgpack::sbuffer buffer;
        msgpack::pack(buffer, header_2);
        msg.add(buffer.data(), buffer.size());
        cout << "header_2:" << header_2 << endl;  // header_2:89
    }
    {
        msgpack::sbuffer buffer;
        msgpack::pack(buffer, data);
        msg.add_raw(buffer.data(), buffer.size());
        std::cout << "data: " << data << std::endl;  // data:[1.2 3.4 5.6]
    }
    std::cout << msg.parts() << " parts" << std::endl;  // prints "5 parts"... why ?
    publisherSock.send(msg);
    std::this_thread::sleep_for(std::chrono::milliseconds(1000));
}

订阅者

zmqpp::socket subscriberSock;
/* connection setup stuff ...*/
zmqpp::message msg;
subscriberSock.receive(msg);
int64_t header_1;
int64_t header_2;
std::vector<double> data;
std::cout << msg.parts() << " parts" << std::endl;  // prints "5 parts"
{
    // header 1
    {
        msgpack::unpacked unpackedData;
        // crash !
        msgpack::unpack(unpackedData,
                        static_cast<const char*>(msg.raw_data(0)),
                        msg.size(0));
        unpackedData.get().convert(&header_1);
        cout << "header_1:" << header_1 << endl;
    }
    // header 2
    {
        msgpack::unpacked unpackedData;
        msgpack::unpack(unpackedData,
                        static_cast<const char*>(msg.raw_data(1)),
                        msg.size(1));
        unpackedData.get().convert(&header_2);
        cout << "header_2:" << header_2 << endl;
    }
    // data
    {
        msgpack::unpacked unpacked_data;
        msgpack::unpack(unpacked_data,
                        static_cast<const char*>(msg.raw_data(2)),
                        msg.size(2));
        unpacked_data.get().convert(&data);
        std::cout << "data:" << data << std::endl;
    }
}

编辑:解决的问题:正如@Jens所指出的,打包/发送数据的正确方式是使用zmqpp::message::add_raw()

zmqpp::message msg;
int64_t header_1 = 1234567;
msgpack::sbuffer buffer;
msgpack::pack(buffer, header_1);
msg.add_raw(buffer.data(), buffer.size());

我认为对msg.add(buffer.data(), buffer.size()的调用不会添加buffer.size()字节的数组,而是调用message::add(Type const& part, Args &&...args)

  1. msg << buffer.data(),它可能调用message::operator<<(bool),因为指针转换为布尔
  2. add(buffer.size()),然后调用msg << buffer.size(),后者添加size_t值作为下一部分

看看zmqpp::message类,使用message::add_raw就可以了。

附言:这一切都没有任何保证,因为我从来没有使用过zmqpp或msgpack。