每个线程或每个调用一个 ZeroMQ 套接字

One ZeroMQ socket per thread or per call?

本文关键字:一个 套接字 ZeroMQ 线程 调用      更新时间:2023-10-16

众所周知,ZeroMQ 套接字不能在应用程序线程之间共享
但是context_t实例可以。

我有一个多线程应用程序,我希望每个线程不时与REQ/REP套接字对手(事件、异常等)交换消息,具体取决于它们在做什么(它们正在做非 ZeroMQ 的东西)。

要将消息发送到我的REQ/REP-socket,我使用以下函数
(半C++半伪代码):

sendMessage:

bool sendMessage(std::string s)
{
zmq::socket_t socket(globalContext(), ZMQ_REQ);
socket.connect("ipc://http-concentrator");
zmq::message_t message(s.size());
memcpy(message.data(), s.data(), s.size());
if (!socket.send(message))
return false;
// poll on socket for POLLIN with timeout
socket.recv(&message);
// do something with message
return true;
}

在需要时从每个线程调用此函数。它创建本地套接字、连接、发送消息并接收响应。退出时,插座断开连接并移除(至少我假设它已关闭)。

这样,我就不需要费心在每个线程中维护一个套接字。这是以每次调用此函数时创建和连接为代价的。

我已经强调了这段代码,我没有看到重用一个套接字和这个重新连接实现之间的太大区别。(我每秒有 20k 个REP/REQ事务,包括用例两侧的 JSON 解码/编码)

问:有没有更正确的 ZeroMQ 方法?

Nota Bene:此答案是在 O/P 在传输类上从 20k TPS 更改为 140k TPS 之前发布的 ipc://

:有没有正确的 ZeroMQ 方法?

A:不容易说什么是"这个",什么是"正确性"指标的参数

鉴于此,以下几点将更加通用

适用于系统设计阶段推理:


资源利用间接费用避免

这一点是一把双刃剑。一些开销总是与REQ-接入点到REQ/REP模式的基础设施元素设置和处置(是的,甚至是关闭和拆除)相关联的,相关的基于套接字的传输类在REQ端主机和REP端都带来了一些显着的开销。

你注意到的是公平的,你对在大约20k TPS的水平上对此进行了定量测试,并且没有观察到这种方法的任何不利影响。目前尚不清楚的是,是否还有其他场景也在相同的SUT(被测系统)上进行了体内测试,以便为比较每个各自的设计提供一些基线(并允许确定开销本身的差异)。

虽然一个设计良好的框架将这部分系统内部行为隐藏在用户维护的代码中,但这并不意味着,它都是廉价的,而不是免费的处理。

很明显,在Context()实例线程(...是的,复数在这里是正确的,因为一些高性能代码可能会受益于每个Context()实例使用多个 I/O 线程,并通过显式定义模式套接字与其各自的 I/O 线程处理程序之间的亲和力映射来积极影响工作负载分布(以便以某种方式平衡(如果无法确定地平衡)预期的 I/O 吞吐量, 包括所有相关开销)。

如果仍然有疑问,人们应该永远记住,命令式编程风格的函数或面向对象的方法主要是外部调用者的受害者,外部调用者决定在哪个时刻以及何时调用这种">从属"代码执行单元值班并被执行。该函数/方法没有任何自然的反向限制(取代)它自己从外部调用方调用的频率,并且健壮的设计根本不能仅仅依赖于乐观的假设,即此类调用不会比XYZ-k TPS更频繁地出现(上面引用的20k可能适用于体外测试, 但真正的部署可能会改变几个人工顺序(无论是人为的 - 在测试期间,还是不 - 在某些高峰时段或用户(系统)恐慌期间,或者由于某些技术错误或硬件故障(我们都听说过很多次关于 NIC 卡淹没 L1/L2 流量超出所有可以想象的限制等 - 我们只是不知道也不能知道, 下次何时/何地再次发生)。

避免阻塞风险

前面提到的REQ/REP可扩展正式通信模式以其陷入外部无法解决的分布式内部死锁的风险而闻名。这始终是一个需要避免的风险。缓解策略可能取决于实际用例的风险价值(需要认证医疗器械、金融科技用例、控制回路用例、学术研究论文代码或私人爱好玩具)。

参考:REQ/REP死锁>>> https://stackoverflow.com/a/38163015/3666197

Fig.1:为什么在所有情况下都使用幼稚REQ/REP
是错误的[App1]in_WaitToRecvSTATE_W2R+[App2]in_WaitToRecvSTATE_W2R
主要是不可挽救的分布式REQ-FSA/REP-FSA相互死锁(两个有限状态自动机中的每一个都等待"另一个"移动)并且永远不会到达"下一个"in_WaitToSendSTATE_W2S内部状态。

XTRN_RISK_OF_FSA_DEADLOCKED ~ {  NETWORK_LoS
:   || NETWORK_LoM
:   || SIG_KILL( App2 )
:   || ...
:      }
:
[App1]      ![ZeroMQ]                    :    [ZeroMQ]              ![App2] 
code-control! code-control               :    [code-control         ! code-control
+===========!=======================+    :    +=====================!===========+
|           ! ZMQ                   |    :    |              ZMQ    !           |
|           ! REQ-FSA               |    :    |              REP-FSA!           |
|           !+------+BUF> .connect()|    v    |.bind()  +BUF>------+!           |
|           !|W2S   |___|>tcp:>---------[*]-----(tcp:)--|___|W2R   |!           |
|     .send()>-o--->|___|           |         |         |___|-o---->.recv()     |
| ___/      !| ^  | |___|           |         |         |___| ^  | |!      ___ |
| REQ       !| |  v |___|           |         |         |___| |  v |!       REP |
| ___.recv()<----o-|___|           |         |         |___|<---o-<.send()___/ |
|           !|   W2R|___|           |         |         |___|   W2S|!           |
|           !+------<BUF+           |         |         <BUF+------+!           |
|           !                       |         |                     !           |
|           ! ZMQ                   |         |   ZMQ               !           |
|           ! REQ-FSA               |         |   REP-FSA           !           |
~~~~~~~~~~~~~ DEADLOCKED in W2R ~~~~~~~~ * ~~~~~~ DEADLOCKED in W2R ~~~~~~~~~~~~~
|           ! ///////////|         |///////////!           |
|           ! ///////////|         |//////////!           |
+===========!=======================+         +=====================!===========+

这是我的(当前)解决方案,在 C++11 中,您可以将对象分配给thread_local存储。将socket_t实例staticthread_local存储在函数中为我提供了我一直在寻找的功能:

class socketPool
{
std::string endpoint_;
public:
socketPool(const std::string &ep) : endpoint_(ep) {}
zmq::socket_t & operator()()
{
thread_local static zmq::socket_t socket(
globalContext(), 
ZMQ_REQ);
thread_local static bool connected;
if (!connected) {
connected = true;
socket.connect(endpoint_);
}
return socket;
}
};
// creating a pool for each endpoint
socketPool httpReqPool("ipc://http-concentrator");

在我的sendMessage()函数中,我只是简单地创建和连接

bool sendMessage(std::string s)
{
zmq::socket_t &socket = httpReqPool();
// the rest as above
}

关于性能,嗯,它在我的机器上快了 7 倍。(每秒 140kREQ/REP)。

我认为一个不同的是性能。

使用上面的代码,这意味着您需要执行 20k 次创建套接字、建立连接、发送消息和关闭套接字,从我的角度来看,这很耗时,您可以运行一些性能工具分析来检查函数sendMessage()中使用了多少时间。

另一种方法是为每个线程创建一个请求套接字,并使用它所属的线程的套接字发送数据。ZeroMQ不支持多线程,否则会导致错误,例如断言错误(调试模式)或崩溃。

另一种方法是有一个专用线程用于与某个 FIFO 队列的 ZeroMQ 通信(当然,必须使用互斥锁或类似来保护......只要队列为空,此专用线程就应该处于休眠状态,并在此状态更改时唤醒(发出适当的信号)。

根据一般需要,每当收到对某些传出消息的响应时,专用线程可以简单地调用一些回调(在每个线程的某个专用对象处);请注意,您有不同的线程上下文,因此您可能需要一些同步方法来防止争用条件。

或者,发送线程可以只等待响应,在收到的响应上由 ZeroMQ 线程发出信号(嗯,这实际上是防止竞争条件的方法之一......

相关文章: