具有C++绑定和开放 MP 阻塞问题的零 MQ.为什么
Zero MQ with C++ bindings and Open MP blocking issue. Why?
我为 ZeroMQ 编写了一个测试,以说服自己它设法独立于处理顺序将回复映射到客户端,这将证明它是线程安全的。
它是一个多线程服务器,它只是将收到的消息扔回发送方。客户端从多个线程发送一些消息,并检查是否收到相同的消息。对于多线程,我使用 OpenMP。
该测试运行良好,我想继续使用 ZeroMQ 的C++绑定重新实现它。现在它不再以同样的方式工作了。
以下是 ZMQPP 的代码:
#include <gtest/gtest.h>
#include <zmqpp/zmqpp.hpp>
#include <zmqpp/proxy.hpp>
TEST(zmqomp, order) {
zmqpp::context ctx;
std::thread proxy([&ctx] {
zmqpp::socket dealer(ctx, zmqpp::socket_type::xrequest);
zmqpp::socket router(ctx, zmqpp::socket_type::xreply);
router.bind("tcp://*:1234");
dealer.bind("inproc://workers");
zmqpp::proxy(router, dealer);
});
std::thread worker_starter([&ctx] {
#pragma omp parallel
{
zmqpp::socket in(ctx, zmqpp::socket_type::reply);
in.connect("inproc://workers");
#pragma omp for
for (int i = 0; i < 1000; i++) {
std::string request;
in.receive(request);
in.send(request);
}
}
});
std::thread client([&ctx] {
#pragma omp parallel
{
zmqpp::socket out(ctx, zmqpp::socket_type::request);
out.connect("tcp://localhost:1234");
#pragma omp for
for (int i = 0; i < 1000; i++) {
std::string msg("Request " + std::to_string(i));
out.send(msg);
std::string reply;
out.receive(reply);
EXPECT_EQ(reply, msg);
}
}
});
client.join();
worker_starter.join();
ctx.terminate();
proxy.join();
}
测试会阻止并且不会执行到最后。我稍微玩了一下#pragma
,发现只有一个更改可以"修复"它:
//#pragma omp parallel for
for (int i = 0; i < 250; i++) {
在这种情况下,代码仍在并行执行,但我必须将循环执行数除以我的物理内核数。
有人知道这里发生了什么吗?
序言:ZeroMQ 是按定义和按设计而不是线程安全的。
这通常无关紧要,因为有一些保护设计实践,但一旦遵循建议的TEST(){...}
设计,这里的情况就会更糟。
在与 ZeroMQ 合作了一段时间后,您的提案由于在几个主要问题上的违规而受到冲击,否则这些事情有助于分布式架构比纯粹的单体代码SEQ
更智能地工作。
ZeroMQ在(几乎)每三段中说服避免资源共享。简而言之,零共享是ZeroMQ出色的可扩展性能和最小化延迟的最大化之一。
因此,最好完全避免共享zmq.Context()
实例(除非非常清楚,为什么以及如何在引擎盖下工作)。
因此,尝试并行触发 1000 次(几乎)(好吧,不是真正的PAR
)一些事件流到共享的zmq.Context
实例上(一旦它使用默认参数实例化并且没有性能调整调整,就越少)肯定会遭受与性能和设计方面完全相反的事情, 推荐做。
有哪些限制,不能撞到头?
1)每个zmq.Context()
实例都有有限数量的 I/O 线程,这些线程是在实例化过程中创建的。一旦一个公平的设计需要一些性能调整,就有可能增加这样的I/O线程数量,数据泵将更好地工作(当然,没有多少数据泵可以挽救一个糟糕的,一个灾难性的设计/架构的分布式计算系统。这是理所当然的。).
2)每个zmq.Socket()
实例都有一个 { 隐式 | 显式 } 映射到相应的 I/O 线程 ( 参考文献 1) )。一旦一个公平的设计需要一些增强的鲁棒性,以应对缓慢的事件循环处理或数据流风暴(或负载平衡或你的名字)引起的其他不利影响,就有机会从分而治之的方法中受益,使用.setsockopt( zmq.AFFINITY, ... )
方法将每个zmq.Socket()
实例直接映射到相应的 I/O 线程上,从而在实际操作期间控制哪些缓冲和内部队列正在争夺哪些资源。在任何情况下,当线程总数超过本地主机内核数时,JUST U并发调度是显而易见的(因此真正PAR
执行的梦想主要和无意中丢失了。这是理所当然的。).
3)每个zmq.Socket()
还有一对"隐藏队列破坏者">,称为高水位线。这些设置要么{隐式|显式},后者肯定是性能调优的更明智的方式。为什么是毁灭者?因为这些可以稳定和保护分布式计算系统免受溢出,并且允许简单地丢弃高于HWM
级别的每条消息,以保护系统永久运行的能力,即使在暴风雨、残缺数据包的虚假爆炸或DDoS类型的攻击下也是如此。有许多工具可以调整 ZeroMQ Context() 实例行为的这个领域,这超出了这个答案的范围(参考:我关于 ZeroMQ 的其他帖子AFFINITY
好处或.setsockopt()
方法中使用的 ZeroMQ API 规范)。
4)每个基于传输类tcp://
zmq.Socket()
实例也继承了一些依赖于 O/S 的遗产。一些操作系统通过扩展IP数据包的累积(在任何ZeroMQ控制之外)来证明这种风险,直到超过某个阈值,因此应该对这种情况采取适当的设计小心,以避免对预期的应用程序信令/消息传递动态和鲁棒性产生不利影响,以对抗这种不可控的(外系统)缓冲习惯。
5)每个.recv()
和.send()
方法调用都是按定义阻塞的,这是大规模分布式计算系统永远不应该冒险进入的事情。从来没有。即使在教科书的例子中。而是使用这些调用的非阻塞形式。总是。这是理所当然的。
6)每个zmq.Socket()
实例都应执行一组谨慎而优雅的终止步骤。.setsockopt( zmq.LINGER, 0 )
+ 显式.close()
方法的预防性步骤是公平的,需要包含在每个用例中(并且无论可能出现的任何异常如何,都可以执行。在这种做法中,糟糕的 { self- | team- } 纪律肯定会因为没有对强制性资源管理策略给予应有的关注而挂起整个应用程序基础结构。这是任何严肃的分布式计算项目的必备部分。即使是教科书的例子也应该有这个。没有例外。没有任何借口。这是理所当然的。
- C++我的数学有什么问题,为什么我的代码不能正确循环
- 为什么瓦尔格林德在不释放恶意内存后没有报告任何问题?
- N-queen问题:无法弄清楚为什么我的解决方案不起作用
- Tmax(0x7fffffff 的一个奇怪问题,为什么 (!x) == x?
- 为什么我在代码厨师的 CMPRSS 问题中得到 WA(错误答案)?
- 为什么循环会导致指针出现问题?
- 当问题在性质上完全不同时,为什么要使用 C2259?
- 为什么在分配给成员变量之前获取unique_ptr的返回是一个问题?
- 为什么在这个C++问题中使用const_iterator而不仅仅是迭代器?
- 什么是非营利组织???我的问题是我不明白为什么有人会使用它
- 为什么我的程序在打开网络设备时遇到问题
- 为什么我在虚幻引擎中的多态性和接口方面遇到问题?
- 这个模板为什么有问题?如何正确编译
- 为什么我的类电影在创建电影实例时遇到问题?
- 平衡括号问题为什么要检查它是否为空?
- 我的计划中存在什么问题?为什么无法显示最终输出
- 回文字符串问题:为什么我必须放+1而不是-1才能让这个代码工作
- C多行宏问题:为什么不使用if(1){..}而不是do{..}while(0)在多行宏定义中
- "int"与"double"的问题 - 为什么我的程序不起作用?
- 简单的问题:为什么字符串库函数std::string::find总是返回std::string::npos