ZMQ_CONFLATE不适用于ZMQ_SUB(无过滤器)
ZMQ_CONFLATE does not work for ZMQ_SUB (no filters)
我在实时快速的服务器和慢速的客户端上安装了zeromq-4.1.4库和cppzmq。
客户端和服务器端都有2个端口用于发布和订阅,通过TCP-IP通信。
服务器以自己的速度发送消息。客户端接收到最新的消息,做一些缓慢的计算,然后将消息发送回服务器。如果有传入消息,服务器将读取消息并处理它。
问题是旧消息没有被新消息覆盖。客户端总是打印出较旧的消息,即使我关闭了服务器,消息也会继续从客户端的接收缓冲区排队。
为什么会发生?设置ZMQ_CONFLATE。难道它不能正常工作吗?
作为一种解决方案,我认为将客户端放在工作线程中以最大速率工作,然后手动保留最后一条消息。但这是一个开销,因为据我所知,这正是zeromq在发送或接收消息时所做的。
客户端/服务器代码相同:
void ZeromqMessenger::init(const char* pubAddress, const char* subAddress, const char* syncAddress, int flags)
{
flags_ = flags;
int confl = 1;
// Prepare our context
context_ = new zmq::context_t(1);
// Prepare ZMQ publisher
publisher_ = new zmq::socket_t(*context_, ZMQ_PUB);
publisher_->bind(pubAddress);
publisher_->setsockopt(ZMQ_CONFLATE, &confl, sizeof(confl)); // Keep only last message
// Prepare ZMQ subscriber
subscriber_ = new zmq::socket_t(*this->context_, ZMQ_SUB);
subscriber_->connect(subAddress);
subscriber_->setsockopt(ZMQ_SUBSCRIBE, "", 0);
subscriber_->setsockopt(ZMQ_CONFLATE, &confl, sizeof(confl)); // Keep only last message
if (flags_ & ZMQ_SYNC_PUB)
{
syncService_ = new zmq::socket_t(*context_, ZMQ_REP);
syncService_->bind(syncAddress);
}
if (flags_ & ZMQ_SYNC_SUB)
{
// synchronize with publisher
syncService_ = new zmq::socket_t(*context_, ZMQ_REQ);
syncService_->connect(syncAddress);
// - send a synchronization request
zmq::message_t message(0);
syncService_->send(message);
// - wait for synchronization reply
zmq::message_t update;
syncService_->recv(&update);
}
}
void ZeromqMessenger::sync()
{
if (connected_)
return;
if (flags_ & ZMQ_SYNC_PUB)
{
//std::cout << "Waiting for subscribers" << std::endl;
if (subscribers_ < subscribers_expected_)
{
// - wait for synchronization request
zmq::message_t update;
if (syncService_->recv(&update, ZMQ_DONTWAIT))
{
// - send synchronization reply
zmq::message_t message(0);
syncService_->send(message);
subscribers_++;
}
}
if (subscribers_ == subscribers_expected_)
connected_ = true;
}
}
void ZeromqMessenger::send(const void* data, int size) const
{
zmq::message_t message(size);
memcpy(message.data(), data, size);
publisher_->send(message);
}
bool ZeromqMessenger::recv(void *data, int size, int flags) const
{
zmq::message_t update;
bool received = subscriber_->recv(&update, flags);
if(received)
memcpy(data, update.data(), size);
return received;
}
我实现了线程版本,它工作得很好。这是一个使用全局变量的非常粗糙的实现,还需要改进,但至少它可以工作。
#include <zmq_messenger.h>
#include <iostream>
#include <thread>
#include <mutex>
std::string gSubAddress;
std::mutex gMtx;
const int gSize = 20*sizeof(double);
char gData[gSize];
void *worker_routine (void *context)
{
// Prepare ZMQ subscriber
int confl = 1;
zmq::socket_t* subscriber = new zmq::socket_t(*(zmq::context_t*)context, ZMQ_SUB);
subscriber->connect(gSubAddress.c_str());
subscriber->setsockopt(ZMQ_CONFLATE, &confl, sizeof(confl)); // Keep only last message
subscriber->setsockopt(ZMQ_SUBSCRIBE, "", 0);
while (1)
{
zmq::message_t update;
bool received = subscriber->recv(&update, ZMQ_DONTWAIT);
if(received)
{
gMtx.lock();
memcpy(gData, update.data(), gSize);
gMtx.unlock();
}
}
zmq_close(subscriber);
return NULL;
}
void ZeromqMessenger::init(const char* pubAddress, const char* subAddress, const char* syncAddress, int flags)
{
flags_ = flags;
int confl = 1;
// Prepare our context
context_ = new zmq::context_t(1);
// Prepare ZMQ publisher
publisher_ = new zmq::socket_t(*context_, ZMQ_PUB);
publisher_->bind(pubAddress);
publisher_->setsockopt(ZMQ_CONFLATE, &confl, sizeof(confl)); // Keep only last message
gSubAddress = std::string(subAddress);
pthread_create (&subscriber_worker_, NULL, worker_routine, context_);
if (flags_ & ZMQ_SYNC_PUB)
{
syncService_ = new zmq::socket_t(*context_, ZMQ_REP);
syncService_->bind(syncAddress);
}
if (flags_ & ZMQ_SYNC_SUB)
{
//std::cout << "Trying to connect" << std::endl;
// synchronize with publisher
syncService_ = new zmq::socket_t(*context_, ZMQ_REQ);
syncService_->connect(syncAddress);
// - send a synchronization request
zmq::message_t message(0);
syncService_->send(message);
// - wait for synchronization reply
zmq::message_t update;
syncService_->recv(&update);
// Third, get our updates and report how many we got
//std::cout << "Ready to receive" << std::endl;
}
}
void ZeromqMessenger::sync()
{
//std::cout << "sync" << std::endl;
if (connected_)
return;
if (flags_ & ZMQ_SYNC_PUB)
{
//std::cout << "Waiting for subscribers" << std::endl;
if (subscribers_ < subscribers_expected_)
{
// - wait for synchronization request
zmq::message_t update;
if (syncService_->recv(&update, ZMQ_DONTWAIT))
{
// - send synchronization reply
zmq::message_t message(0);
syncService_->send(message);
subscribers_++;
}
}
if (subscribers_ == subscribers_expected_)
connected_ = true;
//std::cout << subscribers_ << " subscriber(s) connected" << std::endl;
}
}
void ZeromqMessenger::send(const void* data, int size) const
{
zmq::message_t message(size);
memcpy(message.data(), data, size);
publisher_->send(message);
}
bool ZeromqMessenger::recv(void *data, int size, int flags) const
{
assert(gSize == size);
gMtx.lock();
memcpy(data, gData, size);
gMtx.unlock();
return true;
}
相关文章:
- 带过滤器的现代迭代c++集合
- 如何将字符串添加到布谷鸟过滤器?
- 在 capnp FlatArrayMessageReader 的对齐内存缓冲区中接收 zmq 消息
- 在事件过滤器之前发出对象的事件
- C++对开销较少的容器使用多个过滤器
- 无法在 Linux 上运行C++ ZMQ 项目的可执行文件
- CBasePin 递增对拥有过滤器的引用.循环引用?
- ZMQ::send() 抛出异常并终止 QNX 进程.为什么以及如何从中恢复?
- Qt:事件过滤器在显示 QCompleter 时不接收按键
- ZMQ - 客户端服务器:客户端意外关闭,服务器如何检测到?
- 使用 Vivek 的 Vcam / 捕获源过滤器构建/链接错误
- 为什么 zmq 将多条消息打包到一个 TCP 帧中?
- Qt 中的去抖动事件过滤器
- QT 事件过滤器 mouseEvent->pos().x() 始终为零,但 mouseEvent->pos().y() 工作正常
- 如何使用C++获取/设置OBS中的垂直滚动过滤器属性?
- 当通过tcp接收编码图像的消息时,zmq在第二个循环中崩溃
- ZeroMQ (cppzmq) 订阅者,过滤器以相同的字符串开头
- 有没有办法查询邮件过滤器是否已经生效?
- 使用指针使用过滤器重写字符数组
- 如何在ZMQ代理中记录接收消息?