本人使用emqx官网客户端JAVA开发示例(springboot开发),在callback回调函数内@Autowired 业务类,报错空指针null,并中断了mqtt连接。
已尝试网上两种方法均无效:1、@PostConstruct构造初始化,实际并没有生效,没有初始化。2、使用SpringUtil容器 getBean,依旧报错空指针。(两种方法都做了多次检查)
本人小白,请各位大佬指点,感谢感谢!
本人使用emqx官网客户端JAVA开发示例(springboot开发),在callback回调函数内@Autowired 业务类,报错空指针null,并中断了mqtt连接。
已尝试网上两种方法均无效:1、@PostConstruct构造初始化,实际并没有生效,没有初始化。2、使用SpringUtil容器 getBean,依旧报错空指针。(两种方法都做了多次检查)
本人小白,请各位大佬指点,感谢感谢!
有代码吗,我好像没有遇到过
package mqtt.demo;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
public class App {
public static void main(String args) {
String subTopic = “***”;
String pubTopic = “testtopic/1”;
String content = “Hello World”;
int qos = 2;
String broker = “tcp://broker.emqx.io:1883”;
String clientId = “emqx_test”;
MemoryPersistence persistence = new MemoryPersistence();
try {
MqttClient client = new MqttClient(broker, clientId, persistence);
// MQTT 连接选项
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setUserName("emqx_test");
connOpts.setPassword("emqx_test_password".toCharArray());
// 保留会话
connOpts.setCleanSession(true);
// 设置回调
client.setCallback(new OnMessageCallback());
// 建立连接
System.out.println("Connecting to broker: " + broker);
client.connect(connOpts);
System.out.println("Connected");
System.out.println("Publishing message: " + content);
// 订阅
client.subscribe(subTopic);
// 消息发布所需参数
MqttMessage message = new MqttMessage(content.getBytes());
message.setQos(qos);
client.publish(pubTopic, message);
System.out.println("Message published");
// client.disconnect();
// System.out.println("Disconnected");
// client.close();
// System.exit(0);
} catch (MqttException me) {
System.out.println("reason " + me.getReasonCode());
System.out.println("msg " + me.getMessage());
System.out.println("loc " + me.getLocalizedMessage());
System.out.println("cause " + me.getCause());
System.out.println("excep " + me);
me.printStackTrace();
}
}
}
-----------------------------回调函数--------------------------------------------------------------------------------------------
package mqtt.demo;
import java.text.SimpleDateFormat;
import java.util.Date;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.fasterxml.jackson.databind.ObjectMapper;
import jakarta.annotation.PostConstruct;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
@Component
public class OnMessageCallback implements MqttCallback {
@Autowired
private wwbzService wwbzService;
public static OnMessageCallback OnMessageCallback;
@PostConstruct //通过@PostConstruct实现初始化bean之前进行的操作
public void init() {
System.out.println("////////////初始化/////////////");
OnMessageCallback = this;
OnMessageCallback.wwbzService = this.wwbzService;
}
public void connectionLost(Throwable cause) {
// 连接丢失后,一般在这里面进行重连
System.out.println("连接断开");
}
public void messageArrived(String topic, MqttMessage message) throws Exception {
// subscribe后得到的消息会执行到这里面
JSONObject obj= JSON.parseObject(new String(message.getPayload()));//将json字符串转换为json对象
System.out.println("接收消息主题:" + topic);
System.out.println("接收消息Qos:" + message.getQos());
System.out.println("接收消息内容:" + new String(message.getPayload()));
switch (new String(obj.getString("name"))) {
case "tsly":
// Pressure=obj.getJSONArray("Pressure");
// System.out.println(Pressure.getDoubleValue(0));
System.out.println(new String(obj.getString("Pressure")));
wwbz bz=new wwbz();
Date date=new Date();
SimpleDateFormat sdf= new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
// System.out.println(sdf.format(date));
wwbzServiceImpl wwbzServiceImpl=SpringUtil.getBean(wwbzServiceImpl.class);
bz.setTime(sdf.format(date));
bz.setMessage("hello");
// wwbzServiceImpl.save(bz);
OnMessageCallback.wwbzService.save(bz);
break;
default:
break;
}
}
public void deliveryComplete(IMqttDeliveryToken token) {
System.out.println("deliveryComplete---------" + token.isComplete());
}
}
package mqtt.demo;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
public class App // {
public static void main(String args) {
String subTopic = “----”;
String pubTopic = “testtopic/1”;
String content = “Hello World”;
int qos = 2;
String broker = “tcp://broker.emqx.io:1883”;
String clientId = “emqx_test”;
MemoryPersistence persistence = new MemoryPersistence();
try {
MqttClient client = new MqttClient(broker, clientId, persistence);
// MQTT 连接选项
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setUserName("emqx_test");
connOpts.setPassword("emqx_test_password".toCharArray());
// 保留会话
connOpts.setCleanSession(true);
// 设置回调
client.setCallback(new OnMessageCallback());
// 建立连接
System.out.println("Connecting to broker: " + broker);
client.connect(connOpts);
System.out.println("Connected");
System.out.println("Publishing message: " + content);
// 订阅
client.subscribe(subTopic);
// 消息发布所需参数
MqttMessage message = new MqttMessage(content.getBytes());
message.setQos(qos);
client.publish(pubTopic, message);
System.out.println("Message published");
// client.disconnect();
// System.out.println("Disconnected");
// client.close();
// System.exit(0);
} catch (MqttException me) {
System.out.println("reason " + me.getReasonCode());
System.out.println("msg " + me.getMessage());
System.out.println("loc " + me.getLocalizedMessage());
System.out.println("cause " + me.getCause());
System.out.println("excep " + me);
me.printStackTrace();
}
}
//}
------------------------------------回调函数-----------------------------------------
package mqtt.demo;
import java.text.SimpleDateFormat;
import java.util.Date;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.fasterxml.jackson.databind.ObjectMapper;
import jakarta.annotation.PostConstruct;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
@Component
public class OnMessageCallback implements MqttCallback //{
@Autowired
private wwbzService wwbzService;
public static OnMessageCallback OnMessageCallback;
@PostConstruct //通过@PostConstruct实现初始化bean之前进行的操作
public void init() {
System.out.println("////////////初始化/////////////");
OnMessageCallback = this;
OnMessageCallback.wwbzService = this.wwbzService;
}
public void connectionLost(Throwable cause) {
// 连接丢失后,一般在这里面进行重连
System.out.println("连接断开");
}
public void messageArrived(String topic, MqttMessage message) throws Exception {
// subscribe后得到的消息会执行到这里面
JSONObject obj= JSON.parseObject(new String(message.getPayload()));//将json字符串转换为json对象
System.out.println("接收消息主题:" + topic);
System.out.println("接收消息Qos:" + message.getQos());
System.out.println("接收消息内容:" + new String(message.getPayload()));
switch (new String(obj.getString("name"))) {
case "tsly":
// Pressure=obj.getJSONArray("Pressure");
// System.out.println(Pressure.getDoubleValue(0));
System.out.println(new String(obj.getString("Pressure")));
wwbz bz=new wwbz();
Date date=new Date();
SimpleDateFormat sdf= new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
// System.out.println(sdf.format(date));
wwbzServiceImpl wwbzServiceImpl=SpringUtil.getBean(wwbzServiceImpl.class);
bz.setTime(sdf.format(date));
bz.setMessage("hello");
// wwbzServiceImpl.save(bz);
OnMessageCallback.wwbzService.save(bz);
break;
default:
break;
}
}
public void deliveryComplete(IMqttDeliveryToken token) {
System.out.println("deliveryComplete---------" + token.isComplete());
}
//}
可读性有点差,只能这样了
// 设置回调
client.setCallback(new OnMessageCallback());
我不太确定,但这里 OnMessageCallback
对象并非Spring容器托管,无法注入 wwbzService
对象
大佬,有别的办法注入Dao或者Service吗
// 设置回调
client.setCallback(SpringUtil.getBean(OnMessageCallback.class););
你是Spring Boot的Web开发嘛?你这里mqtt客户端怎么直接写在main函数。关于Spring Boot结合Mqtt,有下面两个框架:
Spring Integration
可以参考下面这几个文档 Spring Integration官方文档、Spring Integration 集成 MQTT、 Spring Integration 集成MQTT案例讲述及源码、Spring Integration结合mqtt Demo
mica-mqtt
如果你在Dao层或者Service层注入,可以使用 @Bean
将mqtt客户端注册成Bean对象交由Spring容器管理,这样也可以注入 OnMessageCallback
对象了
好的,我试试
Dao加了@Bean还是不行,Dao那里有@Mapper,按理说是不是也交由容器托管了。
@Mapper是会让Mybatis创建Bean对象,我给你一个例子好了:
@Configuration
public class MqttConfig {
@Bean
public MqttClient mqttClient(@Qualifier("mqttCallback") MqttCallback mqttCallback) throws MqttException {
MqttClient client = new MqttClient(
"tcp://localhost:1883",
"mqtt_id"
);
MqttConnectionOptions connectionOptions = new MqttConnectionOptionsBuilder()
.username("username")
.password("password".getBytes())
.cleanStart(true)
.connectionTimeout(5000)
.automaticReconnect(true)
.build();
client.setCallback(mqttCallback);
client.connect(connectionOptions);
return client;
}
@Bean
public MqttCallback mqttCallback(TestService testService) {
return new MqttCallback() {
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
testService.test();
}
};
}
}
测试:
@SpringBootTest
@EnableAutoConfiguration(exclude = DataSourceAutoConfiguration.class)
class ExampleApplicationTests {
@Autowired
private MqttClient mqttClient;
@Test
void contextLoads() throws MqttException, InterruptedException {
mqttClient.subscribe("test", 2);
mqttClient.publish("test", new MqttMessage("123".getBytes()));
}
}
这里将 MqttCallback
回调对象注册成Bean对象交由Spring容器管理,在注册时可以把你的相关数据库存储的业务方法类(代码中以 TestService
代替)注入进来。
当向Spring容器中注册 MqttClient
mqtt客户端对象时,就可以把注册好的 MqttCallback
回调对象注入,并用 setCallback
方法设置此客户端对象接收到订阅topic时的回调对象。
以上代码中,通过 @Autowired
注解取得了Spring容器中的 mqttClient
对象,该 mqttClient
对象订阅了 test
主题,并设置了回调对象 MqttCallback
,当接收到消息时,会执行 MqttCallback
类中 messageArrived
方法,即执行你的数据库存储业务方法(代码中以 TestService.test()
代替)
感谢!!
大佬,还是报错:Failed to load ApplicationContext ,太难了!
@Configuration
public class mqttConfiguration {
int qos = 2;
String broker = "tcp://broker.emqx.io:1883";
String clientId = "emqx_test";
MemoryPersistence persistence = new MemoryPersistence();
@Bean
public MqttClient mqttClient(@Qualifier("mqttCallback") MqttCallback mqttCallback) throws MqttException {
MqttClient client = new MqttClient(
broker, clientId, persistence
);
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setUserName("emqx_test");
connOpts.setPassword("emqx_test_password".toCharArray());
// 保留会话
connOpts.setCleanSession(true);
client.setCallback(mqttCallback);
client.connect(connOpts);
// 订阅
//client.subscribe(subTopic);
return client;
}
@Bean
public MqttCallback mqttCallback(wwbzDao wwbzDao) {
return new MqttCallback() {
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
JSONObject obj= JSON.parseObject(new String(message.getPayload()));//将json字符串转换为json对象
wwbz bz=new wwbz();
Date date=new Date();
SimpleDateFormat sdf= new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
bz.setTime(sdf.format(date));
bz.setMessage("hello");
wwbzDao.insert(bz);
System.out.println("数据存储成功!");
}
@Override
public void connectionLost(Throwable arg0) {
// TODO Auto-generated method stub
throw new UnsupportedOperationException("Unimplemented method 'connectionLost'");
}
@Override
public void deliveryComplete(IMqttDeliveryToken arg0) {
// TODO Auto-generated method stub
throw new UnsupportedOperationException("Unimplemented method 'deliveryComplete'");
}
};
}
}
--------------测试-----------------
@SpringBootTest(classes =mqttConfiguration.class)
@EnableAutoConfiguration(exclude = DataSourceAutoConfiguration.class)
class DemoApplicationTests {
@Autowired
private MqttClient mqttClient;
@Test
void contextLoads() throws MqttException {
// System.out.println(wwbzDao.selectList(null).toString());
String subTopic = "test";
mqttClient.subscribe(subTopic,2);
}
有具体的报错原因吗?把 @SpringBootTest(classes =mqttConfiguration.class)
改成 @SpringBootTest
试试
加@SpringBootTest(classes=…) 也是因为报错:
Unable to find a @SpringBootConfiguration, you need to use @ContextConfiguration or @SpringBootTest(classes=…) with your test
不指定 @SpringBootTest
的 classes
字段试试,或者去网上搜搜看 SpringBoot测试失败并报错: Unable to find a @SpringBootConfiguration, you need to use @ContextConfiguration
好的
解决了吗
没有