环境信息
- EMQX 版本:5.0
- 操作系统及版本:windows
- 其他
问题描述
paho java 异步客户端MqttAsyncClient 执行订阅消息时,提示“客户机未连接”。但是publish是可以成功的,这是什么原因?代码如下:执行到subscribe()方法就提示报错。
配置文件及日志
@Component
public class MqttAsyncPushClient {
private static final Logger logger = LoggerFactory.getLogger(MqttAsyncPushClient.class);
@Autowired
private PushCallback pushCallback;
private static MqttAsyncClient client;
private static MqttAsyncClient getClient() {
return client;
}
private static void setClient(MqttAsyncClient client) {
MqttAsyncPushClient.client = client;
}
/**
* 客户端连接
*
* @param host ip+端口
* @param username 用户名
* @param password 密码
* @param timeout 超时时间
* @param keepalive 保留数
*/
public void connect(String host, String username, String password, int timeout, int keepalive) {
MqttAsyncClient client;
try {
client = new MqttAsyncClient(host, UUID.randomUUID().toString(), new MemoryPersistence());
MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(true);
options.setUserName(username);
options.setPassword(password.toCharArray());
options.setConnectionTimeout(timeout);
options.setKeepAliveInterval(keepalive);
options.setSocketFactory(SslUtil.getSocketFactory("D:\\soft\\emqx-5.0.8-windows-amd64\\etc\\certs\\ca.pem"));
MqttAsyncPushClient.setClient(client);
client.setCallback(pushCallback);
client.connect(options);
} catch (MqttException e) {
logger.error("-------------------客户端连接异常----------------");
logger.error("reason: " + e.getReasonCode());
logger.error("msg: " + e.getMessage());
e.printStackTrace();
} catch (Exception ee) {
logger.error("-------------------客户端连接异常----------------");
logger.error("msg: " + ee.getMessage());
ee.printStackTrace();
}
}
/**
* 发布
*
* @param qos 连接方式
* @param retained 是否保留
* @param topic 主题
* @param pushMessage 消息体
*/
public boolean publish(int qos, boolean retained, String topic, String pushMessage) {
logger.info("mqttAsync"+pushMessage);
MqttMessage message = new MqttMessage();
message.setQos(qos);
message.setRetained(retained);
message.setPayload(pushMessage.getBytes());
IMqttDeliveryToken token;
try {
token = MqttAsyncPushClient.getClient().publish(topic, message);
logger.info("结果-------------------"+token.isComplete());
token.waitForCompletion();
logger.info("结果-------------------"+token.isComplete());
return true;
} catch (MqttPersistenceException e) {
e.printStackTrace();
return false;
} catch (MqttException e) {
e.printStackTrace();
return false;
}
}
/**
* 订阅某个主题
*
* @param topic 主题
* @param qos 连接方式
*/
public void subscribe(String topic, int qos) {
logger.info("mqttAsync开始订阅主题" + topic);
try {
MqttAsyncPushClient.getClient().subscribe(topic, qos);
} catch (MqttException e) {
e.printStackTrace();
}
}
-------------------------------------------------------------------------------------
这是config代码,项目启动时候,@bean方法里,mqttAsyncPushClient.subscribe()这里报的错
@Configuration
@Setter
@Getter
public class MqttConfig {
// @Autowired
// private MqttPushClient mqttPushClient;
@Autowired
private MqttAsyncPushClient mqttAsyncPushClient;
@Value("${mqtt.username}")
private String username;
@Value("${mqtt.password")
private String password;
@Value("${mqtt.serverURIs}")
private String hostUrl;
@Value("${mqtt.clientId}")
private String clientId;
@Value("${mqtt.topic}")
private String defaultTopic;
@Value("${mqtt.qos}")
private int qos;
@Value("${mqtt.enabled}")
private boolean enabled;
@Value("${mqtt.keepalive}")
private int keepalive;
@Value("${mqtt.timeout}")
private int timeout;
private static final Logger logger = LoggerFactory.getLogger(MqttConfig.class);
@Bean
public MqttAsyncPushClient getMqttAsyncPushClient() {
if(enabled == true){
String mqtt_topic[] = defaultTopic.split(",");
mqttAsyncPushClient.connect(hostUrl, username, password, timeout, keepalive);//连接
for(int i=0; i<mqtt_topic.length; i++){
mqttAsyncPushClient.subscribe(mqtt_topic[i], 0);//订阅主题
}
}
return mqttAsyncPushClient;
}