以某种方式在ZMQ中同时使用Send/Recv是可能的(通过多线程)

Is is possible to somehow use Send/Recv in ZMQ simultaniousely(by multithreading)?

本文关键字:Recv 多线程 Send 方式 ZMQ      更新时间:2023-10-16

我正在尝试如何有效地将ZMQ用于多线程(这样发送就不会阻止接收,接收也不会阻止发送)。

我想使用ZMQ_DONTWAIT标志,但在发送数据时,它有时不会被发送(EAGAIN错误,所以我必须重新对消息进行排队,这在处理兆字节的数据时是浪费资源的)。

我确实想出了以下代码:

Concurrency::concurrent_queue<zmq::message_t> QUEUE_IN;
Concurrency::concurrent_queue<zmq::message_t> QUEUE_OUT;
void SendThread(zmq::context_t &context) {
    zmq::socket_t zmq_socket(context, ZMQ_DEALER);
    zmq_socket.connect(string_format("tcp://%s:%s", address, port).c_str());
    zmq::message_t reply;
    while (true) {
        while (QUEUE_OUT.try_pop(reply))
            zmq_socket.send(reply);
        Sleep(1);
    }
}
void RecvThread(zmq::context_t &context) {
    zmq::socket_t zmq_socket(context, ZMQ_DEALER);
    zmq_socket.connect(string_format("tcp://%s:%s", address, port).c_str());
    zmq::message_t reply;
    while (true) {
        while (zmq_socket.recv(&reply))
            QUEUE_IN.push(reply);
    }
}
void ConnectionThread()
{
    zmq::context_t context(1);
    std::thread* threads[2] = { 
        new std::thread(SendThread, context), 
        new std::thread(RecvThread, context)
    };
    threads[0]->join();
}

然而,这需要在服务器端上有两个套接字,并且我需要确定我需要向哪个套接字发送数据,以及我需要在服务器上侦听哪个套接字,对吧?是否没有办法在多线程环境中使用一个套接字同时使用发送和接收?

我可能想在一个套接字上异步地进行,但在研究了异步示例后,我仍然没有理解这个想法,因为周围没有太多评论。

避免睡眠

为了避免睡眠,可以使用zmq_POLLOUT事件使用zmq_poll()来保护send()。您不需要使用ZMQ_DONTWAIT。[我在那里使用了C函数,您的绑定将具有等效功能。]

路由到RecvThread

线程之间不能共享套接字,因此需要2个套接字才能正常工作。服务器只需要一个绑定到2个端口的套接字(可能是ROUTER)。当它收到消息时,它需要知道在哪里发送回复。。。

当ROUTER套接字接收到消息时,zmq内部会向消息中添加一个具有发送方标识的帧。服务器代码将看到该帧,服务器代码在构造消息以回复发件人时通常会使用相同的身份帧。在您的情况下,这是客户端的SendThread。OTOH,你想回复客户端的接收套接字,所以身份框架必须用于此。

剩下的就是服务器如何获得客户端接收套接字的标识帧。为此,您需要发明一个小型协议。安排客户端的RecvThread向服务器发送一条消息就足够了。服务器应该理解该消息,并简单地保留客户端接收套接字的标识帧,并在构建回复消息时使用其副本。

所有这些都在"探索路由器套接字"下的指南中进行了解释。

  1. 当发送大数据时(你说你在一条消息中发送MB的数据),这将需要一些时间,ZMQ不会"双工"发送和接收,这样它们就可以同时发生。DONTWAIT标志不会对您有太大帮助,它的目的是确保您在执行非ZMQ操作时不会等待ZMQ。在任何情况下,所有消息仍应排队(排除高水位线的干扰)
  2. 安全使用多个线程并行发送和接收的唯一方法是使用多个套接字

但是,这并不全是坏事。如果您使用一个指定的发送套接字和一个指定接收套接字,那么您可以使用pub/sub,这将打开一些有趣的选项。