emqx_connection_terminated, reason: {shutdown,tcp_closed} in the log trace

Environment

  • EMQX version: 5.4.0
  • OS version: Debian 9

Problem description
The log trace shows that an MQTT client disconnected abnormally:
2024-06-14T01:27:17.330472+00:00 [SOCKET] ak@192.168.86.48:35878 msg: emqx_connection_terminated, reason: {shutdown,tcp_closed}
2024-06-14T01:27:33.641981+00:00 [MQTT] ak@192.168.86.48:37918 msg: mqtt_packet_received, packet: CONNECT(Q0, R0, D0, ClientId=ak, ProtoName=MQIsdp, ProtoVsn=3, CleanStart=true, KeepAlive=20, Username=admin, Password=******)
2024-06-14T01:27:33.642672+00:00 [MQTT] ak@192.168.86.48:37918 msg: mqtt_packet_sent, packet: CONNACK(Q0, R0, D0, AckFlags=0, ReasonCode=0)
2024-06-14T01:27:33.644767+00:00 [MQTT] ak@192.168.86.48:37918 msg: mqtt_packet_received, packet: SUBSCRIBE(Q1, R0, D0, PacketId=34 TopicFilters=[/mqtt/msg/receive/v1/c95f1d0e-7a1d-41d1-a2a5-69e5dc0b6875(#{nl => 0,qos => 1,rap => 0,rh => 0})])
2024-06-14T01:27:33.645160+00:00 [AUTHZ] ak@192.168.86.48:37918 msg: authorization_module_nomatch, ipaddr: {192,168,86,48}, module: emqx_authz_client_info, pub_sub: [action_type: subscribe, qos: 1], topic: /mqtt/msg/receive/v1/c95f1d0e-7a1d-41d1-a2a5-69e5dc0b6875, username: admin
2024-06-14T01:27:33.645349+00:00 [SUBSCRIBE] ak@192.168.86.48:37918 msg: subscribe, sub_id: ak, sub_opts: [nl: 0, qos: 1, rap: 0, rh: 0, sub_props: ], topic: /mqtt/msg/receive/v1/c95f1d0e-7a1d-41d1-a2a5-69e5dc0b6875
2024-06-14T01:27:33.645806+00:00 [MQTT] ak@192.168.86.48:37918 msg: mqtt_packet_sent, packet: SUBACK(Q0, R0, D0, PacketId=34, ReasonCodes=[1])
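The EMQX trace timestamps above end in +00:00, i.e. they are UTC, while the Paho client log further down prints local wall-clock time with no offset. The minimal java.time sketch below converts the two relevant EMQX timestamps into the client's zone, assuming the client host runs on China Standard Time (the zone Asia/Shanghai is an assumption; the post does not state it). Under that assumption, 01:27:17 UTC is 09:27:17 local, within a second of the client-side timeout logged at 09:27:16.780, and 01:27:33 UTC lines up with the reconnect logged at 09:27:33. EmqxLogTime is a hypothetical helper name.

```java
import java.time.OffsetDateTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;

class EmqxLogTime {

    // Assumption: the machine running the Paho client uses China Standard Time (UTC+8).
    static final ZoneId CLIENT_ZONE = ZoneId.of("Asia/Shanghai");

    /** Parses an EMQX trace timestamp (ISO-8601 with offset) and shifts it to CLIENT_ZONE. */
    static ZonedDateTime toClientZone(String emqxTimestamp) {
        return OffsetDateTime.parse(emqxTimestamp).atZoneSameInstant(CLIENT_ZONE);
    }

    public static void main(String[] args) {
        // emqx_connection_terminated, reason: {shutdown,tcp_closed}
        System.out.println(toClientZone("2024-06-14T01:27:17.330472+00:00"));
        // prints 2024-06-14T09:27:17.330472+08:00[Asia/Shanghai]

        // mqtt_packet_received, packet: CONNECT(...) from the reconnecting client
        System.out.println(toClientZone("2024-06-14T01:27:33.641981+00:00"));
        // prints 2024-06-14T09:27:33.641981+08:00[Asia/Shanghai]
    }
}
```

If the client host actually uses another zone, only CLIENT_ZONE needs to change.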

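This kind of tcp_closed is usually hard to diagnose. EMQX prints this log only after it detects that the socket was closed unexpectedly, and it then closes the client connection. Possible causes: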

  1. The client process closed the socket abnormally
  2. A network device or program between the client and EMQX closed the socket abnormally

We suggest capturing packets with tcpdump or Wireshark and checking which side sends the TCP FIN first.

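There are about 10 MQTT clients. The other 9 never disconnect and reconnect; only this client's connection does, and this client does not carry any high-concurrency message traffic either.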
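// JDK and Paho imports. UpDownMessage, MqttProperties (an application config class, not
// Paho's), MessageTypeEnum and dealIotDownMessage(...) are application code not shown here;
// JSON is presumably fastjson's com.alibaba.fastjson.JSON, and `log` is assumed to be an
// SLF4J logger (e.g. from Lombok's @Slf4j).
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttTopic;

public class AkCommMqttClient {

    private MqttClient mqttClient;

    private MqttProperties mqttInfo;

    /**
     * Publishes a message to the push topic of the message's device.
     * @param msg the message to send
     * @return true if the publish completed, false on any error
     */
    public boolean publish(UpDownMessage msg) {
        try {
            MqttMessage mqttMessage = new MqttMessage();
            mqttMessage.setQos(1);
            mqttMessage.setRetained(false);

            String message = JSON.toJSONString(msg);
            byte[] payload = message.getBytes(StandardCharsets.UTF_8);
            mqttMessage.setPayload(payload);
            // Local variable, so concurrent publish() calls cannot overwrite each other's topic
            MqttTopic topicPush = mqttClient.getTopic(mqttInfo.getPushTopic() + "/" + msg.getDeviceId());
            publish(topicPush, mqttMessage);

            log.debug("push type:{}, topic:{}, message:{}", MessageTypeEnum.getEnum(msg.getMessageType()).getDesc(), topicPush.getName(), message);
            return true;
        } catch (Exception e) {
            log.error("publish msg error", e);
            return false;
        }
    }

    public void connect() {
        MqttConnectOptions options = new MqttConnectOptions();
        // Clean session: false means the broker keeps the client's session (subscriptions,
        // queued messages) across connections; true means every connection starts a new session
        options.setCleanSession(true);
        options.setUserName(mqttInfo.getUsername());
        options.setPassword(mqttInfo.getPassword().toCharArray());
        // Connection timeout in seconds (the default is 30)
        options.setConnectionTimeout(30);
        // Reconnect automatically after the connection is lost
        options.setAutomaticReconnect(true);
        // Keep-alive interval in seconds; sent to the broker as KeepAlive=20 in CONNECT
        options.setKeepAliveInterval(20);
        try {
            mqttClient.setCallback(new MqttCallbackExtended() {

                @Override
                public void connectComplete(boolean reconnect, String serverURI) {
                    // Subscribing here restores the subscriptions after every automatic
                    // reconnect; with cleanSession=true the broker does not keep them
                    try {
                        String[] serverTopic = mqttInfo.getReceiveTopic().split(",");
                        int[] qos = new int[serverTopic.length];
                        Arrays.fill(qos, 1);
                        // Arrays.toString: SLF4J would treat a bare String[] as the varargs
                        // array and log only its first element
                        log.info("subscribe topic is {}", Arrays.toString(serverTopic));
                        mqttClient.subscribe(serverTopic, qos);
                        log.info("establish communication {} mqtt connection success!", mqttInfo.getClientId());
                    } catch (Exception e) {
                        log.error(e.getMessage(), e);
                    }
                }

                @Override
                public void connectionLost(Throwable cause) {
                    log.error(cause.getMessage(), cause);
                    log.debug("connect lost:{}", cause.toString());
                }

                @Override
                public void deliveryComplete(IMqttDeliveryToken token) {
                    log.debug("delivery complete:{}", token.isComplete());
                }

                @Override
                public void messageArrived(String topic, MqttMessage message) {
                    // Runs on Paho's "MQTT Call" thread; the QoS 1 acknowledgement for this
                    // message is sent only after this method returns
                    log.debug("qos:{}", message.getQos());
                    String contentJson = new String(message.getPayload(), StandardCharsets.UTF_8);
                    dealIotDownMessage(AkCommMqttClient.this, topic, contentJson);
                }

            });

            mqttClient.connect(options);
            /*String[] serverTopic = mqttInfo.getReceiveTopic().split(",");
            int[] Qos = new int[serverTopic.length];
            for (int i = 0; i < serverTopic.length; i++) {
                Qos[i] = 1;
            }
            log.info("subscribe topic is {}", serverTopic);
            mqttClient.subscribe(serverTopic, Qos);
            log.info("establish communication {} mqtt connection success!", mqttInfo.getClientId());*/
        } catch (Exception e) {
            log.error(e.getMessage(), e);
        }
    }

    // Blocks the calling thread until the publish completes (PUBACK for QoS 1) or fails
    private void publish(MqttTopic topic, MqttMessage message) throws MqttException {
        MqttDeliveryToken token = topic.publish(message);
        token.waitForCompletion();
        log.debug("message is published completely! {}", token.isComplete());
    }

}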
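In the client log further down, "publish msg error" is logged on the [MQTT Call: ak] thread, the same thread that runs connectComplete and messageArrived, so publish(...) and its blocking waitForCompletion() appear to be reached from inside the callback, presumably via dealIotDownMessage(...). Paho sends the QoS 1 acknowledgement for an incoming message only after messageArrived returns, so one hypothesis worth testing is to keep the callback short and do the processing on another thread. The sketch below shows that hand-off using only the JDK; OffloadingHandler and onMessage are hypothetical names, not part of Paho or of the code above, and this is a way to test the hypothesis rather than a confirmed fix.

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;

/**
 * Hypothetical helper: queues each incoming message for a background worker so the
 * thread that delivered it (Paho's "MQTT Call" thread, in the post) returns immediately.
 */
class OffloadingHandler implements AutoCloseable {

    // A single worker processes messages one at a time, in the order they arrived.
    private final ExecutorService worker = Executors.newSingleThreadExecutor();
    private final BiConsumer<String, String> processor;

    OffloadingHandler(BiConsumer<String, String> processor) {
        this.processor = processor;
    }

    /** Meant to be called from messageArrived: decodes the payload and only enqueues work. */
    void onMessage(String topic, byte[] payload) {
        String content = new String(payload, StandardCharsets.UTF_8);
        worker.execute(() -> processor.accept(topic, content));
    }

    /** Stops accepting messages and waits up to 5 s for queued ones to finish. */
    @Override
    public void close() throws InterruptedException {
        worker.shutdown();
        worker.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```

Wiring it in would mean creating one handler per client, for example `new OffloadingHandler((t, c) -> dealIotDownMessage(AkCommMqttClient.this, t, c))`, and calling `handler.onMessage(topic, message.getPayload())` from messageArrived. A single worker keeps messages in arrival order; a larger pool would process faster but could reorder them. The trade-off is that the incoming message is acknowledged to the broker before its processing has finished.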

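The EMQX log shows the disconnect at 1:27 a.m., but when I send a message at 9:27 to the topic this client subscribes to, I get an error again, so it looks like the client disconnected and reconnected once more. This now happens every day: sending a message to the topic this client subscribes to a little after 9 o'clock produces an error followed by a reconnect.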
09:27:16.774 [MQTT Ping: ak] ERROR o.e.p.c.m.i.ClientState - [logToJsr47,210] - ak: Timed out as no activity, keepAlive=20,000,000,000 lastOutboundActivity=1,409,336,723,652,303 lastInboundActivity=1,409,316,723,292,240 time=1,409,356,723,549,038 lastPing=1,409,336,723,654,784
09:27:16.780 [MQTT Call: ak] ERROR c.h.t.m.a.c.AkCommMqttClient - [publish,48] - publish msg error
org.eclipse.paho.client.mqttv3.MqttException: Timed out waiting for a response from the server
at org.eclipse.paho.client.mqttv3.internal.ExceptionHelper.createMqttException(ExceptionHelper.java:31)
at org.eclipse.paho.client.mqttv3.internal.ClientState.checkForActivity(ClientState.java:741)
at org.eclipse.paho.client.mqttv3.internal.ClientComms.checkForActivity(ClientComms.java:816)
at org.eclipse.paho.client.mqttv3.internal.ClientComms.checkForActivity(ClientComms.java:802)
at org.eclipse.paho.client.mqttv3.TimerPingSender$PingTask.run(TimerPingSender.java:79)
at java.util.TimerThread.mainLoop(Timer.java:555)
at java.util.TimerThread.run(Timer.java:505)
09:27:16.780 [MQTT Ping: ak] ERROR c.h.t.m.a.c.AkCommMqttClient - [connectionLost,86] - Timed out waiting for a response from the server
org.eclipse.paho.client.mqttv3.MqttException: Timed out waiting for a response from the server
at org.eclipse.paho.client.mqttv3.internal.ExceptionHelper.createMqttException(ExceptionHelper.java:31)
at org.eclipse.paho.client.mqttv3.internal.ClientState.checkForActivity(ClientState.java:741)
at org.eclipse.paho.client.mqttv3.internal.ClientComms.checkForActivity(ClientComms.java:816)
at org.eclipse.paho.client.mqttv3.internal.ClientComms.checkForActivity(ClientComms.java:802)
at org.eclipse.paho.client.mqttv3.TimerPingSender$PingTask.run(TimerPingSender.java:79)
at java.util.TimerThread.mainLoop(Timer.java:555)
at java.util.TimerThread.run(Timer.java:505)
09:27:16.780 [MQTT Ping: ak] DEBUG c.h.t.m.a.c.AkCommMqttClient - [connectionLost,87] - connect lost:Timed out waiting for a response from the server (32000)
09:27:33.089 [MQTT Call: ak] INFO c.h.t.m.a.c.AkCommMqttClient - [connectComplete,76] - subscribe topic is /mqtt/msg/receive/v1/c95f1d0e-7a1d-41d1-a2a5-69e5dc0b6875
09:27:33.092 [MQTT Call: ak] INFO c.h.t.m.a.c.AkCommMqttClient - [connectComplete,78] - establish communication ak mqtt connection success!
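The counters in the Paho "Timed out as no activity" line are easier to read as intervals. keepAlive=20,000,000,000 matches the configured 20-second keep-alive, which suggests the values are nanoseconds (presumably System.nanoTime() readings; that is an assumption about Paho's internals). The sketch below just subtracts the logged values; PahoKeepAliveGaps is a hypothetical helper.

```java
class PahoKeepAliveGaps {

    // Values copied from the "Timed out as no activity" line; assumed to be nanoseconds.
    static final long KEEP_ALIVE = 20_000_000_000L;
    static final long LAST_OUTBOUND = 1_409_336_723_652_303L;
    static final long LAST_INBOUND = 1_409_316_723_292_240L;
    static final long TIME = 1_409_356_723_549_038L;
    static final long LAST_PING = 1_409_336_723_654_784L;

    /** Converts nanoseconds to whole milliseconds, truncating the remainder. */
    static long millis(long nanos) {
        return nanos / 1_000_000L;
    }

    public static void main(String[] args) {
        // prints "keepAlive: 20000 ms"
        System.out.println("keepAlive: " + millis(KEEP_ALIVE) + " ms");
        // prints "since last inbound: 40000 ms" (exact gap: 40,000,256,798 ns)
        System.out.println("since last inbound: " + millis(TIME - LAST_INBOUND) + " ms");
        // prints "since last outbound: 19999 ms" (exact gap: 19,999,896,735 ns)
        System.out.println("since last outbound: " + millis(TIME - LAST_OUTBOUND) + " ms");
        // prints "since last ping: 19999 ms"
        System.out.println("since last ping: " + millis(TIME - LAST_PING) + " ms");
        // prints "ping after last outbound: 2481 ns"
        System.out.println("ping after last outbound: " + (LAST_PING - LAST_OUTBOUND) + " ns");
    }
}
```

By this arithmetic, the last outbound packet was the ping itself (lastPing is only 2,481 ns after lastOutboundActivity), sent about 20 s before the timeout, and nothing at all had been read from the broker for about 40 s, i.e. two keep-alive periods. That is consistent with the ping response never reaching the client, or never being read. If the client host runs on UTC+8, the tcp_closed that EMQX logs at 01:27:17.330 UTC falls about half a second after this timeout, which would fit the client closing the socket once it gave up waiting.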