package com.taosdata.example.springbootdemo.config;
import com.taosdata.example.springbootdemo.util.DeviceMQTTCallback;
import com.taosdata.example.springbootdemo.service.DeviceService;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.concurrent.TimeUnit;
public class MqttDeviceConfig {
public static String clientId = "MQTT2TD";
/**
* 客户端对象
*/
/**
* 在bean初始化后连接到服务器
*/
public void init() throws MqttException {
MqttDeviceClient();
}
MqttClient mqttDeviceClient;
/**
* 客户端连接服务端
*/
public MqttClient MqttDeviceClient() throws MqttException {
//创建MQTT客户端对象
mqttDeviceClient = new MqttClient(hostUrl, clientId, new MemoryPersistence());
//连接设置
MqttConnectOptions options = new MqttConnectOptions();
//是否清空session,设置为false表示服务器会保留客户端的连接记录,客户端重连之后能获取到服务器在客户端断开连接期间推送的消息
//设置为true表示每次连接到服务端都是以新的身份
options.setCleanSession(true);
// options.setAutomaticReconnect(true);
// options.setMaxReconnectDelay(5000); // 最大重连间隔5秒
// options.setExecutorServiceTimeout(120);//// 操作超时时间
//设置连接用户名
options.setUserName(“saomachong”);
//设置连接密码
options.setPassword(“qrCODE_@Gm.886601”.toCharArray());
//设置超时时间,单位为秒
options.setConnectionTimeout(100);
//设置心跳时间 单位为秒,表示服务器每隔1.5*20秒的时间向客户端发送心跳判断客户端是否在线
options.setKeepAliveInterval(20);
//设置遗嘱消息的话题,若客户端和服务器之间的连接意外断开,服务器将发布客户端的遗嘱信息
options.setWill(“willTopic”, (clientId + “与服务器断开连接”).getBytes(), 1, false);
//设置回调
mqttDeviceClient.setCallback(new DeviceMQTTCallback(mqttDeviceClient, deviceService));
mqttDeviceClient.connect(options);
//订阅主题
//消息等级,和主题数组一一对应,服务端将按照指定等级给订阅了主题的客户端推送消息
int qos = {1, 1,1};
//订阅主题
mqttDeviceClient.subscribe(topics, qos);
return mqttDeviceClient;
}
/**
* 断开连接
*/
public void disConnect() {
try {
mqttDeviceClient.disconnect();
} catch (MqttException e) {
e.printStackTrace();
}
}
/**
* 订阅主题
*/
public void subscribe(String topic, int qos) {
try {
mqttDeviceClient.subscribe(topic, qos);
} catch (MqttException e) {
e.printStackTrace();
}
}
public void publish(int qos, boolean retained, String topic, String message) {
MqttMessage mqttMessage = new MqttMessage();
mqttMessage.setQos(qos);
mqttMessage.setRetained(retained);
mqttMessage.setPayload(message.getBytes());
//主题的目的地,用于发布/订阅信息
MqttTopic mqttTopic = mqttDeviceClient.getTopic(topic);
//提供一种机制来跟踪消息的传递进度
//用于在以非阻塞方式(在后台运行)执行发布是跟踪消息的传递进度
MqttDeliveryToken token;
try {
//将指定消息发布到主题,但不等待消息传递完成,返回的token可用于跟踪消息的传递状态
//一旦此方法干净地返回,消息就已被客户端接受发布,当连接可用,将在后台完成消息传递。
token = mqttTopic.publish(mqttMessage);
// token.waitForCompletion();
} catch (MqttException e) {
e.printStackTrace();
}
}
}
这个是我的链接代码,后台能看到链接并订阅,调用 publish 方法就断开连接,日志没有报错信息,connectionLost 断开原因: " 是 MQTTException(0) 空指针