使用eclipse.paho.mqttv5.client订阅相应主题后接收不到保留消息

如题,使用eclipse.paho.mqttv5.client订阅相应主题后接收不到保留消息。cleanStart设置为true,订阅的主题也匹配(sensor/+匹配sensor/t2),连接也正常,即时发送的普通消息都能收到,使用MQTTX可以正常收到保留消息,具体代码如下:

import com.emqx.demo.proerties.MqttOptions;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.mqttv5.client.IMqttClient;
import org.eclipse.paho.mqttv5.client.MqttClient;
import org.eclipse.paho.mqttv5.client.MqttConnectionOptions;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.nio.charset.StandardCharsets;

@Slf4j
@Configuration
public class MqttV5ClientConfig {

    @Autowired
    private MqttOptions mqttOptions;

    @Bean
    public IMqttClient mqttV5Client() throws Exception {

        MqttClient client = new MqttClient(mqttOptions.getBroker(), mqttOptions.getClientId());
        MqttConnectionOptions options = new MqttConnectionOptions();
        options.setCleanStart(mqttOptions.isCleanStart());
        options.setSessionExpiryInterval(mqttOptions.getSessionExpiryInterval());
        options.setConnectionTimeout(mqttOptions.getConnectTimeout());
        options.setKeepAliveInterval(mqttOptions.getKeepAlive());
        options.setAutomaticReconnect(mqttOptions.isAutoReconnect());
        options.setMaxReconnectDelay(mqttOptions.getReconnectInterval());
        options.setUserName(mqttOptions.getUsername());
        options.setPassword(mqttOptions.getPassword().getBytes(StandardCharsets.UTF_8));
        log.info("设置 MQTT V5 客户端配置完成");
        return client;
    }
}
import com.emqx.demo.event.MqttV5MessageEvent;
import com.emqx.demo.proerties.MqttOptions;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.mqttv5.client.IMqttClient;
import org.eclipse.paho.mqttv5.client.IMqttToken;
import org.eclipse.paho.mqttv5.client.MqttCallback;
import org.eclipse.paho.mqttv5.client.MqttDisconnectResponse;
import org.eclipse.paho.mqttv5.common.MqttException;
import org.eclipse.paho.mqttv5.common.MqttMessage;
import org.eclipse.paho.mqttv5.common.packet.MqttProperties;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;

@Slf4j
@Component
public class MqttV5ClientWrapper {


    private final IMqttClient mqttClient;

    public MqttV5ClientWrapper(
            @Qualifier("mqttV5Client") IMqttClient mqttClient,
            ApplicationEventPublisher publisher,
            MqttOptions mqttOptions) {
        this.mqttClient = mqttClient;
        // 设置回调
        this.mqttClient.setCallback(new MqttCallbackAdapter(publisher, mqttClient, mqttOptions));
    }

    @PostConstruct
    public void init() throws MqttException {
        if (!mqttClient.isConnected()) {
            mqttClient.connect();
            log.info("Connecting MQTT client.");
        }
    }

    @PreDestroy
    public void destroy() {
        try {
            if (mqttClient != null && mqttClient.isConnected()) {
                mqttClient.disconnect();
                log.info("MQTT Client: Disconnecting...");
            }
        } catch (Exception e) {
            log.error("Error disconnecting MQTT client: {}", e.getMessage());
        }

        try {
            if (mqttClient != null) {

                mqttClient.close();
                log.info("MQTT Client: Closing...");
            }
        } catch (Exception e) {
            log.error("Error closing MQTT client: {}", e.getMessage());
        }
    }

    // 内部类用于处理回调事件
    private static class MqttCallbackAdapter implements MqttCallback {

        private final ApplicationEventPublisher publisher;
        private final IMqttClient mqttClient;
        private final MqttOptions mqttOptions;

        public MqttCallbackAdapter(ApplicationEventPublisher publisher, IMqttClient mqttClient, MqttOptions mqttOptions) {
            this.publisher = publisher;
            this.mqttClient = mqttClient;
            this.mqttOptions = mqttOptions;
        }


        @Override
        public void disconnected(MqttDisconnectResponse mqttDisconnectResponse) {
            log.info("Disconnected from broker.");
        }

        @Override
        public void mqttErrorOccurred(MqttException e) {
            log.error("MQTT Error: {}", e.getMessage());
        }

        /**
         * 监听收到消息
         * @param topic name of the topic on the message was published to
         * @param message the actual message.
         */
        @Override
        public void messageArrived(String topic, MqttMessage message) {
            publisher.publishEvent(new MqttV5MessageEvent(this, topic, message));
        }

        @Override
        public void deliveryComplete(IMqttToken iMqttToken) {
            log.info("Delivery complete.");
        }

        @Override
        public void connectComplete(boolean reconnect, String serverURI) {
            log.info("Connected to broker: {}", serverURI);
            try {
                String[] topicFilter = mqttOptions.getTopicFilter();
                int[] qos = mqttOptions.getQos();
                mqttClient.subscribe(topicFilter, qos);

                for (int i = 0; i < topicFilter.length; i++) {
                    log.info("Subscribed to topic: {}, qos: {}", topicFilter[i], qos[i]);
                }
            } catch (MqttException e) {
                log.error("Subscribe failed: {}", e.getMessage());
            }
        }

        @Override
        public void authPacketArrived(int i, MqttProperties mqttProperties) {
            log.info("Auth packet arrived.");
        }

    }
}

请求社区的专家指点帮助,谢谢!

我不会 java,看不懂,推荐去 java 社区问问。或者问问 AI。

AI也不清楚,有没有懂这个客户端的专家