客户端频繁丢失连接

连接本地的emqx服务,没问题。不管用localhost还是172的内网ip,都没问题
但是连接服务器上的emqx,就会频繁丢失连接,断开时的错误码是:MQTT_ERR_CONN_LOST:7

客户端代码

import os
import sys

sys.path.insert(0, os.path.abspath(
    os.path.join(
        os.path.dirname(
            os.path.abspath(__file__)),
        '..', '..', 'libs')))

import json
import threading
import paho.mqtt.client as mqtt

from tk_service.command_handler import command_handler_mapping
from tk_service.tk_utils.log import logger
from tk_service.tk_utils.des_utils import des_encrypt, des_decrypt, config


class MqttClient:
    _instance = None
    _lock = threading.Lock()

    def __new__(cls, *args, **kwargs):
        if not cls._instance:
            with cls._lock:
                if not cls._instance:
                    cls._instance = super(MqttClient, cls).__new__(cls)
        return cls._instance

    def __init__(self):
        if hasattr(self, '_initialized') and self._initialized:
            return

        self.connected = False
        self.subscribed_topics = set()
        self.lock = threading.Lock()
        self.SUBSCRIBE_DEVICE_ID = f"{config('SUBSCRIBE_PREFIX')}/{config('DEFAULT_TENANT_ID')}/{config('DEFAULT_DEVICE_ID')}"
        self.SUBSCRIBE_TENANT_ID = f"{config('SUBSCRIBE_PREFIX')}/{config('DEFAULT_TENANT_ID')}/all"

        def command_handler(msg):
            command = msg['command']
            payload = msg['payload']
            if command in command_handler_mapping:
                command_handler_mapping[command](payload)
            else:
                logger.warning(f"Command {command} not found")

        def on_message(client, userdata, msg):
            """
                控制服务,消息处理函数
            """
            payload = msg.payload
            try:
                payload = json.loads(des_decrypt(payload))
                if payload["command"]:
                    command_handler(payload)
            except Exception as msgParseException:
                logger.error("消息解析失败:%s,错误消息体:%s", msgParseException, payload)

        def on_connect(client, userdata, flags, rc):
            if rc == 0:
                logger.info("连接成功")
                with self.lock:
                    self.connected = True
                client.subscribe(self.SUBSCRIBE_DEVICE_ID)
                logger.info("订阅设备id通配符 %s", self.SUBSCRIBE_DEVICE_ID)
                client.subscribe(self.SUBSCRIBE_TENANT_ID)
                logger.info("订阅租户id通配符 %s", self.SUBSCRIBE_TENANT_ID)
            else:
                logger.error("连接失败,返回代码: %s", rc)
                client.reconnect()

        def on_disconnect(client, userdata, rc):
            if rc != 0:
                logger.warning(f"意外断开连接:{rc}")
                with self.lock:
                    self.connected = False
                retry_connect(client)

        def retry_connect(client):
            logger.info("尝试重新连接...")
            try:
                client.reconnect()
            except Exception as connection_error:
                logger.error(f"重试连接 MQTT 代理[{config('MQTT_BROKER')}:{config('MQTT_PORT')}]失败: %s",
                             connection_error)
                threading.Timer(config('MQTT_RETRY_CONNECTION_INTERVAL'), retry_connect, args=(client,)).start()

        self.client = mqtt.Client(client_id=config('DEFAULT_DEVICE_ID'))
        self.client.username_pw_set(config('MQTT_USERNAME'),config('MQTT_PASSWORD'))
        self.client.on_connect = on_connect
        self.client.on_message = on_message
        self.client.on_disconnect = on_disconnect
        try:
            # 日志打印mqtt的ip和端口
            logger.info(f"MQTT_SERVER:{config('MQTT_BROKER')}:{config('MQTT_PORT')}")
            self.client.connect(config('MQTT_BROKER'), config('MQTT_PORT'))
        except Exception as e:
            logger.error("连接 MQTT 代理失败: %s", e)
            retry_connect(self.client)
        self.client.loop_start()
        self._initialized = True

    def __subscribe_topic(self, topic):
        self.subscribed_topics.add(topic)
        self.client.subscribe(topic)
        logger.info("订阅设备id通配符 %s", topic)

    def refresh_subscribe_topic(self):
        logger.info("刷新订阅设备id通配符")
        for subscribed_topic in self.subscribed_topics:
            self.client.unsubscribe(subscribed_topic)
        self.subscribed_topics.clear()
        self.__subscribe_topic(
            f"{config('SUBSCRIBE_PREFIX')}/{config('DEFAULT_TENANT_ID')}/{config('DEFAULT_DEVICE_ID')}")
        self.__subscribe_topic(f"{config('SUBSCRIBE_PREFIX')}/{config('DEFAULT_TENANT_ID')}/all")

    def publish(self, message: str, qos=0):
        with self.lock:
            if not self.connected:
                logger.warning("MQTT 客户端未连接,无法发布消息")
                return
        message = des_encrypt(message)
        publish_topic = f"{config('PUBLISH_PREFIX')}/{config('DEFAULT_TENANT_ID')}/{config('DEFAULT_DEVICE_ID')}"
        result = self.client.publish(publish_topic,
                                     message, qos)
        if result.rc == mqtt.MQTT_ERR_SUCCESS:
            logger.info("消息已发布到 %s: %s", publish_topic, message)
        else:
            logger.error("发布消息到 %s 失败: %s", publish_topic, result.rc)

    def close(self):
        logger.info("正在关闭 MQTT 客户端...")
        self.client.loop_stop()  # 停止网络循环
        self.client.disconnect()  # 断开连接
        logger.info("MQTT 客户端已成功关闭")

    @classmethod
    def get_instance(cls):
        instance = cls()
        return instance


# 获取单例实例的方法
def get_mqtt_client():
    return MqttClient.get_instance()

这是连接远程服务器emqx服务的日志

2024-07-27 13:23:51,828 - INFO - thread-test_task - 消息已发布到 status/aabbcc/a26012ce9124457f9e6dd3075f332f71: 000000000000000000000000000000000000000066a48467
2024-07-27 13:23:52,686 - WARNING - paho-mqtt-client-a26012ce9124457f9e6dd3075f332f71 - 意外断开连接:7
2024-07-27 13:23:52,686 - INFO - paho-mqtt-client-a26012ce9124457f9e6dd3075f332f71 - 尝试重新连接...
2024-07-27 13:23:53,824 - INFO - paho-mqtt-client-a26012ce9124457f9e6dd3075f332f71 - 连接成功
2024-07-27 13:23:53,825 - INFO - paho-mqtt-client-a26012ce9124457f9e6dd3075f332f71 - 订阅设备id通配符 cmd/aabbcc/a26012ce9124457f9e6dd3075f332f71
2024-07-27 13:23:53,825 - INFO - paho-mqtt-client-a26012ce9124457f9e6dd3075f332f71 - 订阅租户id通配符 cmd/aabbcc/all
2024-07-27 13:23:53,922 - INFO - thread-test_task - 消息已发布到 status/aabbcc/a26012ce9124457f9e6dd3075f332f71: 000000000000000000000000000000000000000066a48469
2024-07-27 13:23:54,940 - WARNING - paho-mqtt-client-a26012ce9124457f9e6dd3075f332f71 - 意外断开连接:7
2024-07-27 13:23:54,940 - INFO - paho-mqtt-client-a26012ce9124457f9e6dd3075f332f71 - 尝试重新连接...
2024-07-27 13:23:56,001 - WARNING - thread-test_task - MQTT 客户端未连接,无法发布消息
2024-07-27 13:23:56,069 - INFO - paho-mqtt-client-a26012ce9124457f9e6dd3075f332f71 - 连接成功
2024-07-27 13:23:56,069 - INFO - paho-mqtt-client-a26012ce9124457f9e6dd3075f332f71 - 订阅设备id通配符 cmd/aabbcc/a26012ce9124457f9e6dd3075f332f71
2024-07-27 13:23:56,069 - INFO - paho-mqtt-client-a26012ce9124457f9e6dd3075f332f71 - 订阅租户id通配符 cmd/aabbcc/all
2024-07-27 13:23:57,192 - WARNING - paho-mqtt-client-a26012ce9124457f9e6dd3075f332f71 - 意外断开连接:7
2024-07-27 13:23:57,192 - INFO - paho-mqtt-client-a26012ce9124457f9e6dd3075f332f71 - 尝试重新连接...
2024-07-27 13:23:58,072 - WARNING - thread-test_task - MQTT 客户端未连接,无法发布消息
2024-07-27 13:23:58,326 - INFO - paho-mqtt-client-a26012ce9124457f9e6dd3075f332f71 - 连接成功
2024-07-27 13:23:58,327 - INFO - paho-mqtt-client-a26012ce9124457f9e6dd3075f332f71 - 订阅设备id通配符 cmd/aabbcc/a26012ce9124457f9e6dd3075f332f71
2024-07-27 13:23:58,327 - INFO - paho-mqtt-client-a26012ce9124457f9e6dd3075f332f71 - 订阅租户id通配符 cmd/aabbcc/all
2024-07-27 13:23:59,485 - WARNING - paho-mqtt-client-a26012ce9124457f9e6dd3075f332f71 - 意外断开连接:7
2024-07-27 13:23:59,485 - INFO - paho-mqtt-client-a26012ce9124457f9e6dd3075f332f71 - 尝试重新连接...
2024-07-27 13:24:00,142 - WARNING - thread-test_task - MQTT 客户端未连接,无法发布消息
2024-07-27 13:24:00,610 - INFO - paho-mqtt-client-a26012ce9124457f9e6dd3075f332f71 - 连接成功
2024-07-27 13:24:00,610 - INFO - paho-mqtt-client-a26012ce9124457f9e6dd3075f332f71 - 订阅设备id通配符 cmd/aabbcc/a26012ce9124457f9e6dd3075f332f71
2024-07-27 13:24:00,611 - INFO - paho-mqtt-client-a26012ce9124457f9e6dd3075f332f71 - 订阅租户id通配符 cmd/aabbcc/all
2024-07-27 13:24:01,761 - WARNING - paho-mqtt-client-a26012ce9124457f9e6dd3075f332f71 - 意外断开连接:7
2024-07-27 13:24:01,761 - INFO - paho-mqtt-client-a26012ce9124457f9e6dd3075f332f71 - 尝试重新连接...
2024-07-27 13:24:02,230 - WARNING - thread-test_task - MQTT 客户端未连接,无法发布消息
2024-07-27 13:24:02,902 - INFO - paho-mqtt-client-a26012ce9124457f9e6dd3075f332f71 - 连接成功
2024-07-27 13:24:02,902 - INFO - paho-mqtt-client-a26012ce9124457f9e6dd3075f332f71 - 订阅设备id通配符 cmd/aabbcc/a26012ce9124457f9e6dd3075f332f71
2024-07-27 13:24:02,902 - INFO - paho-mqtt-client-a26012ce9124457f9e6dd3075f332f71 - 订阅租户id通配符 cmd/aabbcc/all
2024-07-27 13:24:03,105 - INFO - MainThread - Stopping all tasks...
2024-07-27 13:24:03,105 - INFO - MainThread - All tasks stopped.
2024-07-27 13:24:03,105 - INFO - MainThread - Closing MQTT client...
2024-07-27 13:24:03,105 - INFO - MainThread - 正在关闭 MQTT 客户端...
2024-07-27 13:24:03,985 - INFO - MainThread - MQTT 客户端已成功关闭
2024-07-27 13:24:03,985 - INFO - MainThread - MQTT client closed.
2024-07-27 13:24:03,986 - INFO - MainThread - Exiting...
2024-07-27 13:24:03,986 - INFO - MainThread - Exited.

这是连接本地服务是的日志,没有断开

2024-07-27 13:25:06,961 - INFO - MainThread - Starting service...
2024-07-27 13:25:06,961 - INFO - MainThread - Initializing MQTT client...
2024-07-27 13:25:06,961 - INFO - MainThread - Initializing configuration
2024-07-27 13:25:06,961 - INFO - MainThread - Configuration file loaded successfully
2024-07-27 13:25:06,961 - INFO - MainThread - MQTT_SERVER:localhost:1883
2024-07-27 13:25:06,965 - INFO - MainThread - MQTT client initialized.
2024-07-27 13:25:06,965 - INFO - MainThread - Register command handler
2024-07-27 13:25:06,968 - INFO - MainThread - Register command handler: refresh_command_handler
2024-07-27 13:25:06,968 - INFO - MainThread - Register command handler: reset_task_execute_interval
2024-07-27 13:25:06,969 - INFO - MainThread - Register command handler: set_tenant_id
2024-07-27 13:25:06,969 - INFO - MainThread - Register command handler: update_program
2024-07-27 13:25:06,969 - INFO - MainThread - Register command handler success
2024-07-27 13:25:06,969 - INFO - MainThread - Initializing timing executor...
2024-07-27 13:25:06,969 - INFO - MainThread - Timing executor initialized.
2024-07-27 13:25:06,973 - INFO - MainThread - 加载在线任务:push_status_data_task
2024-07-27 13:25:06,973 - WARNING - MainThread - MQTT 客户端未连接,无法发布消息
2024-07-27 13:25:06,973 - INFO - MainThread - 注册任务:test_task
2024-07-27 13:25:06,973 - WARNING - thread-test_task - MQTT 客户端未连接,无法发布消息
2024-07-27 13:25:06,973 - INFO - MainThread - Service started.
2024-07-27 13:25:06,975 - INFO - paho-mqtt-client-a26012ce9124457f9e6dd3075f332f71 - 连接成功
2024-07-27 13:25:06,975 - INFO - paho-mqtt-client-a26012ce9124457f9e6dd3075f332f71 - 订阅设备id通配符 cmd/aabbcc/a26012ce9124457f9e6dd3075f332f71
2024-07-27 13:25:06,975 - INFO - paho-mqtt-client-a26012ce9124457f9e6dd3075f332f71 - 订阅租户id通配符 cmd/aabbcc/all
2024-07-27 13:25:09,053 - INFO - thread-test_task - 消息已发布到 status/aabbcc/a26012ce9124457f9e6dd3075f332f71: 000000000000000000000000000000000000000066a484b5
2024-07-27 13:25:11,129 - INFO - thread-test_task - 消息已发布到 status/aabbcc/a26012ce9124457f9e6dd3075f332f71: 000000000000000000000000000000000000000066a484b7
2024-07-27 13:25:13,205 - INFO - thread-test_task - 消息已发布到 status/aabbcc/a26012ce9124457f9e6dd3075f332f71: 000000000000000000000000000000000000000066a484b9
2024-07-27 13:25:15,279 - INFO - thread-test_task - 消息已发布到 status/aabbcc/a26012ce9124457f9e6dd3075f332f71: 000000000000000000000000000000000000000066a484bb
2024-07-27 13:25:17,356 - INFO - thread-test_task - 消息已发布到 status/aabbcc/a26012ce9124457f9e6dd3075f332f71: 000000000000000000000000000000000000000066a484bd
2024-07-27 13:25:19,436 - INFO - thread-test_task - 消息已发布到 status/aabbcc/a26012ce9124457f9e6dd3075f332f71: 000000000000000000000000000000000000000066a484bf
2024-07-27 13:25:21,510 - INFO - thread-test_task - 消息已发布到 status/aabbcc/a26012ce9124457f9e6dd3075f332f71: 000000000000000000000000000000000000000066a484c1
2024-07-27 13:25:23,580 - INFO - thread-test_task - 消息已发布到 status/aabbcc/a26012ce9124457f9e6dd3075f332f71: 000000000000000000000000000000000000000066a484c3
2024-07-27 13:25:25,655 - INFO - thread-test_task - 消息已发布到 status/aabbcc/a26012ce9124457f9e6dd3075f332f71: 000000000000000000000000000000000000000066a484c5
2024-07-27 13:25:27,732 - INFO - thread-test_task - 消息已发布到 status/aabbcc/a26012ce9124457f9e6dd3075f332f71: 000000000000000000000000000000000000000066a484c7
2024-07-27 13:25:29,810 - INFO - thread-test_task - 消息已发布到 status/aabbcc/a26012ce9124457f9e6dd3075f332f71: 000000000000000000000000000000000000000066a484c9

  • MQTT 协议规范中没有 ReasonCode=7 的原因码。
    这个错误可能是 paho 库中的内部错误码。
  • 请提供问题发生时 EMQX 的日志信息。
  • 猜测的可能原因为网络问题,可以尝试用 wireshark/tcpdump 等工具在 client 端或 server 端抓包观察行为
  • 也可以用 EMQX 的 log trace 功能对 client 进行 trace 观察 client 和 EMQX 的交互行为