将ZeroMQ与Boost::ASIO一起使用

Using ZeroMQ together with Boost::ASIO

本文关键字:一起 ASIO Boost ZeroMQ      更新时间:2023-10-16

我有一个C++应用程序,它正在使用ZeroMQ进行一些消息传递。但它也必须为基于AJAX/Commet的web服务提供SGCI连接。

为此,我需要一个普通的TCP套接字。我可以通过普通的Posix套接字做到这一点,但为了保持跨平台的可移植性,让我的生活更轻松(我希望…),我考虑使用Boost::ASIO。

但现在我遇到了ZMQ想要使用它自己的zmq_poll()和ASIO的io_service.run()的冲突。。。

有没有办法让ASIO与0MQzmq_poll()合作?

或者有其他推荐的方法来实现这样的设置吗?

注意:我可以通过使用多个线程来解决这个问题,但只有一个小的单核/CPU框可以用非常低的SCGI流量运行该程序,所以多线程会浪费资源。。。

在阅读了这里和这里的文档后,特别是这段

ZMQ_FD:检索与套接字关联的文件描述符ZMQ_FD选项应检索与指定的套接字。返回的文件描述符可用于将套接字集成到现有事件循环中;websphere MQ库应在边缘触发的套接字上发出任何未决事件的信号通过使文件描述符准备好进行读取。

我认为您可以对每个zmq_pollitem_t使用null_buffers,并将事件循环推迟到io_service,完全绕过zmq_poll()。然而,在上述文件中似乎有一些注意事项,特别是

从返回的文件描述符中读取的能力必须指示可以从中读取消息,或者可以写入底层套接字;应用程序必须检索实际事件状态以及随后的ZMQ_EVENTS检索选项

因此,当您的一个zmq套接字的处理程序被触发时,我认为在处理该事件之前,您必须做更多的工作。未编译的伪代码低于

const int fd = getZmqDescriptorSomehow();
boost::asio::posix::stream_descriptor socket( _io_service, fd );
socket->async_read_some(
boost::asio::null_buffers(),
[=](const boost::system::error_code& error)
{
if (!error) {
// handle data ready to be read
}
}
);

注意,这里不必使用lambda,boost::bind对成员函数就足够了。

最后我发现有两种可能的解决方案:

  • 我们使用ASIO事件循环的Sam Miller
  • ZeroMQ的事件循环,通过acceptorsocket.native()方法获取ASIO文件描述符,并将它们插入zmq_pollitem_t的数组

我接受了Sam Miller的回答,因为对我来说,这是SCGI情况下不断创建和结束新连接的最佳解决方案。处理如此每次更改的zmq_pollitem_t阵列是一件非常麻烦的事情,可以通过使用ASIO事件循环来避免。

获得ZeroMQ的套接字是战斗中最小的部分。ZeroMQ基于一个在TCP上分层的协议,因此如果采用此路由,则必须在自定义Boost.Asio io_service中重新实现ZeroMQ。我在使用Boost.Asio创建异步ENet服务时遇到了同样的问题,因为我首先只是试图使用Boost.Asio UDP服务捕获来自ENet客户端的流量。ENet是一个在UDP上分层的类似TCP的协议,所以我当时所实现的只是在几乎无用的状态下捕获数据包。

Asio是基于模板的,内置的io_service使用模板基本上封装了系统套接字库来创建TCP和UDP服务。我的最终解决方案是创建一个自定义io_service,它封装了ENet库而不是系统套接字库,允许它使用ENet的传输函数,而不必使用内置的UDP传输重新实现它们。

ZeroMQ也可以这样做,但ZeroMQ本身已经是一个非常高性能的网络库,它已经提供了异步I/O。我认为您可以通过使用ZeroMQ现有的API接收消息并将消息传递到io_service线程池来创建一个可行的解决方案。这样,消息/任务仍然可以使用Boost.Asio的reactor模式异步处理,而无需重写任何内容。ZeroMQ将提供异步I/O,Boost。Asio将提供异步任务处理程序/工作程序。

现有的io_service仍然可以耦合到现有的TCP套接字,允许线程池同时处理TCP(在您的情况下是HTTP)和ZeroMQ。在这样的设置中,ZeroMQ任务处理程序完全可以访问TCP服务会话对象,从而允许您将ZeroMQ消息/任务的结果发送回TCP客户端。

以下只是为了说明这个概念。

// Create a pool of threads to run all of the io_services.
std::vector<boost::shared_ptr<boost::thread> > threads;
for(std::size_t i = 0; i < thread_pool_size_; ++i) {
boost::shared_ptr<boost::thread> thread(new boost::thread(boost::bind(&boost::asio::io_service::run, &io_service_)));
threads.push_back(thread);
}
while (1) {
char buffer [10];
zmq_recv (responder_, buffer, 10, 0);
io_service_.post(boost::bind(&server::handle_zeromq_message, buffer, this));
}

在这个问题出现两年后,有人发布了一个正是这样做的项目。项目在这里:https://github.com/zeromq/azmq.讨论该设计的博客文章如下:https://rodgert.github.io/2014/12/24/boost-asio-and-zeromq-pt1/.

以下是从自述文件中复制的示例代码:

#include <azmq/socket.hpp>
#include <boost/asio.hpp>
#include <array>
namespace asio = boost::asio;
int main(int argc, char** argv) {
asio::io_service ios;
azmq::sub_socket subscriber(ios);
subscriber.connect("tcp://192.168.55.112:5556");
subscriber.connect("tcp://192.168.55.201:7721");
subscriber.set_option(azmq::socket::subscribe("NASDAQ"));
azmq::pub_socket publisher(ios);
publisher.bind("ipc://nasdaq-feed");
std::array<char, 256> buf;
for (;;) {
auto size = subscriber.receive(asio::buffer(buf));
publisher.send(asio::buffer(buf));
}
return 0;
}

看起来不错。如果你尝试过,请在评论中告诉我它在2019年是否仍然有效[我可能会在几个月后尝试,然后更新这个答案](回购已经过时,上次提交是一年前)

解决方案是轮询io_service,而不是run()。

查看此解决方案以获取一些poll()信息。

使用poll而不是run将允许您在没有任何阻塞问题的情况下轮询zmq的连接。