订阅主题的代码:
client.toAsync().subscribeWith()
.topicFilter(topic)
.qos(MqttQos.AT_LEAST_ONCE)
.callback(publish -> {
String payloadStr = new String(publish.getPayloadAsBytes(), UTF_8);
String messageTopic = publish.getTopic().toString();
log.info("订阅的主题:" + messageTopic + "\n返回的消息值:" + payloadStr);
this.subMessageRecOffer(messageTopic,publish.getPayloadAsBytes());
}).send();
上面的代码运行没有报错,但是EMQX显示这个客户端的订阅如下图所示:
出现我说的这种情况不是一直出现,订阅的主题符合ACL规则,客户端代码提示订阅成功,但实际没订阅成功是偶发,比较难复现。因为发送订阅主题消息
的消息也是通过tcp通道传输,不排除网络原因,EMQX有丢失消息的情况,但是如果丢失了消息,为什么客户端连接的订阅消息的代码会没有报错,如果是生产环境出现这种情况,会造成比较大的困扰(代码层面认为已经订阅,但实际没订阅),会影响业务数据的传输