如题,使用eclipse.paho.mqttv5.client
订阅相应主题后接收不到保留消息。clearStart设置为true,订阅的主题也匹配(sensor/+匹配sensor/t2),连接也正常,即时发送的普通消息都能收到,使用MQTTX可以正常收到保留消息,具体代码如下:
import com.emqx.demo.proerties.MqttOptions;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.mqttv5.client.IMqttClient;
import org.eclipse.paho.mqttv5.client.MqttClient;
import org.eclipse.paho.mqttv5.client.MqttConnectionOptions;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.nio.charset.StandardCharsets;
@Slf4j
@Configuration
public class MqttV5ClientConfig {
@Autowired
private MqttOptions mqttOptions;
@Bean
public IMqttClient mqttV5Client() throws Exception {
MqttClient client = new MqttClient(mqttOptions.getBroker(), mqttOptions.getClientId());
MqttConnectionOptions options = new MqttConnectionOptions();
options.setCleanStart(mqttOptions.isCleanStart());
options.setSessionExpiryInterval(mqttOptions.getSessionExpiryInterval());
options.setConnectionTimeout(mqttOptions.getConnectTimeout());
options.setKeepAliveInterval(mqttOptions.getKeepAlive());
options.setAutomaticReconnect(mqttOptions.isAutoReconnect());
options.setMaxReconnectDelay(mqttOptions.getReconnectInterval());
options.setUserName(mqttOptions.getUsername());
options.setPassword(mqttOptions.getPassword().getBytes(StandardCharsets.UTF_8));
log.info("设置 MQTT V5 客户端配置完成");
return client;
}
}
import com.emqx.demo.event.MqttV5MessageEvent;
import com.emqx.demo.proerties.MqttOptions;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.mqttv5.client.IMqttClient;
import org.eclipse.paho.mqttv5.client.IMqttToken;
import org.eclipse.paho.mqttv5.client.MqttCallback;
import org.eclipse.paho.mqttv5.client.MqttDisconnectResponse;
import org.eclipse.paho.mqttv5.common.MqttException;
import org.eclipse.paho.mqttv5.common.MqttMessage;
import org.eclipse.paho.mqttv5.common.packet.MqttProperties;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
@Slf4j
@Component
public class MqttV5ClientWrapper {
private final IMqttClient mqttClient;
public MqttV5ClientWrapper(
@Qualifier("mqttV5Client") IMqttClient mqttClient,
ApplicationEventPublisher publisher,
MqttOptions mqttOptions) {
this.mqttClient = mqttClient;
// 设置回调
this.mqttClient.setCallback(new MqttCallbackAdapter(publisher, mqttClient, mqttOptions));
}
@PostConstruct
public void init() throws MqttException {
if (!mqttClient.isConnected()) {
mqttClient.connect();
log.info("Connecting MQTT client.");
}
}
@PreDestroy
public void destroy() {
try {
if (mqttClient != null && mqttClient.isConnected()) {
mqttClient.disconnect();
log.info("MQTT Client: Disconnecting...");
}
} catch (Exception e) {
log.error("Error disconnecting MQTT client: {}", e.getMessage());
}
try {
if (mqttClient != null) {
mqttClient.close();
log.info("MQTT Client: Closing...");
}
} catch (Exception e) {
log.error("Error closing MQTT client: {}", e.getMessage());
}
}
// 内部类用于处理回调事件
private static class MqttCallbackAdapter implements MqttCallback {
private final ApplicationEventPublisher publisher;
private final IMqttClient mqttClient;
private final MqttOptions mqttOptions;
public MqttCallbackAdapter(ApplicationEventPublisher publisher, IMqttClient mqttClient, MqttOptions mqttOptions) {
this.publisher = publisher;
this.mqttClient = mqttClient;
this.mqttOptions = mqttOptions;
}
@Override
public void disconnected(MqttDisconnectResponse mqttDisconnectResponse) {
log.info("Disconnected from broker.");
}
@Override
public void mqttErrorOccurred(MqttException e) {
log.error("MQTT Error: {}", e.getMessage());
}
/**
* 监听收到消息
* @param topic name of the topic on the message was published to
* @param message the actual message.
*/
@Override
public void messageArrived(String topic, MqttMessage message) {
publisher.publishEvent(new MqttV5MessageEvent(this, topic, message));
}
@Override
public void deliveryComplete(IMqttToken iMqttToken) {
log.info("Delivery complete.");
}
@Override
public void connectComplete(boolean reconnect, String serverURI) {
log.info("Connected to broker: {}", serverURI);
try {
String[] topicFilter = mqttOptions.getTopicFilter();
int[] qos = mqttOptions.getQos();
mqttClient.subscribe(topicFilter, qos);
for (int i = 0; i < topicFilter.length; i++) {
log.info("Subscribed to topic: {}, qos: {}", topicFilter[i], qos[i]);
}
} catch (MqttException e) {
log.error("Subscribe failed: {}", e.getMessage());
}
}
@Override
public void authPacketArrived(int i, MqttProperties mqttProperties) {
log.info("Auth packet arrived.");
}
}
}
请求社区的专家指点帮助,谢谢!