用mqttClient发消息,在什么时候将连接关闭合适?我每次publish后disconnect报错。

连接和发布消息代码如下:

 public MqttClient connect() {
        MqttClient client = null;
        try {
            String clientId = MqttClient.generateClientId();
            client = new MqttClient(mqttProperties.getBrokerUrl(), clientId , new MemoryPersistence());
            MqttConnectOptions options = new MqttConnectOptions();
            options.setUserName(mqttProperties.getUsername());
            options.setPassword(mqttProperties.getPassword().toCharArray());
            options.setConnectionTimeout(mqttProperties.getTimeout());
            options.setKeepAliveInterval(mqttProperties.getKeepAlive());
            options.setAutomaticReconnect(mqttProperties.getReconnect());
            options.setCleanSession(true);
            try {
                // 设置回调
                client.setCallback(mqttSendCallBack);
                client.connect(options);
            } catch (Exception e) {
                log.error(e.getMessage(), e);
            }
        } catch (Exception e) {
            log.error(e.getMessage(), e);
        }
        return client;
    }

    /**
     * 发布消息
     * 主题格式: server:report:$orgCode(参数实际使用机构代码)
     *
     * @param retained    是否保留
     * @param topic     topic
     * @param pushMessage 消息体
     */
    public void publish(boolean retained, String topic, String pushMessage) {
        MqttMessage message = new MqttMessage();
        message.setQos(mqttProperties.getQos());
        message.setRetained(retained);
        message.setPayload(pushMessage.getBytes());
        MqttDeliveryToken token;
        MqttClient mqttClient = connect();
        try {
            mqttClient.publish(topic, message);
        } catch (MqttException e) {
            log.error(e.getMessage(), e);
        } finally {
            disconnect(mqttClient);
            close(mqttClient);
        }
    }

错误如下:

2022-12-24 10:47:05 INFO  [ MQTT Call: paho2316157135488795] com.qishuo.bmc.message.mqtt.MqttSendCallBack (MqttSendCallBack.java:72) : --------------------ClientId:tcp://127.0.0.1:1883客户端连接成功!--------------------
2022-12-24 10:47:05 INFO  [ MQTT Call: paho2316157135488795] com.qishuo.bmc.message.mqtt.MqttSendCallBack (MqttSendCallBack.java:50) : 向主题:bmc/WX-00QS-01-865-00-012-00256-000010/card/status发送消息成功!
2022-12-24 10:47:05 INFO  [ MQTT Call: paho2316157135488795] com.qishuo.bmc.message.mqtt.MqttSendCallBack (MqttSendCallBack.java:25) : 连接断开
2022-12-24 10:47:05 ERROR [ pool-2-thread-1] com.qishuo.bmc.message.mqtt.MqttSendClient (MqttSendClient.java:93) : 客户机正在断开连接
org.eclipse.paho.client.mqttv3.MqttException: 客户机正在断开连接
        at org.eclipse.paho.client.mqttv3.internal.ExceptionHelper.createMqttException(ExceptionHelper.java:31)
        at org.eclipse.paho.client.mqttv3.internal.ClientComms.disconnect(ClientComms.java:511)
        at org.eclipse.paho.client.mqttv3.MqttAsyncClient.disconnect(MqttAsyncClient.java:818)
        at org.eclipse.paho.client.mqttv3.MqttAsyncClient.disconnect(MqttAsyncClient.java:779)
        at org.eclipse.paho.client.mqttv3.MqttAsyncClient.disconnect(MqttAsyncClient.java:788)
        at org.eclipse.paho.client.mqttv3.MqttClient.disconnect(MqttClient.java:347)
        at com.qishuo.bmc.message.mqtt.MqttSendClient.disconnect(MqttSendClient.java:91)
        at com.qishuo.bmc.message.mqtt.MqttSendClient.publish(MqttSendClient.java:67)
        at com.qishuo.bmc.message.service.MqttServiceImpl.send(MqttServiceImpl.java:38)
        at com.qishuo.bmc.Job.message.MessageSendJob.run(MessageSendJob.java:25)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

环境

  • EMQX 版本:5.0.9
  • 操作系统版本:ubuntu 16.04

重现此问题的步骤

  1. 调用mqttClient发布消息
  2. mqttClient disconnect close
  3. xxx

预期行为

实际行为


功能请求

描述你需要的功能

为什么你需要这个功能


其他

MQTT 是长连接,不建议每发一条消息就重新连接一次。然后 mqttClient.publish(topic, message) 是同步的还是异步的?如果是异步的,那你在发布 QoS 1 和 QoS 2 消息时,这个函数返回并不代表发布流程结束哦。