使用 zmq::p roxy 和 REQ/REP 模式
Using zmq::proxy with REQ/REP pattern
我试图了解 zmq::p roxy 的工作原理,但我遇到了问题:我想将消息路由到正确的工作线程,但似乎身份和 evelopes 被忽略了:在示例中,我想将消息从客户端 1 路由到 worker2,将消息从客户端 2 路由到 worker1,但似乎消息是在基于"第一个可用工作线程"的规则上提供的。 是我做错了什么,还是我误解了身份的运作方式?
#include <atomic>
#include <cassert>
#include <chrono>
#include <iostream>
#include <thread>
#include <mutex>
#include <zmq.hpp>
#include <zmq_addon.hpp>
using namespace zmq;
std::atomic_bool running;
context_t context(4);
std::mutex mtx;
void client_func(std::string name, std::string target, std::string message)
{
std::this_thread::sleep_for(std::chrono::seconds(1));
socket_t request_socket(context, socket_type::req);
request_socket.connect("inproc://router");
request_socket.setsockopt( ZMQ_IDENTITY, name.c_str(), name.size());
while(running)
{
multipart_t msg;
msg.addstr(target);
msg.addstr("");
msg.addstr(message);
std::cout << name << "sent a message: " << message << std::endl;
msg.send(request_socket);
multipart_t reply;
if(reply.recv(request_socket))
{
std::unique_lock<std::mutex>(mtx);
std::cout << name << " received a reply!" << std::endl;
for(size_t i = 0 ; i < reply.size() ; i++)
{
std::string theData(static_cast<char*>(reply[i].data()),reply[i].size());
std::cout << "Part " << i << ": " << theData << std::endl;
}
}
std::this_thread::sleep_for(std::chrono::seconds(1));
}
request_socket.close();
}
void worker_func(std::string name, std::string answer)
{
std::this_thread::sleep_for(std::chrono::seconds(1));
socket_t response_socket(context, socket_type::rep);
response_socket.connect("inproc://dealer");
response_socket.setsockopt( ZMQ_IDENTITY, "resp", 4);
while(running)
{
multipart_t request;
if(request.recv(response_socket))
{
std::unique_lock<std::mutex>(mtx);
std::cout << name << " received a request:" << std::endl;
for(size_t i = 0 ; i < request.size() ; i++)
{
std::string theData(static_cast<char*>(request[i].data()),request[i].size());
std::cout << "Part " << i << ": " << theData << std::endl;
}
std::string questioner(static_cast<char*>(request[0].data()),request[0].size());
multipart_t msg;
msg.addstr(questioner);
msg.addstr("");
msg.addstr(answer);
msg.send(response_socket);
}
}
response_socket.close();
}
int main(int argc, char* argv[])
{
running = true;
zmq::socket_t dealer(context, zmq::socket_type::dealer);
zmq::socket_t router(context, zmq::socket_type::router);
dealer.bind("inproc://dealer");
router.bind("inproc://router");
std::thread client1(client_func, "Client1", "Worker2", "Ciao");
std::thread client2(client_func, "Client2", "Worker1", "Hello");
std::thread worker1(worker_func, "Worker1","World");
std::thread worker2(worker_func, "Worker2","Mondo");
zmq::proxy(dealer,router);
dealer.close();
router.close();
if(client1.joinable())
client1.join();
if(client2.joinable())
client2.join();
if(worker1.joinable())
worker1.join();
if(worker2.joinable())
worker2.join();
return 0;
}
从文档中:
当前端是ZMQ_ROUTER套接字,后端是ZMQ_DEALER套接字时,代理应充当共享队列,从一组客户端收集请求,并在一组服务之间公平分配这些请求。请求应从前端连接公平排队,并在后端连接之间均匀分布。答复应自动返回给提出原始请求的客户。
代理处理多个客户端,并使用多个工作线程来处理请求。标识用于将响应发送到正确的客户端。不能使用标识"选择"特定工作人员。
相关文章:
- 将 boost 序列化对象的 asio::streambuf 表示转换为 Beast 的 DynamicBody req.body()
- "Body requirements not met"将 req 传递给 c++ 中的方法(Boost Beast 库)
- 防止gcc破坏我的AVX2内部复制到REP MOVS
- 当前C++编译器是否发出过"rep movsb/w/d"?
- 尝试转换 std::chrono::d uration 会导致"rep cannot be a duration"编译错误
- 使用 zmq::p roxy 和 REQ/REP 模式
- 为什么 zmq REQ-REP 不起作用?
- EAGAIN 在 zeromq REQ/REP 上收到,没有阻塞套接字
- 为什么ZeroMQ req rep在C++和Python之间给我一个空响应
- ZeroMQ REQ/REP如何处理多个客户端
- 在 32 位和 64 位程序中使用 std::chrono::d uration::rep 和 printf
- C++ 具有 REQ 和 REP 套接字的 ZeroMQ 单一应用程序
- 来自 Visual Studio 的此代码中的 rep stos 程序集命令的目的
- zeromq:重置REQ/REP套接字状态
- 带有大型消息的ZeroMQ:REQ/REP
- ZeroMQ-使用REQ套接字发送超过30个字节
- 将minutes::rep转换为hours::rep
- c++ req-rep终止错误
- 将ZeroMQ REQ/REP与C++11期货混合