ZMQ 导致主线程冻结(或类似的东西?

ZMQ causes main thread to freeze (or something similar..?)

本文关键字:线程 冻结 ZMQ      更新时间:2023-10-16

我有以下一段 c++ 代码,它打开了一个 ZMQ 订阅者套接字并在无限循环中接收消息。

listener.cc: (代码应该可以工作,编译方式是:g++ -lzmq listener.cc)

#include <iostream>
#include <zmq.hpp>
class Listener {
public:
Listener() {
std::cout << "constructor call" << std::endl;
// Not working:
//        zmq::context_t context(1);
//        sck = new zmq::socket_t(context, ZMQ_SUB);
//        sck->connect("tcp://127.0.0.1:9999");
//        sck->setsockopt( ZMQ_SUBSCRIBE, "", 0);
std::cout << "constructor end" << std::endl;
}
void run() {
// Ok:
zmq::context_t context(1);
sck = new zmq::socket_t(context, ZMQ_SUB);
sck->connect("tcp://127.0.0.1:9999");
sck->setsockopt(ZMQ_SUBSCRIBE, "", 0);
while (1) { // Receive messages, not interesting:
std::cout << "listening..." << std::endl;
zmq::message_t message;
sck->recv(&message);
std::cout << "received something" << std::endl;
}
}
zmq::socket_t *sck;
};
int main(int argc, char *argv[]) {
Listener listener;
std::cout << "foo" << std::endl;
listener.run();
return 0;
}

到目前为止,代码按预期工作:

$ g++ -lzmq listener.cc
$ ./a.out 
constructor call
constructor end
foo
listening...

但是,我想将 zmq-context/socket 的初始化移动到类的构造函数(被注释掉的部分)中。但是代码根本不会从构造函数调用返回,构造函数中的所有语句都被执行,但main的第二行没有执行,程序卡住了。输出为:

$ g++ -lzmq listener.cc
$ ./a.out 
constructor call
constructor end

我唯一想到的是主线程由于某种原因停止执行。任何人都可以解释这一点并提供解决方案吗?

干杯

任何人都可以解释这一点并提供解决方案吗?对两者都是的...

ZeroMQ将每个Context( nIOthreads = 1 )实例用作引擎盖下非常强大的引擎,必须小心谨慎,以免资源管理程序感到惊讶(因为阻塞/冻结就是这样一种情况)。

如果有一些活跃使用的套接字实例(在 Context() 实例的后台管理),则可能存在一种情况,即在进入析构函数处理阶段之前,并非所有传输都已完成,或者如果手动执行类似的步骤,以尝试.close()此类套接字实例和/或.term()上下文实例。

人们,有意或无意,多次撞到这里。

ZeroMQ原生API文档在这个问题上非常清楚,并警告了一个风险,即尚未完成的低级事务可能会让代码无限期地等待外部(远程代理操作)事件,该事件永远不会出现。这种无意识的代码看起来像是冻结/挂起的故障,但由于一个人没有意识到这种风险并且没有采取适当的预防措施,就被驱赶到这种确定性的情况下。

虽然较新的 API 版本更改了一些默认设置,但我建议所有用户显式设置安全配置,即使较新的默认值可以避免手动执行此操作。然而,这种做法有助于提高人们对在适当的分布式系统设计实践中应该考虑什么样的碰撞的认识。


溶液?始终.setsockopt( ZMQ_LINGER, 0 );

zmq_term()应阻止,直到满足以下条件:

上下文中打开的所有套接字都已关闭,zmq_close().
对于上下文中的每个套接字,应用程序发送的所有带有zmq_send()的消息都已物理传输到网络对等方,或者使用ZMQ_LINGER套接字选项设置的套接字的延迟期已过期。

如上所述,这是每个套接字实例化的经验法则。

class Listener {
// zmq::context_t aClassLocalCONTEXT;              // MAY GET SET LOCAL CTX BY VALUE
// zmq::socket_t  aClassLocalSOCKET;               // MAY GET SET LOCAL SCK BY VALUE EITHER
zmq::socket_t  *sck;
public:
Listener() {
std::cout << "constructor call" << std::endl;
// zmq::context_t context(1);                    // not a best practice here
// ---------------------------------------------------
// sck = new zmq::socket_t( aClassLocalCONTEXT, ZMQ_SUB );
sck = new zmq::socket_t( context, ZMQ_SUB );
sck->setsockopt( ZMQ_LINGER, 0 );             // ALWAYS, best before .bind()/.connect()
sck->connect(   "tcp://127.0.0.1:9999" );
sck->setsockopt( ZMQ_SUBSCRIBE, "", 0 );
// ----------------------------------------------// IF SETUP BY AN INSTANTIATION CALL INTERFACE
// aClassLocalSOCKET->setsockopt( ZMQ_LINGER, 0 );
// aClassLocalSOCKET->connect(    ... );
// aClassLocalSOCKET->setsockopt( ZMQ_SUBSCRIBE, "", 0 );
// ---------------------------------------------------
std::cout << "constructor end" << std::endl;
}
~Listener() {
sck->close();                                 // A GOOD PRACTICE
// ----------------------------------------------// IF SETUP BY AN INSTANTIATION CALL INTERFACE
// aClassLocalSOCKET->close();
}
void run() {            
while (1) {                                   // recv()-messages, not interesting:
std::cout << "listening..." << std::endl;
zmq::message_t message;
sck->recv(&message);
std::cout << "received something" << std::endl;
zmq::zmq_msg_close(&message);             // A GOOD PLACE TO DISCARD A NEVER MORE USED RESOURCE
}
}
};

int main(int argc, char *argv[]) {
zmq::context_t context(1);                        // GLOBAL CTX
Listener listener;
std::cout << "foo" << std::endl;
listener.run();
return 0;
}

高效的资源可降低间接成本

资源的智能处理很重要,因为每个实例化和销毁都承担[TIME]域和[SPACE]域的成本(内存分配/取消分配成本,再次及时),而这些都不便宜。

另外,一个人应该遵循零的ZeroMQ禅宗 - 不要分享任何东西(好吧,有时分享一个Context()实例是一种方式,但是......如果你认真对待分布式系统设计,最好读一读Pieter HINTJENS的书"Code Connected: Volume 1",绝对值得花时间和精力)。

从构造函数中取出zmq::context_t context(1);

这应该全局初始化或类似的地方。 通常,您只需要其中之一。

您正在冻结,因为您正在尝试删除构造函数的本地zmq::context_t,而使用它的套接字仍然存在。