ZeroMQ是否有数据到达时的通知/回调事件/消息
Does ZeroMQ have a notification/callback event/message for when data arrives?
我正在尝试将ZMQ集成到严重依赖MFC套接字(CASyncSocket)的现有Windows应用程序中。
我有一个 CWinThread 派生的 UI 线程(没有 GUI),它使用 CAsyncSocket 与服务器异步通信。 我想添加一个 ZMQ inproc 通信线路来处理将从服务器接收的数据(基于 REQ/REP 的基础)传达到应用程序内的其他线程。
使用 CAsyncSocket,只要要接收的套接字上有新数据可用,MFC 框架就会调用 OnReceive 方法(对于那里的核心 MFC 大师来说,这可能是过度简化)。
ZMQ中是否有这样的机制? 还是我必须添加 UI 线程启动的额外专用工作线程来处理我与应用其余部分的 ZMQ 通信? 两个管道上的流量都很小,所以如果我可以使用 1,我真的不想创建 2 个单独的线程。
请注意,我已经有了基础知识,我只是在同步方面遇到了问题。 如果我使用与 ZMQ 一起使用阻塞 recv/send ,它会耗尽我的 CAsycSocket,因为 Windows 消息永远不会被线程处理,导致有时永远不会从 ZMQ 应该交付的服务器获取数据。 但是,如果我使用非阻塞 ZMQ 调用,那么线程经常最终处于空闲状态,因为它不知道读取 ZMQ 套接字。
最终,答案是否定的。 当前没有关于数据何时到达 ZeroMQ 的回调/通知,您可以链接到该回调/通知。 我也找不到任何添加此功能的分叉。
在单个线程中使用 MFC 套接字框架提供的传统 OnReceive 调用时,我无法让 ZMQ 工作,并且添加第二个线程专用于 ZMQ 破坏了使用它的全部目的(它用于线程同步)。
我的实现最终删除了 MFC 套接字,并将 ZMQ 用于我的 inproc 服务器(用于与其他线程通信)以及我的 TCP(非 ZMQ)服务器连接,并在 OnIdle() 方法中使用阻塞轮询调用 (zmq_poll())(每次返回 1 以创建繁忙循环)。 阻止投票
BOOL CMyThreaClass::OnIdle(LONG lCount)
{
UNREFERENCED_PARAMETER(lCount);
zmq_pollitem_t items [] = {
{ m_pZMQInprocServer, 0, ZMQ_POLLIN, 0 },
{ m_pZMQTCPSocket, 0, ZMQ_POLLIN, 0 }
};
const int iZMQInfiniteTimeout(-1);
iResult = zmq_poll(&items[0], sizeof(items) / sizeof(items[0]), iZMQInfiniteTimeout);
TRACE("zmq_poll result: %dn", iResult);
if (items[0].revents & ZMQ_POLLIN)
{
sMyStruct sMessage;
iResult = zmq_recv(m_pZMQInprocServer, &sMessage, sizeof(sMessage), ZMQ_DONTWAIT); // don't block (the zmq_poll blocks for us)
TRACE("inproc recv result: %dn", iResult);
// Process inproc messages
iResult = zmq_send(pZMQInprocServer, &sMessage, sizeof(sMessage), ZMQ_NULL); // block
TRACE("inproc send result: %dn", iResult);
}
if (items[1].revents & ZMQ_POLLIN)
{
// there will be an ZMQ_IDENTITY identifier on the beginning of the socket buffer, read it off first
uint8_t id [256];
size_t id_size = 256;
iResult = zmq_getsockopt(m_pZMQTCPSocket, ZMQ_IDENTITY, id, &id_size);
TRACE("getsockopt poll result %d:id %dn", iResult, id);
iResult = zmq_recv(m_pZMQTCPSocket, &id, id_Size, ZMQ_DONTWAIT); // don't block
// now get our actual data
char szBuffer[1024];
int iBytesReceived = zmq_recv(m_pZMQSocket, szBuffer, sizeof(szBuffer), ZMQ_DONTWAIT);
if (iBytesReceived > 0)
{
// process TCP data
}
}
}
注意:此答案需要使用 ZMQ 4 或更高版本,因为早期版本的 ZMQ 不会与常规 TCP 套接字连接通信。
您可以通过覆盖CWinApp::OnIdle()
来调用应用主消息循环中带有 ZMQ_NOBLOCK
标志的 zmq_recv()
。 如果套接字上没有数据等待,zmq_recv
将立即返回。 如果有数据,请处理它 - 但请注意,如果您执行缓慢的操作,将使应用程序无响应。
编辑:我没有意识到当消息队列变为空时,OnIdle
只被调用一次。 但根据 MSDN 的文档,您可以返回一个非零值以永久调用:
- 如果消息循环检查消息队列并且未找到挂起的消息,它将调用
OnIdle
并提供0
作为lCount
参数。 -
OnIdle
执行一些处理并返回一个非零值,以指示应再次调用该值以执行进一步处理。
消息 - 循环将再次检查消息队列。如果没有挂起的消息,它将再次调用
OnIdle
,递增lCount
参数。 - 最终,
OnIdle
完成处理其所有空闲任务并返回0
。这会告诉消息循环停止调用OnIdle
,直到从消息队列收到下一条消息,此时空闲循环将重新启动,参数设置为0
。
我还在 GameDev.net 上发现了这个线程,其中用户说:
我曾经用 MFC 编写的所有使用 D3D 的工具都使用 OnIdle() 函数:
BOOL CD3DEditorApp::OnIdle(LONG lCount)
{
// Call base class first
CWinApp:OnIdle( lCount );
// game stuff...
// Always return true - this asks the framework to constantly
// call the Idle function when it isn't busy doing something
// else.
return TRUE;
}
所以,至少根据一个人的说法,这是一种常见的技术。
您可以使用自定义 Windows 消息在线程之间发出信号。下面是一条自定义消息:
#define WM_MY_MESSAGE (WM_APP + 1)
要将其发送到具有窗口的线程,请使用PostMessage或SendMessage to the HWND。使用ON_MESSAGE将其添加到窗口的消息映射中。
要将其发送到没有窗口的 CWinThread 派生线程,请使用 PostThreadMessage 并使用 ON_THREAD_MESSAGE 接收它。
- 架构决策:返回std::future还是提供回调
- 正在为Xtensa simcall函数编写回调函数
- 如何在C++中使用非静态成员函数作为回调函数
- FLTK:按下哪个按钮 - 将数字传递给按钮的回调 (lambda)
- 在简单示例中,Python3 + ctypes 回调会导致内存泄漏
- 用于在回调中调用解析器的设计模式
- 如何使用C++对象的成员函数作为 C 样式回调?
- Java从C++回调到C++回调
- 如何将成员函数作为回调参数传递给需要"typedef-ed"自由函数指针的函数?
- 从不同的 cpp 调用回调函数会导致bad_function_call
- pcap_handler回调仅在使用 NPCAP v0.9991 时包含空数据包
- 不带轮询的 SDL2 事件回调
- C++存储带有可变参数的回调
- 如何使用 Node-addon-API 实现 node-nan 回调
- 处理影响跨不同线程共享对象的定时回调的最佳方法是什么?
- 通知文件描述符准备使用回调读取
- C++:使用回调函数作为事件通知程序
- VDS(虚拟磁盘服务)COM接口通知-仅在注销期间调用回调(接收器)(Unadvise)
- ZeroMQ是否有数据到达时的通知/回调事件/消息
- 当通知程序/信号超出范围时,在提升绑定中解除分配回调侦听器对象