关于共享订阅存在的bug

在mqtt客户端订阅topic的时候,使用IMqttMessageListener监听特定的topic,监听器代码如下:


public class DelayedMessageListener implements IMqttMessageListener {
@Autowired
private ApiConfig apiConfig;
@Override
public void messageArrived(String topic, MqttMessage message) {
String resultCode = HttpPoolManager.sendPostRequest(apiConfig.getMeetingController().getDelayedMessage(), new String(message.getPayload()));
log.info(“延迟消息推送结果:{}”, resultCode);
}
}
订阅操作代码如下:

mqttClient.subscribe(“$share/device_links/SYSTEM/delayed_message”,
bizData.getQos(), delayedMessageListener);
mqttDelayedMessage.setTopic(mqttClientTopic);
随后往SYSTEM/delayed_message中发送消息,发现消息并不会被delayedMessageListener监听,而是直接被callback捕获。在dashboard中查看当前客户端的订阅情况:

发现订阅并没有问题,是不是IMqttMessageListener 并不能处理共享订阅的topic?

通过实现 IMqttMessageListener 接口来设置消费者,然后设置共享订阅,这个在mqttv3 版本确实不能被正确处理,正如你的发现,消息会被callback拦截。
如果想用 mqttClient.subscribe(topicFilter, qos, messageListener) 来配置共享订阅,可以使用 mqttv5 版本试试,在 mqttv3 版本,这里需要多做一步,通过在 callback 里面设置路由,通过判断不同的真实 topic 转发到具体的 messageListener 实现