ZeroMQ是否有数据到达时的通知/回调事件/消息

Does ZeroMQ have a notification/callback event/message for when data arrives?

本文关键字:通知 回调 事件 消息 是否 数据 ZeroMQ      更新时间:2023-10-16

我正在尝试将ZMQ集成到严重依赖MFC套接字(CASyncSocket)的现有Windows应用程序中。

我有一个 CWinThread 派生的 UI 线程(没有 GUI),它使用 CAsyncSocket 与服务器异步通信。 我想添加一个 ZMQ inproc 通信线路来处理将从服务器接收的数据(基于 REQ/REP 的基础)传达到应用程序内的其他线程。

使用 CAsyncSocket,只要要接收的套接字上有新数据可用,MFC 框架就会调用 OnReceive 方法(对于那里的核心 MFC 大师来说,这可能是过度简化)。

ZMQ中是否有这样的机制? 还是我必须添加 UI 线程启动的额外专用工作线程来处理我与应用其余部分的 ZMQ 通信? 两个管道上的流量都很小,所以如果我可以使用 1,我真的不想创建 2 个单独的线程。

请注意,我已经有了基础知识,我只是在同步方面遇到了问题。 如果我使用与 ZMQ 一起使用阻塞 recv/send ,它会耗尽我的 CAsycSocket,因为 Windows 消息永远不会被线程处理,导致有时永远不会从 ZMQ 应该交付的服务器获取数据。 但是,如果我使用非阻塞 ZMQ 调用,那么线程经常最终处于空闲状态,因为它不知道读取 ZMQ 套接字。

最终,答案是否定的。 当前没有关于数据何时到达 ZeroMQ 的回调/通知,您可以链接到该回调/通知。 我也找不到任何添加此功能的分叉。

在单个线程中使用 MFC 套接字框架提供的传统 OnReceive 调用时,我无法让 ZMQ 工作,并且添加第二个线程专用于 ZMQ 破坏了使用它的全部目的(它用于线程同步)。

我的实现最终删除了 MFC 套接字,并将 ZMQ 用于我的 inproc 服务器(用于与其他线程通信)以及我的 TCP(非 ZMQ)服务器连接,并在 OnIdle() 方法中使用阻塞轮询调用 (zmq_poll())(每次返回 1 以创建繁忙循环)。 阻止投票

BOOL CMyThreaClass::OnIdle(LONG lCount)
{
    UNREFERENCED_PARAMETER(lCount);
    zmq_pollitem_t items [] = {
    { m_pZMQInprocServer, 0, ZMQ_POLLIN, 0 },
    { m_pZMQTCPSocket, 0, ZMQ_POLLIN, 0 }
    };
    const int iZMQInfiniteTimeout(-1);
    iResult = zmq_poll(&items[0], sizeof(items) / sizeof(items[0]), iZMQInfiniteTimeout);
    TRACE("zmq_poll result: %dn", iResult);
    if (items[0].revents & ZMQ_POLLIN)
    {
        sMyStruct sMessage;
        iResult = zmq_recv(m_pZMQInprocServer, &sMessage, sizeof(sMessage), ZMQ_DONTWAIT); // don't block (the zmq_poll blocks for us)
        TRACE("inproc recv result: %dn", iResult);
        //  Process inproc messages
        iResult = zmq_send(pZMQInprocServer, &sMessage, sizeof(sMessage), ZMQ_NULL); // block
        TRACE("inproc send result: %dn", iResult);
    }
    if (items[1].revents & ZMQ_POLLIN)
    {
        // there will be an ZMQ_IDENTITY identifier on the beginning of the socket buffer, read it off first
        uint8_t id [256];
        size_t id_size = 256;
        iResult = zmq_getsockopt(m_pZMQTCPSocket, ZMQ_IDENTITY, id, &id_size);
        TRACE("getsockopt poll result %d:id %dn", iResult, id);
        iResult = zmq_recv(m_pZMQTCPSocket, &id, id_Size, ZMQ_DONTWAIT); // don't block
        // now get our actual data
        char szBuffer[1024];
        int iBytesReceived = zmq_recv(m_pZMQSocket, szBuffer, sizeof(szBuffer), ZMQ_DONTWAIT);
        if (iBytesReceived > 0)
        {
            // process TCP data
        }
    }
}

注意:此答案需要使用 ZMQ 4 或更高版本,因为早期版本的 ZMQ 不会与常规 TCP 套接字连接通信。

您可以通过覆盖CWinApp::OnIdle()来调用应用主消息循环中带有 ZMQ_NOBLOCK 标志的 zmq_recv()。 如果套接字上没有数据等待,zmq_recv将立即返回。 如果有数据,请处理它 - 但请注意,如果您执行缓慢的操作,将使应用程序无响应。

编辑:我没有意识到当消息队列变为空时,OnIdle只被调用一次。 但根据 MSDN 的文档,您可以返回一个非零值以永久调用:

  1. 如果消息循环检查消息队列并且未找到挂起的消息,它将调用 OnIdle 并提供 0 作为 lCount 参数。
  2. OnIdle执行一些处理并返回一个非零值,以指示应再次调用该值以执行进一步处理。
  3. 消息
  4. 循环将再次检查消息队列。如果没有挂起的消息,它将再次调用OnIdle,递增lCount参数。
  5. 最终,OnIdle完成处理其所有空闲任务并返回0 。这会告诉消息循环停止调用OnIdle,直到从消息队列收到下一条消息,此时空闲循环将重新启动,参数设置为 0

我还在 GameDev.net 上发现了这个线程,其中用户说:

我曾经用 MFC 编写的所有使用 D3D 的工具都使用 OnIdle() 函数:

BOOL CD3DEditorApp::OnIdle(LONG lCount) 
{
    // Call base class first
    CWinApp:OnIdle( lCount );
    // game stuff...
    // Always return true - this asks the framework to constantly
    // call the Idle function when it isn't busy doing something
    // else.
    return TRUE;
}

所以,至少根据一个人的说法,这是一种常见的技术。

您可以使用自定义 Windows 消息在线程之间发出信号。下面是一条自定义消息:

#define WM_MY_MESSAGE (WM_APP + 1)

要将其发送到具有窗口的线程,请使用PostMessage或SendMessage to the HWND。使用ON_MESSAGE将其添加到窗口的消息映射中。

要将其发送到没有窗口的 CWinThread 派生线程,请使用 PostThreadMessage 并使用 ON_THREAD_MESSAGE 接收它。