使用zmq进行多线程发布
Multi-threaded publish with zmq
我有一个问题,关于在ZMQ中从多个线程发布到单个订阅者的最佳方式是。我有一台用C++编写的web服务器,对于中的每个连接,都需要向订阅者发送ZMQ消息。这些服务器可以有成百上千的并发连接,所以我很好奇向这个订阅者写信最有效的方法是什么。我可以想出三种不同的方法。
共享连接
我不相信这种方式会起作用,但基本上只有一个连接指针,并将其传递给所有线程。我假设(在ZMQ站点上搜索时找不到太多信息),这将导致竞争条件,因为我认为ZMQ_msg_send()不是线程安全的。我可以在发送时设置一个互斥锁来解决这个问题,但我担心速度,因为我的服务器需要尽可能快。
共享队列
这类似于共享连接,但方式不同,我没有在zmq_msg_send()周围放置互斥体,而是在要发送的消息的共享向量周围放置了一个互斥体,并有一个写线程来处理所有这些消息。我相信这将比以前的方法更快,因为写入向量可能比执行zmq_msg_send()快得多,但是,如果可能的话,我希望避免等待。
每个线程的连接
我能想到的避免互斥等待的唯一方法是为每个线程打开一个zmq连接(这意味着我进入的每个连接,作为一个进程,每个线程一个用户连接)。这也许是可行的,尽管我不知道zmq_connect是如何工作的。在建立连接之前,它会阻塞吗?理想情况下,我的流程应该是这样的:
user_connection()
{
createZMQConnection();
doWork();
sendData();
}
但是,如果创建连接块,则最好使用共享队列。
有人提出过类似的申请吗?或者有人知道推荐的模式是什么吗?如有任何见解,我们将不胜感激。
编辑:
谢谢你的回复,我只是在更多地研究zmq文档,发现了你所说的很多内容,但也许你可以澄清一点。
至于我的基础设施,我有一个多酒吧到一个潜艇。酒吧是连接的,潜艇是绑定的。
我有多个负载平衡的盒子,每个盒子运行多个线程。总的来说,我每秒看到大约30000条消息。每个盒子,我可能每秒看到大约2000-4000条消息。我通常每个请求的处理时间大约是40ms,所以我通常会打开大约100-200个并发实例。我不确定打开200个并发套接字对象是否会有问题,或者这是否是ZMQ的工作方式。从听起来,我应该向每个线程传递一个zmq::上下文,并在那里创建套接字。
这就是我想象的代码将如何流动,请让我知道这是否看起来正确
void receiveConnection()
{
zmq::context_t context(1);
doWorkClass c(context);
c.run();
}
doWorkClass(context)
{
socket(context, ZMQ_PUB);
}
void doWorkClass::run()
{
sendString = doWork();
s_send(socket,sendString);
}
因此,我为所有套接字使用一个上下文,并为每个线程创建一个套接字,完成我的工作并发送我的消息。
ZeroMQ的思维方式有点不同
从你的想法中可以看出,你试图优化资源,并在访问这些资源时尽量避免种族条件。
ZeroMQ看起来确实"相似",但工作方式有点不同。忘记指针,忘记阻塞和类似的问题。
ZeroMQ是一个相当抽象的依赖层
这意味着,您可以将多个线程创建为PUB
侧,并具有一个SUB
(或多个,采用负载平衡设计),该线程将消耗所有传入的事件流。
有了给定的信息集,似乎需要一个设置,其中:
- 一个中心进程(如果是分布式的,则具有已知的ip:port地址)创建一个
SUB
原型,.bind()
到该"中心点",并将其自己的订阅(以避免任何筛选)设置为""
- 所有特别创建的线程都将其自己的
PUB
原型和.connect()
实例化为"中心节点" - 可能希望添加任何额外的信令/状态控制/自有协议握手消息原型,并行操作,以满足您的系统范围要求
零共享原则
ZeroMQ不鼓励将访问点共享到ZeroMQ套接字,这不是线程安全的方法,原则上应避免。
零阻塞原理
ZeroMQ还不鼓励使用所有"低级别"和系统信令/阻塞方式。
如果需要,可以为线程到线程软SIG信令&国家控制。
您的所有消息流都可以设计为无阻塞的(事实上,这可以极大地缓解性能)。正如您所指出的,您的服务器需要速度。
你的MSG/msec&MB/毫秒是否有点定量?
您可能会在StackOverflow和ZeroMQWeb上找到ZeroMQ层的合理性能测试,这两个测试都在这里引用,以便进行比较。
实现目标的最佳(我认为是唯一或最正确的)方法是使用每线程连接方法。
您需要在线程体中创建新的套接字(作为客户端),建立连接,通过实现简单的握手过程确保您的连接,并且仅在开始推送数据之后。
这里的握手非常重要,因为zmq_connect不能保证套接字在呼叫结束后立即准备好传输。
我通常使用ZMQ_PAIR套接字(当应用程序希望在单个主机上运行时),父级和子级之间共享URL,或者使用分布式env的REQ/REP。
- 在C++中使用cURL和多线程
- 多线程双缓冲区
- 为什么我的多线程作业队列崩溃
- 在main()之外初始化std::vector会导致性能下降(多线程)
- 试图创建一个多线程程序来查找0-100000000之间的总素数
- 为什么一个向量上的多线程操作很慢
- 学习多线程C++:添加线程不会使执行速度更快,即使它看起来应该
- 全局变量 多读取器 一个写入器多线程安全?
- boost::文件系统::recursive_directory_iterator多线程安全
- 如何阻止TensorFlow的多线程
- 如何在多线程中正确使用unique_ptr进行多态性?
- 并发/多线程:是否可以以这种方式生成相同的输出?
- sigwait() 在多线程程序中不起作用
- 多线程程序中出现意外的内存泄漏
- 静态 constexpr 类成员变量对多线程读取是否安全?
- 多线程比没有线程C++慢
- 具有 C++11 多线程的特征库
- 如何在C++和ZMQ中正确使用多线程
- 以某种方式在ZMQ中同时使用Send/Recv是可能的(通过多线程)
- 使用zmq进行多线程发布