每个线程或每个调用一个 ZeroMQ 套接字
One ZeroMQ socket per thread or per call?
众所周知,ZeroMQ 套接字不能在应用程序线程之间共享。
但是context_t
实例可以。
我有一个多线程应用程序,我希望每个线程不时与REQ/REP
套接字对手(事件、异常等)交换消息,具体取决于它们在做什么(它们正在做非 ZeroMQ 的东西)。
要将消息发送到我的REQ/REP
-socket,我使用以下函数
(半C++半伪代码):
sendMessage:
bool sendMessage(std::string s)
{
zmq::socket_t socket(globalContext(), ZMQ_REQ);
socket.connect("ipc://http-concentrator");
zmq::message_t message(s.size());
memcpy(message.data(), s.data(), s.size());
if (!socket.send(message))
return false;
// poll on socket for POLLIN with timeout
socket.recv(&message);
// do something with message
return true;
}
在需要时从每个线程调用此函数。它创建本地套接字、连接、发送消息并接收响应。退出时,插座断开连接并移除(至少我假设它已关闭)。
这样,我就不需要费心在每个线程中维护一个套接字。这是以每次调用此函数时创建和连接为代价的。
我已经强调了这段代码,我没有看到重用一个套接字和这个重新连接实现之间的太大区别。(我每秒有 20k 个REP/REQ
事务,包括用例两侧的 JSON 解码/编码)
问:有没有更正确的 ZeroMQ 方法?
Nota Bene:此答案是在 O/P 在传输类上从 20k TPS 更改为 140k TPS 之前发布的 ipc://
问:有没有更正确的 ZeroMQ 方法?
A:不容易说什么是"这个",什么是"正确性"指标的参数
鉴于此,以下几点将更加通用
,
适用于系统设计阶段推理:
资源利用间接费用避免
这一点是一把双刃剑。一些开销总是与REQ
-接入点到REQ/REP
模式的基础设施元素设置和处置(是的,甚至是关闭和拆除)相关联的,相关的基于套接字的传输类在REQ
端主机和REP
端都带来了一些显着的开销。
你注意到的是公平的,你对在大约20k TPS的水平上对此进行了定量测试,并且没有观察到这种方法的任何不利影响。目前尚不清楚的是,是否还有其他场景也在相同的SUT(被测系统)上进行了体内测试,以便为比较每个各自的设计提供一些基线(并允许确定开销本身的差异)。
虽然一个设计良好的框架将这部分系统内部行为隐藏在用户维护的代码中,但这并不意味着,它都是廉价的,而不是免费的处理。
很明显,在Context()
实例线程(...是的,复数在这里是正确的,因为一些高性能代码可能会受益于每个Context()
实例使用多个 I/O 线程,并通过显式定义模式套接字与其各自的 I/O 线程处理程序之间的亲和力映射来积极影响工作负载分布(以便以某种方式平衡(如果无法确定地平衡)预期的 I/O 吞吐量, 包括所有相关开销)。
如果仍然有疑问,人们应该永远记住,命令式编程风格的函数或面向对象的方法主要是外部调用者的受害者,外部调用者决定在哪个时刻以及何时调用这种">从属"代码执行单元值班并被执行。该函数/方法没有任何自然的反向限制(取代)它自己从外部调用方调用的频率,并且健壮的设计根本不能仅仅依赖于乐观的假设,即此类调用不会比XYZ-k TPS更频繁地出现(上面引用的20k可能适用于体外测试, 但真正的部署可能会改变几个人工顺序(无论是人为的 - 在测试期间,还是不 - 在某些高峰时段或用户(系统)恐慌期间,或者由于某些技术错误或硬件故障(我们都听说过很多次关于 NIC 卡淹没 L1/L2 流量超出所有可以想象的限制等 - 我们只是不知道也不能知道, 下次何时/何地再次发生)。
避免阻塞风险
前面提到的REQ/REP
可扩展正式通信模式以其陷入外部无法解决的分布式内部死锁的风险而闻名。这始终是一个需要避免的风险。缓解策略可能取决于实际用例的风险价值(需要认证医疗器械、金融科技用例、控制回路用例、学术研究论文代码或私人爱好玩具)。
参考:
REQ/REP
死锁>>> https://stackoverflow.com/a/38163015/3666197
Fig.1:
为什么在所有情况下都使用幼稚REQ/REP
是错误的[App1]
in_WaitToRecvSTATE_W2R
+[App2]
in_WaitToRecvSTATE_W2R
主要是不可挽救的分布式REQ-FSA/REP-FSA
相互死锁(两个有限状态自动机中的每一个都等待"另一个"移动)并且永远不会到达"下一个"in_WaitToSendSTATE_W2S
内部状态。
XTRN_RISK_OF_FSA_DEADLOCKED ~ { NETWORK_LoS
: || NETWORK_LoM
: || SIG_KILL( App2 )
: || ...
: }
:
[App1] ![ZeroMQ] : [ZeroMQ] ![App2]
code-control! code-control : [code-control ! code-control
+===========!=======================+ : +=====================!===========+
| ! ZMQ | : | ZMQ ! |
| ! REQ-FSA | : | REP-FSA! |
| !+------+BUF> .connect()| v |.bind() +BUF>------+! |
| !|W2S |___|>tcp:>---------[*]-----(tcp:)--|___|W2R |! |
| .send()>-o--->|___| | | |___|-o---->.recv() |
| ___/ !| ^ | |___| | | |___| ^ | |! ___ |
| REQ !| | v |___| | | |___| | v |! REP |
| ___.recv()<----o-|___| | | |___|<---o-<.send()___/ |
| !| W2R|___| | | |___| W2S|! |
| !+------<BUF+ | | <BUF+------+! |
| ! | | ! |
| ! ZMQ | | ZMQ ! |
| ! REQ-FSA | | REP-FSA ! |
~~~~~~~~~~~~~ DEADLOCKED in W2R ~~~~~~~~ * ~~~~~~ DEADLOCKED in W2R ~~~~~~~~~~~~~
| ! ///////////| |///////////! |
| ! ///////////| |//////////! |
+===========!=======================+ +=====================!===========+
这是我的(当前)解决方案,在 C++11 中,您可以将对象分配给thread_local
存储。将socket_t
实例static
和thread_local
存储在函数中为我提供了我一直在寻找的功能:
class socketPool
{
std::string endpoint_;
public:
socketPool(const std::string &ep) : endpoint_(ep) {}
zmq::socket_t & operator()()
{
thread_local static zmq::socket_t socket(
globalContext(),
ZMQ_REQ);
thread_local static bool connected;
if (!connected) {
connected = true;
socket.connect(endpoint_);
}
return socket;
}
};
// creating a pool for each endpoint
socketPool httpReqPool("ipc://http-concentrator");
在我的sendMessage()
函数中,我只是简单地创建和连接
bool sendMessage(std::string s)
{
zmq::socket_t &socket = httpReqPool();
// the rest as above
}
关于性能,嗯,它在我的机器上快了 7 倍。(每秒 140kREQ/REP
)。
我认为一个不同的是性能。
使用上面的代码,这意味着您需要执行 20k 次创建套接字、建立连接、发送消息和关闭套接字,从我的角度来看,这很耗时,您可以运行一些性能工具分析来检查函数sendMessage()
中使用了多少时间。
另一种方法是为每个线程创建一个请求套接字,并使用它所属的线程的套接字发送数据。ZeroMQ不支持多线程,否则会导致错误,例如断言错误(调试模式)或崩溃。
另一种方法是有一个专用线程用于与某个 FIFO 队列的 ZeroMQ 通信(当然,必须使用互斥锁或类似来保护......只要队列为空,此专用线程就应该处于休眠状态,并在此状态更改时唤醒(发出适当的信号)。
根据一般需要,每当收到对某些传出消息的响应时,专用线程可以简单地调用一些回调(在每个线程的某个专用对象处);请注意,您有不同的线程上下文,因此您可能需要一些同步方法来防止争用条件。
或者,发送线程可以只等待响应,在收到的响应上由 ZeroMQ 线程发出信号(嗯,这实际上是防止竞争条件的方法之一......
- 一个非常简单的win32套接字代码,但工作错误
- 在线程C++中创建一个套接字
- UDP 套接字读取最后一个传入字节
- 通过 c++ 中的 udp 套接字将派生类对象从一个进程发送到另一个进程
- 我正在编写一个简单的客户端套接字应用程序,但在连接后服务器收到一个空缓冲区
- 当我使用套接字代码发送第二条消息时,我有一个"sendto() Invalid argument"
- 是否可以寻址另一个网络中的服务器/客户端套接字?(C++)
- 我可以将一个套接字添加到多个 epoll 实例吗?
- 每个线程或每个调用一个 ZeroMQ 套接字
- QSocketNotifier:不能从另一个线程启用或禁用套接字通知程序
- 我正在尝试用 DevC++ 编译一个套接字程序,但每次我这样做时,我都会收到很多链接器错误,如下所示:
- 处理一个套接字和多个线程时异步 IO 的用例
- 服务器多线程无法保存最后一个套接字描述符
- 异步等待,直到在Asio中有一个套接字可用于读/写
- 同时等待一个条件(pthread_cond_wait)和一个套接字更改(select)
- 如何在一个套接字上实现并行请求和响应的非阻塞客户端-服务器通信模型,而不存在数据竞争
- 如何从一个套接字读取' 1 '字节,而从另一个套接字读取' read_some '字节
- 在android代码中创建一个套接字(而不是在android应用程序中)获得Permission Denied
- 通过ZeroMQ以字符串形式接收对象,然后通过另一个套接字以零拷贝发送对象的正确方法是什么
- 线程中一个套接字发送/接收