ZeroMQ 中的多个发布者 C++ 这是一个不错的选择吗?

Multiple publisher in ZeroMQ C++ Is this a good choice or not?

本文关键字:一个 选择 发布者 C++ ZeroMQ      更新时间:2023-10-16

我是ZeroMQ的新手。我想创建多个发布者,其中每个发布者发布特定数据,如下所示:

发布
  1. 者 1:发布图像数据
  2. 发布
  3. 服务器 2:发布音频数据
  4. 发布
  5. 者 3:发布文本数据

基本上,我的要求是发布来自多个发布者的数据,并使用另一端的多个接收器接收。

请参阅下面的示例代码:

data_publisher.cpp

//  Prepare our context and all publishers
zmq::context_t context(1);
zmq::socket_t publisher1(context, ZMQ_PUB);
zmq::socket_t publisher2(context, ZMQ_PUB);
zmq::socket_t publisher3(context, ZMQ_PUB);
zmq::socket_t publisher4(context, ZMQ_PUB);
publisher1.bind("tcp://*:5556");
publisher2.bind("tcp://*:5557");
publisher3.bind("tcp://*:5558");
publisher4.bind("tcp://*:5559");
//  Initialize random number generator
srandom((unsigned)time(NULL));
while (1) {
// sample data
int zipcode1 = within(100000);
int zipcode2 = within(100000);
int zipcode3 = within(100000);
int zipcode4 = within(100000);
int temperature1 = within(215) - 80;
int temperature2 = within(215) - 80;
int temperature3 = within(215) - 80;
int temperature4 = within(215) - 80;
int relhumidity1 = within(50) + 10;
int relhumidity2 = within(50) + 10;
int relhumidity3 = within(50) + 10;
int relhumidity4 = within(50) + 10;
zmq::message_t message1(20);
zmq::message_t message2(20);
zmq::message_t message3(20);
zmq::message_t message4(20);
snprintf((char*)message1.data(), 20, "%05d %d %d", zipcode1, temperature1, relhumidity1);
snprintf((char*)message2.data(), 20, "%05d %d %d", zipcode2, temperature2, relhumidity2);
snprintf((char*)message3.data(), 20, "%05d %d %d", zipcode3, temperature3, relhumidity3);
snprintf((char*)message4.data(), 20, "%05d %d %d", zipcode4, temperature4, relhumidity4);
publisher1.send(message1);
publisher2.send(message2);
publisher3.send(message3);
publisher4.send(message4);
}

data_receiver.cpp

zmq::context_t context(1);
//  Socket to talk to server
zmq::socket_t subscriber1(context, ZMQ_SUB);
zmq::socket_t subscriber2(context, ZMQ_SUB);
zmq::socket_t subscriber3(context, ZMQ_SUB);
zmq::socket_t subscriber4(context, ZMQ_SUB);
subscriber1.connect("tcp://localhost:5556");
subscriber2.connect("tcp://localhost:5557");
subscriber3.connect("tcp://localhost:5558");
subscriber4.connect("tcp://localhost:5559");
const char* filter = (argc > 1) ? argv[1] : "10001 ";
subscriber1.setsockopt(ZMQ_SUBSCRIBE, filter, strlen(filter));
subscriber2.setsockopt(ZMQ_SUBSCRIBE, filter, strlen(filter));
subscriber3.setsockopt(ZMQ_SUBSCRIBE, filter, strlen(filter));
subscriber4.setsockopt(ZMQ_SUBSCRIBE, filter, strlen(filter));
//  Process 100 updates
int update_nbr;
long total_temp1 = 0;
long total_temp2 = 0;
long total_temp3 = 0;
long total_temp4 = 0;
for (update_nbr = 0; update_nbr < 100; update_nbr++)
{
zmq::message_t update1;
zmq::message_t update2;
zmq::message_t update3;
zmq::message_t update4;
int zipcode1, temperature1, relhumidity1;
int zipcode2, temperature2, relhumidity2;
int zipcode3, temperature3, relhumidity3;
int zipcode4, temperature4, relhumidity4;
subscriber1.recv(&update1);
subscriber2.recv(&update2);
subscriber3.recv(&update3);
subscriber4.recv(&update4);
std::istringstream iss1(static_cast<char*>(update1.data()));
std::istringstream iss2(static_cast<char*>(update2.data()));
std::istringstream iss3(static_cast<char*>(update3.data()));
std::istringstream iss4(static_cast<char*>(update4.data()));
iss1 >> zipcode1 >> temperature1 >> relhumidity1;
iss2 >> zipcode2 >> temperature2 >> relhumidity2;
iss3 >> zipcode3 >> temperature3 >> relhumidity3;
iss4 >> zipcode4 >> temperature4 >> relhumidity4;
total_temp1 += temperature1;
total_temp2 += temperature2;
total_temp3 += temperature3;
total_temp4 += temperature4;
}
std::cout << "Average temperature for zipcode '" << filter << "' was "
<< (int)(total_temp1 / update_nbr) << "F" << std::endl;
std::cout << "Average temperature for zipcode '" << filter << "' was "
<< (int)(total_temp2 / update_nbr) << "F" << std::endl;
std::cout << "Average temperature for zipcode '" << filter << "' was "
<< (int)(total_temp3 / update_nbr) << "F" << std::endl;
std::cout << "Average temperature for zipcode '" << filter << "' was "
<< (int)(total_temp4 / update_nbr) << "F" << std::endl;

请注意,上面的代码是获取建议/建议的示例代码。

我想知道它是否是一个不错的选择,如上面的示例代码所示?

仍在等待任何定量事实,但让我们开始:

ZeroMQ是一个使用智能启用工具的概念,而低级系统编程被ZeroMQ核心元素Context引擎隐藏。

也就是说,高级工具,形式化为可扩展的正式通信模式原型,提供了某种模仿人类的行为 -PUB发布者确实"发布",SUB订阅者可以"订阅",REQ请求者可以"请求",REP回复者确实可以"回复",等等

这些具有行为的接入点一旦被授予一些基本规则,就可以.bind()/.connect()到某种分布式行为基础设施中。其中一条规则是不要打扰实际的传输类,所有这些都确实是功能丰富的技术,目前跨越{ inproc:// | ipc:// | tcp:// | pgm:// | epgm:// | vmci:// }、低级细节的景观,Context()实例将处理所有这些 对您的高层行为透明。干脆忘了这个。另一条规则是,您可以确保发送的每条消息要么没有错误,要么根本没有 - 在这方面没有妥协,没有传递折磨人的垃圾来欺骗或破坏收件人的 AccessPoint 后处理。

由于无法理解这一点,ZeroMQ不习惯为我们提供这种豪华工具所设计的舒适性和功能。

回到你的困境:

说了上面的几句话,并且您的主要架构还不清楚,仍然可以在这里为您提供帮助。

ZeroMQ抽象的分布式行为套接字工具主要是一个纯[SERIAL]调度设备。这意味着,与套接字关联的任何接收接入点{ .bind() | .connect() }都不能期望对纯顺序消息流进行任意重新排序。

这意味着,在任何情况下,无论是"公正">[CONCURRENT]流程调度,还是在极端情况下,在技术上编排了真正[PARALLEL]流程调度,单个"纯">[SERIAL]交付通道将不允许{ [CONCURRENT] | [PARALLEL] }系统继续提供这种流程调度模式,并将事件/处理流切成"纯">[SERIAL]消息序列。

A )这确实可能是引入多个独立操作的 ZeroMQ 分布式行为套接字实例的原因和必要条件。

B)另一方面,由于对全球分布式系统行为一无所知,没有人能确定,进入多个独立操作的 Socket 实例是否不仅仅是浪费时间和资源,由于极其错误或完全缺失的初始工程决策,提供不合理的低于平均水平或令人无法接受的差端到端系统行为性能。


性能?

不要在这个领域猜测,永远不要。而是从首先定量声明的需求开始,在此基础上,技术上合理的设计将能够继续进行并定义资源映射和性能调整到平台限制所需的所有步骤。

在过去的二十年里,ZeroMQ已经具备了出色的性能特征,设计和工程团队在完善可扩展性和性能方面做了很多工作,同时将延迟保持在临时程序员难以实现的水平上。确实是隐藏在ZeroMQ地下室的一个伟大的系统编程。

">数据是巨大的" - 好的,定义大小 - 传递大小1 [B]1E+9消息与传递大小1.000.000 [B]的1E + 3消息相比,还有其他性能调整。

">尽可能快" -- 好的,为给定的消息大小和预期节奏定义fast1/s ~1 [Hz], 10/s ~10 [Hz], 1000/s ~1 [kHz]

当然,在某些情况下,这种需求组合可能会超出当代计算设备功能范围。在任何编程开始之前,都必须对其进行最佳审查,因为否则您只是在一件永远不会飞行的事情上破坏了一些编程工作,所以最好有一个积极的证明,证明解决方案架构在可接受的资源和成本范围内是可行的。

因此,如果您的项目需要某些内容,请首先定义并定量指定实际是什么,接下来解决方案体系结构可以开始对其进行整理并提供决策,哪些工具和哪些工具配置可以匹配定义的功能和性能目标级别。

建造房屋,从抬高屋顶开始,永远不会回答如何布置地下室墙壁的问题,这将是足够但尚未过度设计的铁混凝土装甲厚度,这将承载未知数量的高架建筑地板。屋顶已经建成很容易出现,但与系统和严格的设计和工程实践无关。