Buffer上次收到Zeromq消息作为类成员

Buffer last received ZeroMQ message as class member

本文关键字:成员 消息 Zeromq Buffer      更新时间:2023-10-16

我正在尝试编写一个订阅通过Zeromq发布的消息并缓冲最后一个接收的消息。

我尝试这样做如下。该方法ReceivedMessage()应在称为函数的循环中通过包装应用程序调用。返回真实后,我尝试使用GetReceivedMessageData()访问消息。不幸的是,似乎数据未正确保存在成员zmq_receivedMessage_中。

我想这是因为zmq_receivedMessage_用固定尺寸初始化,并且调用zmq_subscriber_.recv(&zmq_receivedMessage_)不会自动调整它?

最简单,最坚固的方法是什么?我能想到的唯一方法是每次收到新消息时使用realloc()memcpy()。还是有更简单的方法?

#include <cstdint>
#include "zeromq_cpp/zmq.hpp"
class HandlerClass
{
public:
    /// @brief Initializes a AirSimToRos class instance.
    HandlerClass(std::string const& addr);
    // @brief Gets the message data received via ZeroMq as pointer.
    void* GetReceivedMessageData();
    // @brief Gets the message size received via ZeroMq as size_t.
    std::size_t GetReceivedMessageSize();
    // @brief Returns true if a new, full message was received via ZeroMq, false otherwise
    bool ReceivedMessage();
private:    
    /// @brief A ZeroMq context object encapsulating functionality dealing with the initialisation and termination.
    zmq::context_t zmq_context_;
    /// @brief A ZeroMq socket for subscribing to incoming messages.
    zmq::socket_t zmq_subscriber_;
    /// @brief A ZeroMq message that was received last. Might be empty if ReceivedMessage() never was true.
    zmq::message_t zmq_receivedMessage_; 
};
HandlerClass::HandlerClass(std::string const& addr)
    : zmq_context_(1)
    , zmq_subscriber_(zmq_context_, ZMQ_SUB)
{
    zmq_subscriber_.setsockopt(ZMQ_IDENTITY, "HandlerSubscriber", 5);
    zmq_subscriber_.setsockopt(ZMQ_SUBSCRIBE, "", 0);
    zmq_subscriber_.setsockopt(ZMQ_RCVTIMEO, 5000);
    zmq_subscriber_.connect(addr);
}
void* HandlerClass::GetReceivedMessageData()
{
    return zmq_receivedMessage_.data();
}
std::size_t HandlerClass::GetReceivedMessageSize()
{
    return zmq_receivedMessage_.size();
}
bool HandlerClass::ReceivedMessage()
{    
    int received_bytes = zmq_subscriber_.recv(&zmq_receivedMessage_);
    return received_bytes > 0;
}

一种方法是重新设计W/ Poller -instance ZMQ_CONFLATE

具有预期类用例的零上下文,原始设计似乎是数据移动的"机械"包装器,而不是任何MVP-SLIM设计,可以最大程度地挤压Zeromq可扩展的好处正式的通信原型信号/消息传递框架已经内置。

更聪明的(以及ZMQ_RCV_HWM -SAFER(超出了本主题的范围))将不只是总是机械地从Zeromq Context-控制范围内机械读取每个消息,除非实际需要重新读取从 HandlerClass传输此类数据。

添加 Poller private实例,该实例将允许使用.poll() -Method来测试新消息到达(也有一个真实的(也有一个真实) - 时间/事件处理循环稳定控制工具,不要等待更长的时间比临时设置.poll() -Method超时),但仍能够尽可能迟迟推迟任何实际数据移动,直到数据确实需要外部流出 HandlerClass -instance,不早于任何地方。

HandlerClass::HandlerClass(std::string const& addr)
    : zmq_context_(1)
    , zmq_subscriber_(zmq_context_, ZMQ_SUB)
{
    zmq_subscriber_.setsockopt( ZMQ_IDENTITY,   "HandlerSubscriber", 5 );
    zmq_subscriber_.connect(                     addr );
    zmq_subscriber_.setsockopt( ZMQ_SUBSCRIBE,  "", 0 );
    zmq_subscriber_.setsockopt( ZMQ_LINGER,      0 );  // ALWAYS, READY 4 .term()
    zmq_subscriber_.setsockopt( ZMQ_CONFLATE,    1 );  // SMART
    zmq_subscriber_.setsockopt( ZMQ_TOS,         T );  // WORTH DEPLOY & MANAGE
    zmq_subscriber_.setsockopt( ZMQ_RCVTIMEO, 5000 );
 // -------------------------------------------------  // ADD Poller-instance
    ...
 // -------------------------------------------------  // RTO
}

nota bene:如果在Zeromq基础架构上还进行了前gress流,则有节省时间的API工具,用于零填充消息,将其重新填充到另一个Zeromq socket-transport中 - (几乎几乎几乎)免费 - 很酷,不是吗?