通过Eclipse Paho Java Client创建MQTT客户端重连EMQX失败的问题探讨

参照网上找的paho MQTT客户端代码,写了个服务端的MQTT客户端,大部分功能都没有问题,就是MQTT客户端重连出了问题:如果是因为一定时间内没有消息收发导致的Mqtt连接断开,这时候重连是没有问题的;但是如果是客户端id冲突这种情况(通过MQTTX配置相同的客户端id来模拟),MQTTX在连接后,服务端在检测到连接断开后理论上应该进行重连,但是实际上却报错了:

而且很奇怪的是,根据上面的INFO我判断出重连跳过了认证链的第一步(内部数据库)直接到达了第二步(HTTP服务验证),关键还认证了两次!而服务端连接EMQX应该只走第一步的,毕竟springboot启动时第一次连接EMQX是正常连接的。以下是我的认证链:

由于是两次认证,我想到MqttProperties的参数中有个reconnect,且设置为true了,我把其改成false后,仍然没有效果,还是和之前一样。于是我就想:干脆不用connectonLost回调函数的重连逻辑了,将其注释,并把reconnect设置为true,重启后,再次复现问题 ,发现可以正常重连了

虽然实现了重连机制,定位到了问题出在connectonLost回调函数的重连逻辑,但目前还是没找到重连逻辑哪里有问题,下面会附上重连相关的代码,麻烦大家帮忙看下!EMQX新手小白一枚,还望多多指教! :joy:

    /**
     * 客户端连接断开后触发
     * 这里可以做重新链接操作
     */
    @Override
    public void connectionLost(Throwable cause) {
        logger.error("【MQTT-服务端】链接断开!原因为:"+cause.toString());
        logger.info("【MQTT-服务端】重新连接emqx....................................................");
        for (int i = 0; i < 5; i++) {
            if(mqttSendClient.reconnection()) {
                break;
            }else{
                try {
                    Thread.sleep(i * 1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    /**
     * 重新连接
     */
public boolean reconnection() {
        try {
            mqttClient.connect();
            Thread.sleep(1000);
            if( mqttClient.isConnected() ) {
                return true;
            }
        } catch (MqttException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return  false;
    }
    /**
     * 客户端连接
     * @return
     */
    public void connect(){
        MqttClient client = null;
        try {
            client = new MqttClient(mqttProperties.getHostUrl(),mqttProperties.getClientid() , new MemoryPersistence());
            MqttConnectOptions options = new MqttConnectOptions();
            options.setUserName(mqttProperties.getUsername());
            options.setPassword(mqttProperties.getPassword().toCharArray());
            options.setConnectionTimeout(mqttProperties.getTimeout());
            options.setKeepAliveInterval(mqttProperties.getKeepAlive());
            options.setCleanSession(mqttProperties.getCleanSession());
            options.setAutomaticReconnect(mqttProperties.getReconnect());
            MqttSendClient.setClient(client);
            try {
                // 设置回调
                client.setCallback(mqttSendCallBack);
                client.connect(options);
            } catch (Exception e) {
                e.printStackTrace();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }