如何正确理解ZeroMQ的ZMQ_RCVHWM选项

How to understand the ZMQ_RCVHWM option of ZeroMQ correctly?

本文关键字:RCVHWM 选项 ZMQ 正确理解 ZeroMQ      更新时间:2023-10-16

有两个端点,分别是经销商路由器经销商通过TCP协议连接到路由器。我将ZMQ_SNDHWMZMQ_RCVHWM设置为,所有这些都只有一个

注意经销商始终向路由器发送消息,但路由器不执行任何接收操作。

当我只启动经销商时,经销商只发送一条信息。这是意料之中的事。但当我在那之后启动路由器时,经销商可以出乎意料地发送大约4000条消息。

为什么ZMQ_RCVHWM选项似乎无效?

经销商代码:

// create ctx
void* ctx = zmq_ctx_new();
assert(nullptr != ctx);
// create in
void* in = zmq_socket(ctx, ZMQ_DEALER);
assert(in);
int sndhwm = 1;
assert(0 == zmq_setsockopt(in, ZMQ_SNDHWM, &sndhwm, sizeof(sndhwm)));
assert(0 == zmq_setsockopt(in, ZMQ_RCVHWM, &sndhwm, sizeof(sndhwm)));
int rc = zmq_connect(in, "tcp://127.0.0.1:1012");
assert(!rc);
char content[100] = {0};
int size = 0;
int64_t nCount = 0;
while(1)
{
    sprintf_s(content, "%d", ++nCount);
    size = strlen(content);
    rc = zmq_send(in, content, size, 0);
    assert(rc = size);
    printf("in = %dn", nCount);
}

路由器的代码:

// create ctx
void* ctx = zmq_ctx_new();
void* out = zmq_socket(ctx, ZMQ_ROUTER);
int sndhwm = 1;
assert(0 == zmq_setsockopt(out, ZMQ_SNDHWM, &sndhwm, sizeof(sndhwm)));
assert(0 == zmq_setsockopt(out, ZMQ_RCVHWM, &sndhwm, sizeof(sndhwm)));
int rc = zmq_bind(out, "tcp://127.0.0.1:1012");
assert(!rc);

我在这里回答,因为我知道这是一个新问题,但我是从你的另一个问题派生出来的。

你描述的行为似乎是正确的,因为(强调我的,引用自这里):

"对于指定套接字正在与之通信的任何单个对等体,高水位标记是内存中未处理消息的最大数量的硬限制。

一旦您读取了消息,它就不再在内存中排队,因此会接受新消息。这同样适用于发送的消息,这反映在您描述的行为中(在第一个问题中):

  1. 当路由器未设置时,经销商只发送一条信息,然后被阻塞。没错,因为ZMQ_SNDHWM是1

由于还没有对等端连接,所以第一个消息在存储器中排队,并且没有新消息被接受或有资格被发送。

  1. 但当我设置路由器时,经销商可以继续发送大约4K的消息并受到阻碍。为什么

现在,两个对等端已经连接,消息可以自由流动,因为没有完成内存排队(你在接收循环中不断读取套接字,在你的第一个问题中发布,我在上面提供了哪个链接):

路由器的代码:

//...
while(true)
{
    zmq_msg_init(&msg);
    rc = zmq_recvmsg(out, &msg, 0);
    assert(rc > 0);
    printf("out = %sn", (char*)zmq_msg_data(&msg));
    if(!zmq_msg_more(&msg))
    {
        break;
    }
}
//...

如果您对系统的压力足够大,以至于传入或传出消息开始在内存中排队(即超过套接字发送容量-吞吐量-),您将看到ZMQ_SNDHWMZMQ_RCVHWM的影响。