环境信息
- EMQX 版本:5.0
 - 操作系统及版本:windows
 - 其他
 
问题描述
paho java 异步客户端MqttAsyncClient 执行订阅消息时,提示“客户机未连接”。但是publish是可以成功的,这是什么原因?代码如下:执行到subscribe()方法就提示报错。
配置文件及日志
@Component
public class MqttAsyncPushClient {
    private static final Logger logger = LoggerFactory.getLogger(MqttAsyncPushClient.class);
    @Autowired
    private PushCallback pushCallback;
    private static MqttAsyncClient client;
    private static MqttAsyncClient getClient() {
        return client;
    }
    private static void setClient(MqttAsyncClient client) {
        MqttAsyncPushClient.client = client;
    }
    /**
     * 客户端连接
     *
     * @param host      ip+端口
     * @param username  用户名
     * @param password  密码
     * @param timeout   超时时间
     * @param keepalive 保留数
     */
    public void connect(String host, String username, String password, int timeout, int keepalive) {
        MqttAsyncClient client;
        try {
            client = new MqttAsyncClient(host, UUID.randomUUID().toString(), new MemoryPersistence());
            MqttConnectOptions options = new MqttConnectOptions();
            options.setCleanSession(true);
            options.setUserName(username);
            options.setPassword(password.toCharArray());
            options.setConnectionTimeout(timeout);
            options.setKeepAliveInterval(keepalive);
            options.setSocketFactory(SslUtil.getSocketFactory("D:\\soft\\emqx-5.0.8-windows-amd64\\etc\\certs\\ca.pem"));
            MqttAsyncPushClient.setClient(client);
            client.setCallback(pushCallback);
            client.connect(options);
        } catch (MqttException e) {
            logger.error("-------------------客户端连接异常----------------");
            logger.error("reason: " + e.getReasonCode());
            logger.error("msg: " + e.getMessage());
            e.printStackTrace();
        } catch (Exception ee) {
            logger.error("-------------------客户端连接异常----------------");
            logger.error("msg: " + ee.getMessage());
            ee.printStackTrace();
        }
    }
    /**
     * 发布
     *
     * @param qos         连接方式
     * @param retained    是否保留
     * @param topic       主题
     * @param pushMessage 消息体
     */
    public boolean publish(int qos, boolean retained, String topic, String pushMessage) {
        logger.info("mqttAsync"+pushMessage);
        MqttMessage message = new MqttMessage();
        message.setQos(qos);
        message.setRetained(retained);
        message.setPayload(pushMessage.getBytes());
        IMqttDeliveryToken token;
        try {
            token = MqttAsyncPushClient.getClient().publish(topic, message);
            logger.info("结果-------------------"+token.isComplete());
            token.waitForCompletion();
            logger.info("结果-------------------"+token.isComplete());
            return true;
        } catch (MqttPersistenceException e) {
            e.printStackTrace();
            return false;
        } catch (MqttException e) {
            e.printStackTrace();
            return false;
        }
    }
    /**
     * 订阅某个主题
     *
     * @param topic 主题
     * @param qos   连接方式
     */
    public void subscribe(String topic, int qos) {
        logger.info("mqttAsync开始订阅主题" + topic);
        try {
            MqttAsyncPushClient.getClient().subscribe(topic, qos);
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }
-------------------------------------------------------------------------------------
这是config代码,项目启动时候,@bean方法里,mqttAsyncPushClient.subscribe()这里报的错
@Configuration
@Setter
@Getter
public class MqttConfig {
//    @Autowired
//    private MqttPushClient mqttPushClient;
    @Autowired
    private MqttAsyncPushClient mqttAsyncPushClient;
    @Value("${mqtt.username}")
    private String username;
    @Value("${mqtt.password")
    private String password;
    @Value("${mqtt.serverURIs}")
    private String hostUrl;
    @Value("${mqtt.clientId}")
    private String clientId;
    @Value("${mqtt.topic}")
    private String defaultTopic;
    @Value("${mqtt.qos}")
    private int qos;
    @Value("${mqtt.enabled}")
    private boolean enabled;
    @Value("${mqtt.keepalive}")
    private int keepalive;
    @Value("${mqtt.timeout}")
    private int timeout;
    private static final Logger logger = LoggerFactory.getLogger(MqttConfig.class);
    @Bean
    public MqttAsyncPushClient getMqttAsyncPushClient() {
        if(enabled == true){
            String mqtt_topic[] = defaultTopic.split(",");
            mqttAsyncPushClient.connect(hostUrl, username, password, timeout, keepalive);//连接
            for(int i=0; i<mqtt_topic.length; i++){
                mqttAsyncPushClient.subscribe(mqtt_topic[i], 0);//订阅主题
            }
        }
        return mqttAsyncPushClient;
    }
            
        
