springboot开发怎么在回调函数callback里将消息存入数据库

本人使用emqx官网客户端JAVA开发示例(springboot开发),在callback回调函数内@Autowired 业务类,报错空指针null,并中断了mqtt连接。

已尝试网上两种方法均无效:1、@PostConstruct构造初始化,实际并没有生效,没有初始化。2、使用SpringUtil容器 getBean,依旧报错空指针。(两种方法都做了多次检查)

本人小白,请各位大佬指点,感谢感谢!

有代码吗,我好像没有遇到过

package mqtt.demo;

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

public class App {
public static void main(String args) {
String subTopic = “***”;
String pubTopic = “testtopic/1”;
String content = “Hello World”;
int qos = 2;
String broker = “tcp://broker.emqx.io:1883”;
String clientId = “emqx_test”;
MemoryPersistence persistence = new MemoryPersistence();

    try {
        MqttClient client = new MqttClient(broker, clientId, persistence);

        // MQTT 连接选项
        MqttConnectOptions connOpts = new MqttConnectOptions();
        connOpts.setUserName("emqx_test");
        connOpts.setPassword("emqx_test_password".toCharArray());
        // 保留会话
        connOpts.setCleanSession(true);

        // 设置回调
        client.setCallback(new OnMessageCallback());

        // 建立连接
        System.out.println("Connecting to broker: " + broker);
        client.connect(connOpts);

        System.out.println("Connected");
        System.out.println("Publishing message: " + content);

        // 订阅
        client.subscribe(subTopic);

        // 消息发布所需参数
        MqttMessage message = new MqttMessage(content.getBytes());
        message.setQos(qos);
        client.publish(pubTopic, message);
        System.out.println("Message published");

        // client.disconnect();
        // System.out.println("Disconnected");
        // client.close();
        // System.exit(0);
    } catch (MqttException me) {
        System.out.println("reason " + me.getReasonCode());
        System.out.println("msg " + me.getMessage());
        System.out.println("loc " + me.getLocalizedMessage());
        System.out.println("cause " + me.getCause());
        System.out.println("excep " + me);
        me.printStackTrace();
    }
}

}
-----------------------------回调函数--------------------------------------------------------------------------------------------
package mqtt.demo;

import java.text.SimpleDateFormat;
import java.util.Date;

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import com.fasterxml.jackson.databind.ObjectMapper;

import jakarta.annotation.PostConstruct;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;

@Component
public class OnMessageCallback implements MqttCallback {

 @Autowired
private wwbzService wwbzService;


public static OnMessageCallback OnMessageCallback;


@PostConstruct //通过@PostConstruct实现初始化bean之前进行的操作
public void init() {
    System.out.println("////////////初始化/////////////");
    OnMessageCallback = this;
    OnMessageCallback.wwbzService = this.wwbzService;
}




public void connectionLost(Throwable cause) {
    // 连接丢失后,一般在这里面进行重连
    System.out.println("连接断开");
}

public void messageArrived(String topic, MqttMessage message) throws Exception {
    // subscribe后得到的消息会执行到这里面
    JSONObject obj= JSON.parseObject(new String(message.getPayload()));//将json字符串转换为json对象
    System.out.println("接收消息主题:" + topic);
    System.out.println("接收消息Qos:" + message.getQos());
    System.out.println("接收消息内容:" + new String(message.getPayload()));

   

    switch (new String(obj.getString("name"))) {
        case "tsly":
            // Pressure=obj.getJSONArray("Pressure");
            // System.out.println(Pressure.getDoubleValue(0));
            System.out.println(new String(obj.getString("Pressure")));
            
            wwbz bz=new wwbz();
            Date date=new Date();
            SimpleDateFormat sdf= new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
           // System.out.println(sdf.format(date));
            wwbzServiceImpl wwbzServiceImpl=SpringUtil.getBean(wwbzServiceImpl.class);

            bz.setTime(sdf.format(date));
            bz.setMessage("hello");
           // wwbzServiceImpl.save(bz);
           OnMessageCallback.wwbzService.save(bz);
            
            
            break;
    
        default:
            break;
    }
}

public void deliveryComplete(IMqttDeliveryToken token) {
    System.out.println("deliveryComplete---------" + token.isComplete());
}

}

package mqtt.demo;

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

public class App // {
public static void main(String args) {
String subTopic = “----”;
String pubTopic = “testtopic/1”;
String content = “Hello World”;
int qos = 2;
String broker = “tcp://broker.emqx.io:1883”;
String clientId = “emqx_test”;
MemoryPersistence persistence = new MemoryPersistence();

    try {
        MqttClient client = new MqttClient(broker, clientId, persistence);

        // MQTT 连接选项
        MqttConnectOptions connOpts = new MqttConnectOptions();
        connOpts.setUserName("emqx_test");
        connOpts.setPassword("emqx_test_password".toCharArray());
        // 保留会话
        connOpts.setCleanSession(true);

        // 设置回调
        client.setCallback(new OnMessageCallback());

        // 建立连接
        System.out.println("Connecting to broker: " + broker);
        client.connect(connOpts);

        System.out.println("Connected");
        System.out.println("Publishing message: " + content);

        // 订阅
        client.subscribe(subTopic);

        // 消息发布所需参数
        MqttMessage message = new MqttMessage(content.getBytes());
        message.setQos(qos);
        client.publish(pubTopic, message);
        System.out.println("Message published");

        // client.disconnect();
        // System.out.println("Disconnected");
        // client.close();
        // System.exit(0);
    } catch (MqttException me) {
        System.out.println("reason " + me.getReasonCode());
        System.out.println("msg " + me.getMessage());
        System.out.println("loc " + me.getLocalizedMessage());
        System.out.println("cause " + me.getCause());
        System.out.println("excep " + me);
        me.printStackTrace();
    }
}

//}
------------------------------------回调函数-----------------------------------------
package mqtt.demo;

import java.text.SimpleDateFormat;
import java.util.Date;

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import com.fasterxml.jackson.databind.ObjectMapper;

import jakarta.annotation.PostConstruct;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;

@Component
public class OnMessageCallback implements MqttCallback //{

 @Autowired
private wwbzService wwbzService;


public static OnMessageCallback OnMessageCallback;


@PostConstruct //通过@PostConstruct实现初始化bean之前进行的操作
public void init() {
    System.out.println("////////////初始化/////////////");
    OnMessageCallback = this;
    OnMessageCallback.wwbzService = this.wwbzService;
}




public void connectionLost(Throwable cause) {
    // 连接丢失后,一般在这里面进行重连
    System.out.println("连接断开");
}

public void messageArrived(String topic, MqttMessage message) throws Exception {
    // subscribe后得到的消息会执行到这里面
    JSONObject obj= JSON.parseObject(new String(message.getPayload()));//将json字符串转换为json对象
    System.out.println("接收消息主题:" + topic);
    System.out.println("接收消息Qos:" + message.getQos());
    System.out.println("接收消息内容:" + new String(message.getPayload()));

   

    switch (new String(obj.getString("name"))) {
        case "tsly":
            // Pressure=obj.getJSONArray("Pressure");
            // System.out.println(Pressure.getDoubleValue(0));
            System.out.println(new String(obj.getString("Pressure")));
            
            wwbz bz=new wwbz();
            Date date=new Date();
            SimpleDateFormat sdf= new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
           // System.out.println(sdf.format(date));
            wwbzServiceImpl wwbzServiceImpl=SpringUtil.getBean(wwbzServiceImpl.class);

            bz.setTime(sdf.format(date));
            bz.setMessage("hello");
           // wwbzServiceImpl.save(bz);
           OnMessageCallback.wwbzService.save(bz);
            
            
            break;
    
        default:
            break;
    }
}

public void deliveryComplete(IMqttDeliveryToken token) {
    System.out.println("deliveryComplete---------" + token.isComplete());
}

//}

可读性有点差,只能这样了

// 设置回调
client.setCallback(new OnMessageCallback());

我不太确定,但这里 OnMessageCallback 对象并非Spring容器托管,无法注入 wwbzService 对象

大佬,有别的办法注入Dao或者Service吗

// 设置回调
client.setCallback(SpringUtil.getBean(OnMessageCallback.class););

你是Spring Boot的Web开发嘛?你这里mqtt客户端怎么直接写在main函数。关于Spring Boot结合Mqtt,有下面两个框架:

1 个赞

如果你在Dao层或者Service层注入,可以使用 @Bean 将mqtt客户端注册成Bean对象交由Spring容器管理,这样也可以注入 OnMessageCallback 对象了

好的,我试试

Dao加了@Bean还是不行,Dao那里有@Mapper,按理说是不是也交由容器托管了。

@Mapper是会让Mybatis创建Bean对象,我给你一个例子好了:

@Configuration
public class MqttConfig {

    @Bean
    public MqttClient mqttClient(@Qualifier("mqttCallback") MqttCallback mqttCallback) throws MqttException {
        MqttClient client = new MqttClient(
                "tcp://localhost:1883",
                "mqtt_id"
        );
        MqttConnectionOptions connectionOptions = new MqttConnectionOptionsBuilder()
                .username("username")
                .password("password".getBytes())
                .cleanStart(true)
                .connectionTimeout(5000)
                .automaticReconnect(true)
                .build();
        client.setCallback(mqttCallback);
        client.connect(connectionOptions);
        return client;
    }

    @Bean
    public MqttCallback mqttCallback(TestService testService) {
        return new MqttCallback() {
            @Override
            public void messageArrived(String topic, MqttMessage message) throws Exception {
                testService.test();
            }
        };
    }
}

测试:

@SpringBootTest
@EnableAutoConfiguration(exclude = DataSourceAutoConfiguration.class)
class ExampleApplicationTests {
    @Autowired
    private MqttClient mqttClient;

    @Test
    void contextLoads() throws MqttException, InterruptedException {
        mqttClient.subscribe("test", 2);
        mqttClient.publish("test", new MqttMessage("123".getBytes()));
    }

}

这里将 MqttCallback 回调对象注册成Bean对象交由Spring容器管理,在注册时可以把你的相关数据库存储的业务方法类(代码中以 TestService 代替)注入进来。

当向Spring容器中注册 MqttClient mqtt客户端对象时,就可以把注册好的 MqttCallback 回调对象注入,并用 setCallback 方法设置此客户端对象接收到订阅topic时的回调对象。

以上代码中,通过 @Autowired 注解取得了Spring容器中的 mqttClient 对象,该 mqttClient 对象订阅了 test 主题,并设置了回调对象 MqttCallback,当接收到消息时,会执行 MqttCallback 类中 messageArrived方法,即执行你的数据库存储业务方法(代码中以 TestService.test() 代替)

感谢!!

大佬,还是报错:Failed to load ApplicationContext ,太难了!

@Configuration
public class mqttConfiguration {

    int qos = 2;
    String broker = "tcp://broker.emqx.io:1883";
    String clientId = "emqx_test";
    MemoryPersistence persistence = new MemoryPersistence();

 @Bean
public MqttClient mqttClient(@Qualifier("mqttCallback") MqttCallback mqttCallback) throws MqttException {
    MqttClient client = new MqttClient(
        broker, clientId, persistence
    );
    MqttConnectOptions connOpts = new MqttConnectOptions();
    connOpts.setUserName("emqx_test");
    connOpts.setPassword("emqx_test_password".toCharArray());
    // 保留会话
    connOpts.setCleanSession(true);
    client.setCallback(mqttCallback);
    client.connect(connOpts);
     // 订阅
    //client.subscribe(subTopic);
    return client;
}

@Bean
public MqttCallback mqttCallback(wwbzDao wwbzDao) {
    return new MqttCallback() {
        @Override
        public void messageArrived(String topic, MqttMessage message) throws Exception {
            JSONObject obj= JSON.parseObject(new String(message.getPayload()));//将json字符串转换为json对象
            wwbz bz=new wwbz();
            Date date=new Date();
            SimpleDateFormat sdf= new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

            bz.setTime(sdf.format(date));
            bz.setMessage("hello");
            wwbzDao.insert(bz);
            System.out.println("数据存储成功!");
        }

        @Override
        public void connectionLost(Throwable arg0) {
            // TODO Auto-generated method stub
            throw new UnsupportedOperationException("Unimplemented method 'connectionLost'");
        }

        @Override
        public void deliveryComplete(IMqttDeliveryToken arg0) {
            // TODO Auto-generated method stub
            throw new UnsupportedOperationException("Unimplemented method 'deliveryComplete'");
        }
    };
}

}

--------------测试-----------------
@SpringBootTest(classes =mqttConfiguration.class)
@EnableAutoConfiguration(exclude = DataSourceAutoConfiguration.class)
class DemoApplicationTests {

 @Autowired
private MqttClient mqttClient;

@Test
void contextLoads() throws MqttException {


	// System.out.println(wwbzDao.selectList(null).toString());
	String subTopic = "test";
	mqttClient.subscribe(subTopic,2);
}

有具体的报错原因吗?把 @SpringBootTest(classes =mqttConfiguration.class) 改成 @SpringBootTest 试试

加@SpringBootTest(classes=…) 也是因为报错:
Unable to find a @SpringBootConfiguration, you need to use @ContextConfiguration or @SpringBootTest(classes=…) with your test

不指定 @SpringBootTestclasses 字段试试,或者去网上搜搜看 SpringBoot测试失败并报错: Unable to find a @SpringBootConfiguration, you need to use @ContextConfiguration

好的

解决了吗

没有