ZMQ_CONFLATE不适用于ZMQ_SUB(无过滤器)

ZMQ_CONFLATE does not work for ZMQ_SUB (no filters)

本文关键字:ZMQ 过滤器 适用于 CONFLATE 不适用 SUB      更新时间:2023-10-16

我在实时快速的服务器和慢速的客户端上安装了zeromq-4.1.4库和cppzmq。

客户端和服务器端都有2个端口用于发布和订阅,通过TCP-IP通信。

服务器以自己的速度发送消息。客户端接收到最新的消息,做一些缓慢的计算,然后将消息发送回服务器。如果有传入消息,服务器将读取消息并处理它。

问题是旧消息没有被新消息覆盖。客户端总是打印出较旧的消息,即使我关闭了服务器,消息也会继续从客户端的接收缓冲区排队。

为什么会发生?设置ZMQ_CONFLATE。难道它不能正常工作吗?

作为一种解决方案,我认为将客户端放在工作线程中以最大速率工作,然后手动保留最后一条消息。但这是一个开销,因为据我所知,这正是zeromq在发送或接收消息时所做的。

客户端/服务器代码相同:

void ZeromqMessenger::init(const char* pubAddress, const char* subAddress, const char* syncAddress, int flags)
{
  flags_ = flags;
  int confl = 1;
  // Prepare our context
  context_ = new zmq::context_t(1);
  // Prepare ZMQ publisher
  publisher_ = new zmq::socket_t(*context_, ZMQ_PUB);
  publisher_->bind(pubAddress);
  publisher_->setsockopt(ZMQ_CONFLATE, &confl, sizeof(confl)); // Keep only last message
  // Prepare ZMQ subscriber
  subscriber_ = new zmq::socket_t(*this->context_, ZMQ_SUB);
  subscriber_->connect(subAddress);
  subscriber_->setsockopt(ZMQ_SUBSCRIBE, "", 0);
  subscriber_->setsockopt(ZMQ_CONFLATE, &confl, sizeof(confl)); // Keep only last message
  if (flags_ & ZMQ_SYNC_PUB)
  {
    syncService_ = new zmq::socket_t(*context_, ZMQ_REP);
    syncService_->bind(syncAddress);
  }
  if (flags_ & ZMQ_SYNC_SUB)
  {
    // synchronize with publisher
    syncService_ = new zmq::socket_t(*context_, ZMQ_REQ);
    syncService_->connect(syncAddress);
    // - send a synchronization request
    zmq::message_t message(0);
    syncService_->send(message);
    // - wait for synchronization reply
    zmq::message_t update;
    syncService_->recv(&update);
  }
}
void ZeromqMessenger::sync()
{
  if (connected_)
    return;
  if (flags_ & ZMQ_SYNC_PUB)
  {
    //std::cout << "Waiting for subscribers" << std::endl;
    if (subscribers_ < subscribers_expected_)
    {
      // - wait for synchronization request
      zmq::message_t update;
      if (syncService_->recv(&update, ZMQ_DONTWAIT))
      {
        // - send synchronization reply
        zmq::message_t message(0);
        syncService_->send(message);
        subscribers_++;
      }
    }
    if (subscribers_ == subscribers_expected_)
      connected_ = true;
  }
}
void ZeromqMessenger::send(const void* data, int size) const
{
  zmq::message_t message(size);
  memcpy(message.data(), data, size);
  publisher_->send(message);
}
bool ZeromqMessenger::recv(void *data, int size, int flags) const
{
  zmq::message_t update;
  bool received = subscriber_->recv(&update, flags);
  if(received)
    memcpy(data, update.data(), size);
  return received;
}

我实现了线程版本,它工作得很好。这是一个使用全局变量的非常粗糙的实现,还需要改进,但至少它可以工作。

#include <zmq_messenger.h>
#include <iostream>
#include <thread>
#include <mutex>
std::string gSubAddress;
std::mutex gMtx;
const int gSize = 20*sizeof(double);
char gData[gSize];
void *worker_routine (void *context)
{
  // Prepare ZMQ subscriber
  int confl = 1;
  zmq::socket_t* subscriber = new zmq::socket_t(*(zmq::context_t*)context, ZMQ_SUB);
  subscriber->connect(gSubAddress.c_str());
  subscriber->setsockopt(ZMQ_CONFLATE, &confl, sizeof(confl)); // Keep only last message
  subscriber->setsockopt(ZMQ_SUBSCRIBE, "", 0);
  while (1)
  {
    zmq::message_t update;
    bool received = subscriber->recv(&update, ZMQ_DONTWAIT);
    if(received)
    {
      gMtx.lock();
      memcpy(gData, update.data(), gSize);
      gMtx.unlock();
    }
  }
  zmq_close(subscriber);
  return NULL;
}
void ZeromqMessenger::init(const char* pubAddress, const char* subAddress, const char* syncAddress, int flags)
{
  flags_ = flags;
  int confl = 1;
  // Prepare our context
  context_ = new zmq::context_t(1);
  // Prepare ZMQ publisher
  publisher_ = new zmq::socket_t(*context_, ZMQ_PUB);
  publisher_->bind(pubAddress);
  publisher_->setsockopt(ZMQ_CONFLATE, &confl, sizeof(confl)); // Keep only last message
  gSubAddress = std::string(subAddress);
  pthread_create (&subscriber_worker_, NULL, worker_routine, context_);
  if (flags_ & ZMQ_SYNC_PUB)
  {
    syncService_ = new zmq::socket_t(*context_, ZMQ_REP);
    syncService_->bind(syncAddress);
  }
  if (flags_ & ZMQ_SYNC_SUB)
  {
    //std::cout << "Trying to connect" << std::endl;
    // synchronize with publisher
    syncService_ = new zmq::socket_t(*context_, ZMQ_REQ);
    syncService_->connect(syncAddress);
    // - send a synchronization request
    zmq::message_t message(0);
    syncService_->send(message);
    // - wait for synchronization reply
    zmq::message_t update;
    syncService_->recv(&update);
    // Third, get our updates and report how many we got
    //std::cout << "Ready to receive" << std::endl;
  }
}
void ZeromqMessenger::sync()
{
  //std::cout << "sync" << std::endl;
  if (connected_)
    return;
  if (flags_ & ZMQ_SYNC_PUB)
  {
    //std::cout << "Waiting for subscribers" << std::endl;
    if (subscribers_ < subscribers_expected_)
    {
      // - wait for synchronization request
      zmq::message_t update;
      if (syncService_->recv(&update, ZMQ_DONTWAIT))
      {
        // - send synchronization reply
        zmq::message_t message(0);
        syncService_->send(message);
        subscribers_++;
      }
    }
    if (subscribers_ == subscribers_expected_)
      connected_ = true;
    //std::cout << subscribers_ << " subscriber(s) connected" << std::endl;
  }
}
void ZeromqMessenger::send(const void* data, int size) const
{
  zmq::message_t message(size);
  memcpy(message.data(), data, size);
  publisher_->send(message);
}
bool ZeromqMessenger::recv(void *data, int size, int flags) const
{
  assert(gSize == size);
  gMtx.lock();
  memcpy(data, gData, size);
  gMtx.unlock();
  return true;
}