public class MqttManager {
private static String SERVER_URI;
private static String CLIENT_ID;
private static final String username = "xxx";
private static final String password = "xxxxx";
private Mqtt5AsyncClient mqttClient;
private boolean isDisconnected;//是否断开连接了
public void connect() {
if (mqttClient != null && mqttClient.getState() == MqttClientState.CONNECTED) {
KLog.d("fufufu", "MQTT 已连接,无需重复连接");
return; // 防止重复连接
}
SERVER_URI = ServiceUrl.getMQTTUrl();
CLIENT_ID = SpUtils.getInstance().decodeString(PreConfig.DEVICE_NO);
KLog.d("fufufu","CLIENT_ID "+CLIENT_ID);
mqttClient = MqttClient.builder()
.useMqttVersion5()
.identifier(CLIENT_ID)
.serverHost(SERVER_URI)
.serverPort(ServiceUrl.getPort())
.simpleAuth()
.username(username)
.password(password.getBytes(StandardCharsets.UTF_8))
.applySimpleAuth()
.automaticReconnectWithDefaultConfig()
.addConnectedListener(asyncResult -> {
if(isDisconnected){
JsonObject jsonObject = new JsonObject();
jsonObject.addProperty("deviceNo", CLIENT_ID);
jsonObject.addProperty("timeStamp", CalendarUtils.getTime(System.currentTimeMillis(), "yyyy-MM-dd HH:mm:ss"));
Gson gson = new Gson();
String jsonString = gson.toJson(jsonObject);
KLog.d("fufufu","ONLINE_TOPIC "+jsonString);
//发送上线订阅消息到服务端
publish("ONLINE_TOPIC",jsonString);
}
isDisconnected = false;
})
.buildAsync();
// 配置遗嘱消息(Will Message)
JsonObject playload = new JsonObject();
playload.addProperty("deviceNo", CLIENT_ID);
playload.addProperty("ts", LogUtils.TIME_FORMAT.format(new Date()));
Gson gson1 = new Gson();
String playloadStr = gson1.toJson(playload);
Mqtt5WillPublish willMessage = Mqtt5WillPublish.builder()
.topic("WILL_TOPIC")
.qos(MqttQos.AT_LEAST_ONCE) // QoS 1
.payload(playloadStr.getBytes()) // 遗嘱消息内容
.delayInterval(180) // 延迟180秒发送
.build();
CompletableFuture<Mqtt5ConnAck> future = mqttClient.connectWith()
.cleanStart(false) // 等同于 cleanSession=false
.willPublish(willMessage)
.keepAlive(60) // 60 秒心跳,防止掉线
.sessionExpiryInterval(300) // 设置 SessionExpiryInterval 为 300 秒(5分钟)
.send();
future.whenComplete((ack, throwable) -> {
if (throwable == null) {
KLog.d("fufufu", "MQTT 连接成功");
// 连接成功后订阅主题
subscribe("DEVICE_PING_TOPIC"); // 订阅主题,服务端检测客户端在线状态
subscribe("BATCH_OPERATE_TOPIC"); // 订阅主题,批量操作
subscribe(CLIENT_ID); // 连接成功后订阅主题,用户标识设备编号
subscribe("ADD_USER_TOPIC"); // 订阅主题用户注册
//发送上线订阅消息到服务端开始
// 定义一个JSON对象
JsonObject jsonObject = new JsonObject();
jsonObject.addProperty("deviceNo", CLIENT_ID);
jsonObject.addProperty("timeStamp", CalendarUtils.getTime(System.currentTimeMillis(), "yyyy-MM-dd HH:mm:ss"));
boolean isClose = SpUtils.getInstance().decodeBoolean(PreConfig.CLOSE_PROCESS_WATCH);//进程守护是否关闭,true关闭进程守护,false开启进程守护
jsonObject.addProperty("keepAlive", !isClose?"1":"0");
boolean status = SpUtils.getInstance().decodeBoolean(PreConfig.NAVIGATION_STATUS);//是否开启底部导航,true--开启底部导航,false--关闭底部导航
jsonObject.addProperty("navigationStatus", status?"1":"0");
boolean enable = SpUtils.getInstance().decodeBoolean(PreConfig.ENABLE_CHECK_UPDATES);//是否开启开机检查更新,true--开启,false--关闭
jsonObject.addProperty("checkUpdateStatus", enable?"1":"0");
String padType = ConfigUtils.getPadType();//平板型号
jsonObject.addProperty("padType", padType);
// 使用Gson将JSON对象转换为JSON字符串
Gson gson = new Gson();
String jsonString = gson.toJson(jsonObject);
KLog.d("fufufu","ONLINE_TOPIC "+jsonString);
publish("ONLINE_TOPIC",jsonString);
//发送上线订阅消息到服务端结束
} else {
KLog.d("fufufu","MQTT 连接失败: " + throwable.getMessage());
}
});
}
public void subscribe(String topic) {
mqttClient.subscribeWith()
.topicFilter(topic)
.qos(MqttQos.AT_LEAST_ONCE)
.callback(publish -> {
String message = new String(publish.getPayloadAsBytes(), StandardCharsets.UTF_8);
KLog.d("fufufu","收到消息topic: " + publish.getTopic().toString());
KLog.d("fufufu","收到消息: " + message);
handMsg(publish.getTopic().toString(),message);
})
.send();
}
public void publish(String topic, String message) {
mqttClient.publishWith()
.topic(topic)
.payload(message.getBytes(StandardCharsets.UTF_8))
.qos(MqttQos.AT_LEAST_ONCE)
.send();
}
}
设备上线是mqtt连接后向服务端发送一条消息,此时设备上线,当触发遗嘱时设备下线
上图是服务端100611这个设备所有的状态变化日志,最后一次状态是上线,但是通过emqx控制台查看设备是离线状态,这种情况是mqtt配置有问题吗