ZeroMQ PUB/SUL 绑定订阅者
ZeroMQ PUB/SUB bind subscriber
我正在和自己一起学习ZeroMQ。
我将PUB作为服务器(绑定),SUB作为客户端(连接)进行测试并且工作正常。相反(PUB作为客户端(连接),SUB作为服务器(绑定))也可以正常工作。当我将另一个 SUB 套接字作为客户端连接时,出现问题而没有任何异常或错误。
这是我的示例代码。
#include <zmq.hpp>
#include <string>
#include <iostream>
#include <unistd.h>
#include <thread>
class ZMQSock
{
public:
ZMQSock(const char* addr)
{
if (addr != NULL)
{
mctx = new zmq::context_t(1);
mszAddr = new char[strlen(addr) + 1];
snprintf(mszAddr, strlen(addr) + 1, "%s", addr);
}
}
virtual ~ZMQSock()
{
if (msock != nullptr)
delete msock;
if (mctx != nullptr)
delete mctx;
if (mszAddr != nullptr)
delete [] mszAddr;
}
int zbind()
{
if (msock != nullptr)
msock->bind(mszAddr);
else return -1;
return 0;
}
int zconnect()
{
if (msock != nullptr)
msock->connect(mszAddr);
else return -1;
return 0;
}
void start()
{
if (mbthread != false)
return ;
mbthread = true;
mhthread = std::thread(std::bind(&ZMQSock::run, this));
}
virtual void stop()
{
if (mbthread == false)
return ;
mbthread = false;
if (mhthread.joinable())
mhthread.join();
}
virtual void run() = 0;
protected:
char* mszAddr{nullptr};
zmq::context_t* mctx{nullptr};
zmq::socket_t* msock{nullptr};
bool mbthread{false};
std::thread mhthread;
};
class ZPublisher : public ZMQSock
{
public:
ZPublisher(const char* addr) : ZMQSock(addr)
{
if (msock == nullptr)
{
msock = new zmq::socket_t(*mctx, ZMQ_PUB);
}
}
virtual ~ZPublisher()
{
}
bool zsend(const char* data, const unsigned int length, bool sendmore=false)
{
zmq::message_t msg(length);
memcpy(msg.data(), data, length);
if (sendmore)
return msock->send(msg, ZMQ_SNDMORE);
return msock->send(msg);
}
void run()
{
if (mszAddr == nullptr)
return ;
if (strlen(mszAddr) < 6)
return ;
const char* fdelim = "1";
const char* first = "it sends to first. two can not recv this sentence! ";
const char* sdelim = "2";
const char* second = "it sends to second. one can not recv this sentence! ";
while (mbthread)
{
zsend(fdelim, 1, true);
zsend(first, strlen(first));
zsend(sdelim, 1, true);
zsend(second, strlen(second));
usleep(1000 * 1000);
}
}
};
class ZSubscriber : public ZMQSock
{
public:
ZSubscriber(const char* addr) : ZMQSock(addr)
{
if (msock == nullptr)
{
msock = new zmq::socket_t(*mctx, ZMQ_SUB);
}
}
virtual ~ZSubscriber()
{
}
void setScriberDelim(const char* delim, const int length)
{
msock->setsockopt(ZMQ_SUBSCRIBE, delim, length);
mdelim = std::string(delim, length);
}
std::string zrecv()
{
zmq::message_t msg;
msock->recv(&msg);
return std::string(static_cast<char*>(msg.data()), msg.size());
}
void run()
{
if (mszAddr == nullptr)
return ;
if (strlen(mszAddr) < 6)
return ;
while (mbthread)
{
std::cout << "MY DELIM IS [" << mdelim << "] - MSG : ";
std::cout << zrecv() << std::endl;
usleep(1000 * 1000);
}
}
private:
std::string mdelim;
};
int main ()
{
ZPublisher pub("tcp://localhost:5252");
ZSubscriber sub1("tcp://localhost:5252");
ZSubscriber sub2("tcp://*:5252");
pub.zconnect();
sub1.zconnect();
sub2.zbind();
sub1.setScriberDelim("1", 1);
sub2.setScriberDelim("2", 1);
pub.start();
std::cout << "PUB Server has been started.." << std::endl;
usleep(1000 * 1000);
sub1.start();
std::cout << "SUB1 Start." << std::endl;
sub2.start();
std::cout << "SUB2 Start." << std::endl;
int i = 0;
std::cout << "< Press any key to exit program. >" << std::endl;
std::cin >> i;
std::cout << "SUB1 STOP START" << std::endl;
sub1.stop();
std::cout << "SUB2 STOP START" << std::endl;
sub2.stop();
std::cout << "PUB STOP START" << std::endl;
pub.stop();
std::cout << "ALL DONE" << std::endl;
return 0;
}
是什么原因造成的?还是我非法使用 PUB/SUB?
您正在将 SUB 套接字连接到 SUB 套接字,这是一个无效的连接。在您的情况下,PUB 应该绑定,而 SUB 应该连接。
相关文章:
- 我想将一个对T类型的非常量左值引用绑定到一个T类型的临时值
- 在基于范围的for循环中使用结构化绑定声明
- 使用 LuaBridge 将 LuaJIT 绑定到C++会导致"PANIC: unprotected error"
- 尝试通过OCI例程从Oracle获取blob数据,但出现错误:ORA-01008:并非所有变量都绑定
- 在使用GPU支持编译Tensorflow时,会遇到CUDA_TOOLKIT_PATH未绑定变量
- 视觉studo 2019中的漫画和静态/动态绑定
- 将自由函数绑定为类成员函数
- 将常量指针引用绑定到非常量指针
- 在派生类中绑定非静态模板化成员函数
- 绑定派生类方法C++从实例范围之外的分隔 std::function 变量调用
- 在 openGL 中多次绑定缓冲区
- 定义有趣的宏和正则表达式在Z3 C++绑定
- 使用结构化绑定'Reflection'
- 为什么 std::绑定错误参数可以成功?
- 如何绑定 C++ gRPC 客户端的网络接口
- 在 openmp 中,omp_get_thread_num是否绑定到物理线程?
- C++绑定(已弃用)
- 运行时错误:引用绑定到类型为"int"的空指针
- 有没有办法将重载的类函数绑定到函数对象?
- ZeroMQ PUB/SUL 绑定订阅者