Libmosquitto publish 不会将所有消息传递到 Azure IoT Hub
Libmosquitto publish doesn't deliver all messages to Azure IoT Hub
我正在尝试每秒向 Azure IoT 中心内置事件中心发布 100 多条消息。我正在使用libmosquitto 1.6.8库。我使用的是 Azure IoT 中心的免费层包,我知道每秒有 100 条消息的限制。但这与这个问题无关。我甚至无法将一半的消息发布到 AZ IoT 中心。
基本上,我有一个需要发送的多映射中多个值的列表。指标列表:
std::multimap< const std::string, std::tuple< const std::string, const std::string, float> > calculatedMetricList;
我将遍历多重映射并将每个值构造为对象有效负载,并将其发送。这意味着 mosquitto_publish 方法将被多次调用。
以下是用于发布消息的代码:
void MosquittoClient::sendDataToUpstreamSystem(){
StatisticalMethod statisticalMethod;
int rc;
MosquittoClient pub_mosq(
"<IoT Hub Name>.azure-devices.net",
"<deviceID>",
"<username>",
"<Password>",
"devices/<deviceID>/messages/events/");
printf("Using MQTT to get data payload from host: %s and on port: %d.rn", pub_mosq.get_host(), pub_mosq.get_port());
// init the mosquitto lib
mosquitto_lib_init();
// create the mosquito object
struct mosquitto* mosq = mosquitto_new(pub_mosq.get_deviceID(), false, NULL);
// add callback functions
mosquitto_connect_callback_set(mosq, MosquittoClient::connect_callback);
mosquitto_publish_callback_set(mosq, MosquittoClient::publish_callback);
mosquitto_message_callback_set(mosq, MosquittoClient::on_message);
mosquitto_disconnect_callback_set(mosq, MosquittoClient::on_disconnect_callback);
// set mosquitto username, password and options
mosquitto_username_pw_set(mosq, pub_mosq.get_userName(), pub_mosq.get_password());
// specify the certificate to use
std::ifstream infile(pub_mosq.get_certificate());
bool certExists = infile.good();
infile.close();
if (!certExists)
{
printf("Warning: Could not find file '%s'! The mosquitto loop may fail.rn", pub_mosq.get_certificate());
}
printf("Using certificate: %srn", pub_mosq.get_certificate());
mosquitto_tls_set(mosq, pub_mosq.get_certificate(), NULL, NULL, NULL, NULL);
// specify the mqtt version to use
int* option = new int(MQTT_PROTOCOL_V311);
rc = mosquitto_opts_set(mosq, MOSQ_OPT_PROTOCOL_VERSION, option);
if (rc != MOSQ_ERR_SUCCESS)
{
rc = pub_mosq.mosquitto_error(rc, "Error: opts_set protocol version");
}
else
{
printf("Setting up options OKrn");
}
// connect
printf("Connecting...rn");
rc = mosquitto_connect_async(mosq, pub_mosq.get_host(), pub_mosq.get_port(), 4);
if (rc != MOSQ_ERR_SUCCESS)
{
rc = pub_mosq.mosquitto_error(rc, NULL);
}
else
{
printf("Connect returned OKrn");
rc = mosquitto_loop_start(mosq);
if (rc != MOSQ_ERR_SUCCESS)
{
rc = pub_mosq.mosquitto_error(rc, NULL);
}
else
{
do
{
for (auto itr = Metrics::calculatedMetricList.begin(); itr != Metrics::calculatedMetricList.end(); itr++) {
int msgId = rand();
std::string test1= itr->first;
std::string test2 = std::get<0>(itr->second);
std::string test3= std::get<1>(itr->second); // metric type
float value = std::get<2>(itr->second); // value
DataPayload objectPayload(
75162345,
496523,
test3,
value,
"test",
test1,
"test",
"test",
123,
213,
23
);
objectPayload.setPayload();
std::string dataPayload = objectPayload.getPayload();
//DEBUG
std::cout << "dataPayload: " << dataPayload << std::endl;
//DEBUG
std::cout << "dataPayload Size: " << dataPayload.size() << std::endl;
// once connected, we can publish (send) a Telemetry message
printf("Publishing to topic: %srn", pub_mosq.get_topic());
rc = pub_mosq.publishToTopic(mosq, &msgId, dataPayload.size(), (char *)dataPayload.c_str());
if (rc == MOSQ_ERR_SUCCESS)
{
printf("Publish returned OKrn");
}
else
{
rc = pub_mosq.mosquitto_error(rc, NULL);
}
}
} while (rc != MOSQ_ERR_SUCCESS);
}
}
mosquitto_loop_stop(mosq, true);
mosquitto_destroy(mosq);
mosquitto_lib_cleanup();}
发布方法:
int MosquittoClient::publishToTopic(struct mosquitto *mosq, int *msgId, int sizeOfData, char *data)
{
return mosquitto_publish(mosq, msgId, p_topic, sizeOfData, data, 1, true);
}
根据控制台,运行程序时,发布的所有消息都返回正常。但只有一两条消息出现在 Azure IoT 中心端。
下图显示了对 IoT 中心的监视,当时只有一条消息通过。 设备资源管理器孪生监视
我已经尝试了许多不同的解决方案,但该程序无法发布所有消息。看起来发布方法正在等待完成第一条消息,但迭代正在移动到下一条消息,导致它被删除。如果这是丢弃消息的原因,那么对消息发送进行排序的最佳方法是什么?否则,还有什么可能导致邮件被丢弃?
更新
问题是程序没有等到消息成功发布到代理(Azure IoT 中心(。如果返回publish_callback,您将知道消息是否已成功发布到代理。
void MosquittoClient::publish_callback(struct mosquitto* mosq, void* userdata, int mid)
{
printf("Publish OK.rn");
}
解决方案是在销毁之前休眠线程,清理调用并在建立连接之前启动 Mosquitto 循环。
sleep(30);
mosquitto_loop_stop(mosq, true);
mosquitto_destroy(mosq);
mosquitto_lib_cleanup();
mosquitto_publish()
是异步的:让它返回MOSQ_ERR_SUCCESS
只是意味着消息的发布已正确传递给Mosquitto线程。因此,目前您正在排队大量消息,然后在程序有机会实际发送数据包之前终止。
您可以使用MosquittoClient::publish_callback
回调在停止循环和终止程序之前检查所有消息是否已有效发送。
- Libmosquitto publish 不会将所有消息传递到 Azure IoT Hub
- 线程消息传递或更好:在"大师班"中访问其他班级的成员
- 如何在 boost::asio 中将打包的结构作为消息传递?(无序列化)
- "Guaranteed Delivery"消息传递 - 我应该使用 MQTT 还是 ZeroMQ?
- 核心消息传递中未处理的异常.dll在程序关闭期间
- Microsoft具有本机消息传递和非持久连接的边缘扩展不起作用
- Firebase C 云消息传递背景问题
- 从客户端到浏览器的CEF中的消息传递序列化
- 我们是否可以使用 FireBase 云消息传递来发送或接收消息,或者在 Windows 桌面/控制台或 Linux 控制
- 如何将WM_KEYDOWN消息传递到 IWebBrowser2 实例
- 如何在 GNU Radio 中实现消息传递
- msgpack:C++和Java之间的消息传递
- 增强ASIO和线程之间的消息传递
- 交流时Chrome本机消息传递错误
- 多线程C++消息传递
- 跨平台最佳 MVC 模型到控制器消息传递方法(C#、Objective-C++)
- 如何在Chrome原生消息传递扩展的本机应用程序中使用本机主机消息
- 移动窗口(),跳过,躲避或绕过消息传递
- 无效输入会导致多语言 JSON 消息传递系统中的身份验证绕过
- 共享内存系统性能的消息传递接口