参照网上找的paho MQTT客户端代码,写了个服务端的MQTT客户端,大部分功能都没有问题,就是MQTT客户端重连出了问题:如果是因为一定时间内没有消息收发导致的Mqtt连接断开,这时候重连是没有问题的;但是如果是客户端id冲突这种情况(通过MQTTX配置相同的客户端id来模拟),MQTTX在连接后,服务端在检测到连接断开后理论上应该进行重连,但是实际上却报错了:
而且很奇怪的是,根据上面的INFO我判断出重连跳过了认证链的第一步(内部数据库)直接到达了第二步(HTTP服务验证),关键还认证了两次!而服务端连接EMQX应该只走第一步的,毕竟springboot启动时第一次连接EMQX是正常连接的。以下是我的认证链:
由于是两次认证,我想到MqttProperties的参数中有个reconnect,且设置为true了,我把其改成false后,仍然没有效果,还是和之前一样。于是我就想:干脆不用connectonLost回调函数的重连逻辑了,将其注释,并把reconnect设置为true,重启后,再次复现问题 ,发现可以正常重连了
虽然实现了重连机制,定位到了问题出在connectonLost回调函数的重连逻辑,但目前还是没找到重连逻辑哪里有问题,下面会附上重连相关的代码,麻烦大家帮忙看下!EMQX新手小白一枚,还望多多指教!
/**
* 客户端连接断开后触发
* 这里可以做重新链接操作
*/
@Override
public void connectionLost(Throwable cause) {
logger.error("【MQTT-服务端】链接断开!原因为:"+cause.toString());
logger.info("【MQTT-服务端】重新连接emqx....................................................");
for (int i = 0; i < 5; i++) {
if(mqttSendClient.reconnection()) {
break;
}else{
try {
Thread.sleep(i * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
/**
* 重新连接
*/
public boolean reconnection() {
try {
mqttClient.connect();
Thread.sleep(1000);
if( mqttClient.isConnected() ) {
return true;
}
} catch (MqttException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
return false;
}
/**
* 客户端连接
* @return
*/
public void connect(){
MqttClient client = null;
try {
client = new MqttClient(mqttProperties.getHostUrl(),mqttProperties.getClientid() , new MemoryPersistence());
MqttConnectOptions options = new MqttConnectOptions();
options.setUserName(mqttProperties.getUsername());
options.setPassword(mqttProperties.getPassword().toCharArray());
options.setConnectionTimeout(mqttProperties.getTimeout());
options.setKeepAliveInterval(mqttProperties.getKeepAlive());
options.setCleanSession(mqttProperties.getCleanSession());
options.setAutomaticReconnect(mqttProperties.getReconnect());
MqttSendClient.setClient(client);
try {
// 设置回调
client.setCallback(mqttSendCallBack);
client.connect(options);
} catch (Exception e) {
e.printStackTrace();
}
} catch (Exception e) {
e.printStackTrace();
}
}