未收到 ZMQ 消息

ZMQ messages not being received

本文关键字:消息 ZMQ      更新时间:2023-10-16

如果我错过了一些简单的东西,请原谅我,这是我第一次对消息传递做任何事情,我从其他人那里继承了这个代码库。

我正在尝试从 IP 为 10.10.10.200 的 Windows 计算机向 IP 为 10.10.10.15 的 Ubuntu 计算机发送消息。

从Windows机器运行TCPView时,我得到了以下结果,这让我怀疑问题出在Ubuntu机器上。如果我没看错,那么我在 Windows 机器上的应用程序已经在端口 5556 上创建了一个连接,这是它应该做的。如果我错了,我也会包含窗口代码。

my_app.exe  5436    TCP MY_COMPUTER 5556    MY_COMPUTER 0   LISTENING                                       

视窗应用代码:

void 
NetworkManager::initializePublisher()
{
globalContext = zmq_ctx_new();
globalPublisher = zmq_socket(globalContext, ZMQ_PUB);
string protocol = "tcp://*:";
string portNumber = PUBLISHING_PORT; //5556
string address = protocol + portNumber;
char *address_ptr = new char[address.size() + 1];
strncpy_s(address_ptr, address.size() + 1, address.c_str(), address.size());
int bind_res = zmq_bind(globalPublisher, address_ptr);
if (bind_res != 0)
{
cerr << "FATAL: couldn't bind to port[" << portNumber << "] and protocol [" << protocol << "]" << endl;
}
cout << " Connection: " << address << endl;
}
void 
NetworkManager::publishMessage(MESSAGE msgToSend)
{
// Get the size of the message to be sent
int sizeOfMessageToSend = MSG_MAX_SIZE;//sizeof(msgToSend);
// Copy IDVS message to buffer
char buffToSend[MSG_MAX_SIZE] = "";
// Pack the message id
size_t indexOfId = MSG_ID_SIZE + 1;
size_t indexOfName = MSG_NAME_SIZE + 1;
size_t indexOfdata = MSG_DATABUFFER_SIZE + 1;
memcpy(buffToSend, msgToSend.get_msg_id(), indexOfId - 1);
// Pack the message name
memcpy(buffToSend + indexOfId, msgToSend.get_msg_name(), indexOfName - 1);
// Pack the data buffer
memcpy(buffToSend + indexOfId + indexOfName, msgToSend.get_msg_data(), indexOfdata - 1);
// Send message
int sizeOfSentMessage = zmq_send(globalPublisher, buffToSend, MSG_MAX_SIZE, ZMQ_DONTWAIT);
getSubscriptionConnectionError();
// If message size doesn't match, we have an issue, otherwise, we are good
if (sizeOfSentMessage != sizeOfMessageToSend)
{
int errorCode = zmq_errno();
cerr << "FATAL: couldn't not send message." << endl;
cerr << "ERROR: " << errorCode << endl;
}
}

如果你认为需要,我可以包含更多这边的代码,但是错误在 Ubuntu 端弹出,所以我将专注于那里。

问题是当我调用zmq_recv时它返回 -1,当我检查zmq_errno时,我再次收到 E(请求非阻塞模式,目前没有可用的消息。我还检查了netstat,在端口5556上没有看到任何东西

首先是连接到发布服务器的函数,然后是获取数据的函数,然后是 main。 Ubuntu 端代码:

void
*connectoToPublisher()
{
void *context = zmq_ctx_new();
void *subscriber = zmq_socket(context, ZMQ_SUB);
string protocol = "tcp://";
string ipAddress = PUB_IP;      //10.10.10.15
string portNumber = PUB_PORT;  // 5556
string address = protocol + ipAddress + ":" + portNumber;
cout << "Address: " << address << endl;
char *address_ptr = new char[address.size() + 1];
strcpy(address_ptr, address.c_str());
// ------ Connect to Publisher ------
bool isConnectionEstablished = false;
int connectionStatus;
while (isConnectionEstablished == false)
{
connectionStatus = zmq_connect(subscriber, address_ptr);
switch (connectionStatus)
{
case 0: //we are good.
cout << "Connection Established!" << endl;
isConnectionEstablished = true;
break;
case -1:
isConnectionEstablished = false;
cout << "Connection Failed!" << endl;
getSubscriptionConnectionError();
cout << "Trying again in 5 seconds..." << endl;
break;
default:
cout << "Hit default connecting to publisher!" << endl;
break;
}
if (isConnectionEstablished == true)
{
break;
}
sleep(5); // Try again
}
// by the time we get here we should have connected to the pub
return subscriber;
}
static void *
getData(void *subscriber)
{
const char *filter = ""; // Get all messages
int subFilterResult = zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, filter, strlen(filter));
// ------ Get in main loop ------
while (1)
{
//get messages from publisher
char bufferReceived[MSG_MAX_SIZE] = "";
size_t expected_messageSize = sizeof(bufferReceived);
int actual_messageSize = zmq_recv(subscriber, bufferReceived, MSG_MAX_SIZE, ZMQ_DONTWAIT);
if (expected_messageSize == actual_messageSize)
{
MESSAGE msg = getMessage(bufferReceived); //Uses memcpy to copy id, name, and data strutct data from buffer into struct of MESSAGE
if (strcmp(msg.get_msg_id(), "IDXY_00000") == 0)
{
DATA = getData(msg); //Uses memcpy to copy data from buffer into struct of DATA
}
} else
{
// Something went wrong
getReceivedError(); //This just calls zmq_errno and cout the error
}
usleep(1);
}
}
int main (int argc, char*argv[])
{
//Doing some stuff...
void *subscriber_socket = connectoToHeadTrackerPublisher();
// Initialize Mux Lock
pthread_mutex_init(&receiverMutex, NULL);
// Initializing some variables...
// Launch Thread to get updates from windows machine
pthread_t publisherThread;
pthread_create(&publisherThread,
NULL, getData, subscriber_socket);
// UI stuff
zmq_close(subscriber_socket);
return 0;
}

如果您无法提供解决方案,那么我将接受将问题确定为解决方案。我的主要问题是我没有消息传递或网络方面的知识或经验来正确识别问题。通常,如果我知道出了什么问题,我可以修复它。

好的,这与信令/消息传递框架无关

您的 Ubuntu 代码指示 ZeroMQContext()实例引擎创建一个新的SUB-socket 实例,接下来代码坚持此套接字尝试_connect()(以建立对等对手的tcp://传输类连接)到"相反"接入点,位于设置为10.10.10.15:5556的 Ubuntulocalhost:port#的地址上, 而预期的PUB侧原型接入点实际上并不存在于这台 Ubuntu 机器上,而是存在于另一台 Windows 主机上,其中IP:port#10.10.10.200:5556

这似乎是问题的根本原因,因此请相应地更改它以匹配物理布局,您可能会获得玩具工作。