使用zmq进行多线程发布

Multi-threaded publish with zmq

本文关键字:多线程 zmq 使用      更新时间:2023-10-16

我有一个问题,关于在ZMQ中从多个线程发布到单个订阅者的最佳方式是。我有一台用C++编写的web服务器,对于中的每个连接,都需要向订阅者发送ZMQ消息。这些服务器可以有成百上千的并发连接,所以我很好奇向这个订阅者写信最有效的方法是什么。我可以想出三种不同的方法。

共享连接

我不相信这种方式会起作用,但基本上只有一个连接指针,并将其传递给所有线程。我假设(在ZMQ站点上搜索时找不到太多信息),这将导致竞争条件,因为我认为ZMQ_msg_send()不是线程安全的。我可以在发送时设置一个互斥锁来解决这个问题,但我担心速度,因为我的服务器需要尽可能快。

共享队列

这类似于共享连接,但方式不同,我没有在zmq_msg_send()周围放置互斥体,而是在要发送的消息的共享向量周围放置了一个互斥体,并有一个写线程来处理所有这些消息。我相信这将比以前的方法更快,因为写入向量可能比执行zmq_msg_send()快得多,但是,如果可能的话,我希望避免等待。

每个线程的连接

我能想到的避免互斥等待的唯一方法是为每个线程打开一个zmq连接(这意味着我进入的每个连接,作为一个进程,每个线程一个用户连接)。这也许是可行的,尽管我不知道zmq_connect是如何工作的。在建立连接之前,它会阻塞吗?理想情况下,我的流程应该是这样的:

user_connection()
{
    createZMQConnection();
    doWork();
    sendData();        
}

但是,如果创建连接块,则最好使用共享队列。

有人提出过类似的申请吗?或者有人知道推荐的模式是什么吗?如有任何见解,我们将不胜感激。

编辑:

谢谢你的回复,我只是在更多地研究zmq文档,发现了你所说的很多内容,但也许你可以澄清一点。

至于我的基础设施,我有一个多酒吧到一个潜艇。酒吧是连接的,潜艇是绑定的。

我有多个负载平衡的盒子,每个盒子运行多个线程。总的来说,我每秒看到大约30000条消息。每个盒子,我可能每秒看到大约2000-4000条消息。我通常每个请求的处理时间大约是40ms,所以我通常会打开大约100-200个并发实例。我不确定打开200个并发套接字对象是否会有问题,或者这是否是ZMQ的工作方式。从听起来,我应该向每个线程传递一个zmq::上下文,并在那里创建套接字。

这就是我想象的代码将如何流动,请让我知道这是否看起来正确

void receiveConnection()
{
    zmq::context_t context(1);
    doWorkClass c(context);
    c.run();
}
doWorkClass(context)
{
    socket(context, ZMQ_PUB);
}
void doWorkClass::run()
{
    sendString = doWork();
    s_send(socket,sendString);
}

因此,我为所有套接字使用一个上下文,并为每个线程创建一个套接字,完成我的工作并发送我的消息。

ZeroMQ的思维方式有点不同

从你的想法中可以看出,你试图优化资源,并在访问这些资源时尽量避免种族条件。

ZeroMQ看起来确实"相似",但工作方式有点不同。忘记指针,忘记阻塞和类似的问题。

ZeroMQ是一个相当抽象的依赖层

这意味着,您可以将多个线程创建为PUB侧,并具有一个SUB(或多个,采用负载平衡设计),该线程将消耗所有传入的事件流。

有了给定的信息集,似乎需要一个设置,其中:

  • 一个中心进程(如果是分布式的,则具有已知的ip:port地址)创建一个SUB原型,.bind()到该"中心点",并将其自己的订阅(以避免任何筛选)设置为""
  • 所有特别创建的线程都将其自己的PUB原型和.connect()实例化为"中心节点"
  • 可能希望添加任何额外的信令/状态控制/自有协议握手消息原型,并行操作,以满足您的系统范围要求

零共享原则

ZeroMQ不鼓励将访问点共享到ZeroMQ套接字,这不是线程安全的方法,原则上应避免。

零阻塞原理

ZeroMQ还不鼓励使用所有"低级别"和系统信令/阻塞方式。

如果需要,可以为线程到线程软SIG信令&国家控制。

您的所有消息流都可以设计为无阻塞的(事实上,这可以极大地缓解性能)。正如您所指出的,您的服务器需要速度。

你的MSG/msec&MB/毫秒是否有点定量?

您可能会在StackOverflow和ZeroMQWeb上找到ZeroMQ层的合理性能测试,这两个测试都在这里引用,以便进行比较。

实现目标的最佳(我认为是唯一或最正确的)方法是使用每线程连接方法。

您需要在线程体中创建新的套接字(作为客户端),建立连接,通过实现简单的握手过程确保您的连接,并且仅在开始推送数据之后。

这里的握手非常重要,因为zmq_connect不能保证套接字在呼叫结束后立即准备好传输。

我通常使用ZMQ_PAIR套接字(当应用程序希望在单个主机上运行时),父级和子级之间共享URL,或者使用分布式env的REQ/REP。