连接本地的emqx服务,没问题。不管用localhost还是172的内网ip,都没问题
但是连接服务器上的emqx,就会频繁丢失连接,断开时的错误码是:MQTT_ERR_CONN_LOST:7
客户端代码
import os
import sys
sys.path.insert(0, os.path.abspath(
os.path.join(
os.path.dirname(
os.path.abspath(__file__)),
'..', '..', 'libs')))
import json
import threading
import paho.mqtt.client as mqtt
from tk_service.command_handler import command_handler_mapping
from tk_service.tk_utils.log import logger
from tk_service.tk_utils.des_utils import des_encrypt, des_decrypt, config
class MqttClient:
_instance = None
_lock = threading.Lock()
def __new__(cls, *args, **kwargs):
if not cls._instance:
with cls._lock:
if not cls._instance:
cls._instance = super(MqttClient, cls).__new__(cls)
return cls._instance
def __init__(self):
if hasattr(self, '_initialized') and self._initialized:
return
self.connected = False
self.subscribed_topics = set()
self.lock = threading.Lock()
self.SUBSCRIBE_DEVICE_ID = f"{config('SUBSCRIBE_PREFIX')}/{config('DEFAULT_TENANT_ID')}/{config('DEFAULT_DEVICE_ID')}"
self.SUBSCRIBE_TENANT_ID = f"{config('SUBSCRIBE_PREFIX')}/{config('DEFAULT_TENANT_ID')}/all"
def command_handler(msg):
command = msg['command']
payload = msg['payload']
if command in command_handler_mapping:
command_handler_mapping[command](payload)
else:
logger.warning(f"Command {command} not found")
def on_message(client, userdata, msg):
"""
控制服务,消息处理函数
"""
payload = msg.payload
try:
payload = json.loads(des_decrypt(payload))
if payload["command"]:
command_handler(payload)
except Exception as msgParseException:
logger.error("消息解析失败:%s,错误消息体:%s", msgParseException, payload)
def on_connect(client, userdata, flags, rc):
if rc == 0:
logger.info("连接成功")
with self.lock:
self.connected = True
client.subscribe(self.SUBSCRIBE_DEVICE_ID)
logger.info("订阅设备id通配符 %s", self.SUBSCRIBE_DEVICE_ID)
client.subscribe(self.SUBSCRIBE_TENANT_ID)
logger.info("订阅租户id通配符 %s", self.SUBSCRIBE_TENANT_ID)
else:
logger.error("连接失败,返回代码: %s", rc)
client.reconnect()
def on_disconnect(client, userdata, rc):
if rc != 0:
logger.warning(f"意外断开连接:{rc}")
with self.lock:
self.connected = False
retry_connect(client)
def retry_connect(client):
logger.info("尝试重新连接...")
try:
client.reconnect()
except Exception as connection_error:
logger.error(f"重试连接 MQTT 代理[{config('MQTT_BROKER')}:{config('MQTT_PORT')}]失败: %s",
connection_error)
threading.Timer(config('MQTT_RETRY_CONNECTION_INTERVAL'), retry_connect, args=(client,)).start()
self.client = mqtt.Client(client_id=config('DEFAULT_DEVICE_ID'))
self.client.username_pw_set(config('MQTT_USERNAME'),config('MQTT_PASSWORD'))
self.client.on_connect = on_connect
self.client.on_message = on_message
self.client.on_disconnect = on_disconnect
try:
# 日志打印mqtt的ip和端口
logger.info(f"MQTT_SERVER:{config('MQTT_BROKER')}:{config('MQTT_PORT')}")
self.client.connect(config('MQTT_BROKER'), config('MQTT_PORT'))
except Exception as e:
logger.error("连接 MQTT 代理失败: %s", e)
retry_connect(self.client)
self.client.loop_start()
self._initialized = True
def __subscribe_topic(self, topic):
self.subscribed_topics.add(topic)
self.client.subscribe(topic)
logger.info("订阅设备id通配符 %s", topic)
def refresh_subscribe_topic(self):
logger.info("刷新订阅设备id通配符")
for subscribed_topic in self.subscribed_topics:
self.client.unsubscribe(subscribed_topic)
self.subscribed_topics.clear()
self.__subscribe_topic(
f"{config('SUBSCRIBE_PREFIX')}/{config('DEFAULT_TENANT_ID')}/{config('DEFAULT_DEVICE_ID')}")
self.__subscribe_topic(f"{config('SUBSCRIBE_PREFIX')}/{config('DEFAULT_TENANT_ID')}/all")
def publish(self, message: str, qos=0):
with self.lock:
if not self.connected:
logger.warning("MQTT 客户端未连接,无法发布消息")
return
message = des_encrypt(message)
publish_topic = f"{config('PUBLISH_PREFIX')}/{config('DEFAULT_TENANT_ID')}/{config('DEFAULT_DEVICE_ID')}"
result = self.client.publish(publish_topic,
message, qos)
if result.rc == mqtt.MQTT_ERR_SUCCESS:
logger.info("消息已发布到 %s: %s", publish_topic, message)
else:
logger.error("发布消息到 %s 失败: %s", publish_topic, result.rc)
def close(self):
logger.info("正在关闭 MQTT 客户端...")
self.client.loop_stop() # 停止网络循环
self.client.disconnect() # 断开连接
logger.info("MQTT 客户端已成功关闭")
@classmethod
def get_instance(cls):
instance = cls()
return instance
# 获取单例实例的方法
def get_mqtt_client():
return MqttClient.get_instance()
这是连接远程服务器emqx服务的日志
2024-07-27 13:23:51,828 - INFO - thread-test_task - 消息已发布到 status/aabbcc/a26012ce9124457f9e6dd3075f332f71: 000000000000000000000000000000000000000066a48467
2024-07-27 13:23:52,686 - WARNING - paho-mqtt-client-a26012ce9124457f9e6dd3075f332f71 - 意外断开连接:7
2024-07-27 13:23:52,686 - INFO - paho-mqtt-client-a26012ce9124457f9e6dd3075f332f71 - 尝试重新连接...
2024-07-27 13:23:53,824 - INFO - paho-mqtt-client-a26012ce9124457f9e6dd3075f332f71 - 连接成功
2024-07-27 13:23:53,825 - INFO - paho-mqtt-client-a26012ce9124457f9e6dd3075f332f71 - 订阅设备id通配符 cmd/aabbcc/a26012ce9124457f9e6dd3075f332f71
2024-07-27 13:23:53,825 - INFO - paho-mqtt-client-a26012ce9124457f9e6dd3075f332f71 - 订阅租户id通配符 cmd/aabbcc/all
2024-07-27 13:23:53,922 - INFO - thread-test_task - 消息已发布到 status/aabbcc/a26012ce9124457f9e6dd3075f332f71: 000000000000000000000000000000000000000066a48469
2024-07-27 13:23:54,940 - WARNING - paho-mqtt-client-a26012ce9124457f9e6dd3075f332f71 - 意外断开连接:7
2024-07-27 13:23:54,940 - INFO - paho-mqtt-client-a26012ce9124457f9e6dd3075f332f71 - 尝试重新连接...
2024-07-27 13:23:56,001 - WARNING - thread-test_task - MQTT 客户端未连接,无法发布消息
2024-07-27 13:23:56,069 - INFO - paho-mqtt-client-a26012ce9124457f9e6dd3075f332f71 - 连接成功
2024-07-27 13:23:56,069 - INFO - paho-mqtt-client-a26012ce9124457f9e6dd3075f332f71 - 订阅设备id通配符 cmd/aabbcc/a26012ce9124457f9e6dd3075f332f71
2024-07-27 13:23:56,069 - INFO - paho-mqtt-client-a26012ce9124457f9e6dd3075f332f71 - 订阅租户id通配符 cmd/aabbcc/all
2024-07-27 13:23:57,192 - WARNING - paho-mqtt-client-a26012ce9124457f9e6dd3075f332f71 - 意外断开连接:7
2024-07-27 13:23:57,192 - INFO - paho-mqtt-client-a26012ce9124457f9e6dd3075f332f71 - 尝试重新连接...
2024-07-27 13:23:58,072 - WARNING - thread-test_task - MQTT 客户端未连接,无法发布消息
2024-07-27 13:23:58,326 - INFO - paho-mqtt-client-a26012ce9124457f9e6dd3075f332f71 - 连接成功
2024-07-27 13:23:58,327 - INFO - paho-mqtt-client-a26012ce9124457f9e6dd3075f332f71 - 订阅设备id通配符 cmd/aabbcc/a26012ce9124457f9e6dd3075f332f71
2024-07-27 13:23:58,327 - INFO - paho-mqtt-client-a26012ce9124457f9e6dd3075f332f71 - 订阅租户id通配符 cmd/aabbcc/all
2024-07-27 13:23:59,485 - WARNING - paho-mqtt-client-a26012ce9124457f9e6dd3075f332f71 - 意外断开连接:7
2024-07-27 13:23:59,485 - INFO - paho-mqtt-client-a26012ce9124457f9e6dd3075f332f71 - 尝试重新连接...
2024-07-27 13:24:00,142 - WARNING - thread-test_task - MQTT 客户端未连接,无法发布消息
2024-07-27 13:24:00,610 - INFO - paho-mqtt-client-a26012ce9124457f9e6dd3075f332f71 - 连接成功
2024-07-27 13:24:00,610 - INFO - paho-mqtt-client-a26012ce9124457f9e6dd3075f332f71 - 订阅设备id通配符 cmd/aabbcc/a26012ce9124457f9e6dd3075f332f71
2024-07-27 13:24:00,611 - INFO - paho-mqtt-client-a26012ce9124457f9e6dd3075f332f71 - 订阅租户id通配符 cmd/aabbcc/all
2024-07-27 13:24:01,761 - WARNING - paho-mqtt-client-a26012ce9124457f9e6dd3075f332f71 - 意外断开连接:7
2024-07-27 13:24:01,761 - INFO - paho-mqtt-client-a26012ce9124457f9e6dd3075f332f71 - 尝试重新连接...
2024-07-27 13:24:02,230 - WARNING - thread-test_task - MQTT 客户端未连接,无法发布消息
2024-07-27 13:24:02,902 - INFO - paho-mqtt-client-a26012ce9124457f9e6dd3075f332f71 - 连接成功
2024-07-27 13:24:02,902 - INFO - paho-mqtt-client-a26012ce9124457f9e6dd3075f332f71 - 订阅设备id通配符 cmd/aabbcc/a26012ce9124457f9e6dd3075f332f71
2024-07-27 13:24:02,902 - INFO - paho-mqtt-client-a26012ce9124457f9e6dd3075f332f71 - 订阅租户id通配符 cmd/aabbcc/all
2024-07-27 13:24:03,105 - INFO - MainThread - Stopping all tasks...
2024-07-27 13:24:03,105 - INFO - MainThread - All tasks stopped.
2024-07-27 13:24:03,105 - INFO - MainThread - Closing MQTT client...
2024-07-27 13:24:03,105 - INFO - MainThread - 正在关闭 MQTT 客户端...
2024-07-27 13:24:03,985 - INFO - MainThread - MQTT 客户端已成功关闭
2024-07-27 13:24:03,985 - INFO - MainThread - MQTT client closed.
2024-07-27 13:24:03,986 - INFO - MainThread - Exiting...
2024-07-27 13:24:03,986 - INFO - MainThread - Exited.
这是连接本地服务是的日志,没有断开
2024-07-27 13:25:06,961 - INFO - MainThread - Starting service...
2024-07-27 13:25:06,961 - INFO - MainThread - Initializing MQTT client...
2024-07-27 13:25:06,961 - INFO - MainThread - Initializing configuration
2024-07-27 13:25:06,961 - INFO - MainThread - Configuration file loaded successfully
2024-07-27 13:25:06,961 - INFO - MainThread - MQTT_SERVER:localhost:1883
2024-07-27 13:25:06,965 - INFO - MainThread - MQTT client initialized.
2024-07-27 13:25:06,965 - INFO - MainThread - Register command handler
2024-07-27 13:25:06,968 - INFO - MainThread - Register command handler: refresh_command_handler
2024-07-27 13:25:06,968 - INFO - MainThread - Register command handler: reset_task_execute_interval
2024-07-27 13:25:06,969 - INFO - MainThread - Register command handler: set_tenant_id
2024-07-27 13:25:06,969 - INFO - MainThread - Register command handler: update_program
2024-07-27 13:25:06,969 - INFO - MainThread - Register command handler success
2024-07-27 13:25:06,969 - INFO - MainThread - Initializing timing executor...
2024-07-27 13:25:06,969 - INFO - MainThread - Timing executor initialized.
2024-07-27 13:25:06,973 - INFO - MainThread - 加载在线任务:push_status_data_task
2024-07-27 13:25:06,973 - WARNING - MainThread - MQTT 客户端未连接,无法发布消息
2024-07-27 13:25:06,973 - INFO - MainThread - 注册任务:test_task
2024-07-27 13:25:06,973 - WARNING - thread-test_task - MQTT 客户端未连接,无法发布消息
2024-07-27 13:25:06,973 - INFO - MainThread - Service started.
2024-07-27 13:25:06,975 - INFO - paho-mqtt-client-a26012ce9124457f9e6dd3075f332f71 - 连接成功
2024-07-27 13:25:06,975 - INFO - paho-mqtt-client-a26012ce9124457f9e6dd3075f332f71 - 订阅设备id通配符 cmd/aabbcc/a26012ce9124457f9e6dd3075f332f71
2024-07-27 13:25:06,975 - INFO - paho-mqtt-client-a26012ce9124457f9e6dd3075f332f71 - 订阅租户id通配符 cmd/aabbcc/all
2024-07-27 13:25:09,053 - INFO - thread-test_task - 消息已发布到 status/aabbcc/a26012ce9124457f9e6dd3075f332f71: 000000000000000000000000000000000000000066a484b5
2024-07-27 13:25:11,129 - INFO - thread-test_task - 消息已发布到 status/aabbcc/a26012ce9124457f9e6dd3075f332f71: 000000000000000000000000000000000000000066a484b7
2024-07-27 13:25:13,205 - INFO - thread-test_task - 消息已发布到 status/aabbcc/a26012ce9124457f9e6dd3075f332f71: 000000000000000000000000000000000000000066a484b9
2024-07-27 13:25:15,279 - INFO - thread-test_task - 消息已发布到 status/aabbcc/a26012ce9124457f9e6dd3075f332f71: 000000000000000000000000000000000000000066a484bb
2024-07-27 13:25:17,356 - INFO - thread-test_task - 消息已发布到 status/aabbcc/a26012ce9124457f9e6dd3075f332f71: 000000000000000000000000000000000000000066a484bd
2024-07-27 13:25:19,436 - INFO - thread-test_task - 消息已发布到 status/aabbcc/a26012ce9124457f9e6dd3075f332f71: 000000000000000000000000000000000000000066a484bf
2024-07-27 13:25:21,510 - INFO - thread-test_task - 消息已发布到 status/aabbcc/a26012ce9124457f9e6dd3075f332f71: 000000000000000000000000000000000000000066a484c1
2024-07-27 13:25:23,580 - INFO - thread-test_task - 消息已发布到 status/aabbcc/a26012ce9124457f9e6dd3075f332f71: 000000000000000000000000000000000000000066a484c3
2024-07-27 13:25:25,655 - INFO - thread-test_task - 消息已发布到 status/aabbcc/a26012ce9124457f9e6dd3075f332f71: 000000000000000000000000000000000000000066a484c5
2024-07-27 13:25:27,732 - INFO - thread-test_task - 消息已发布到 status/aabbcc/a26012ce9124457f9e6dd3075f332f71: 000000000000000000000000000000000000000066a484c7
2024-07-27 13:25:29,810 - INFO - thread-test_task - 消息已发布到 status/aabbcc/a26012ce9124457f9e6dd3075f332f71: 000000000000000000000000000000000000000066a484c9
- MQTT 协议规范中没有
ReasonCode=7
的原因码。
这个错误可能是 paho 库中的内部错误码。 - 请提供问题发生时 EMQX 的日志信息。
- 猜测的可能原因为网络问题,可以尝试用 wireshark/tcpdump 等工具在 client 端或 server 端抓包观察行为
- 也可以用 EMQX 的 log trace 功能对 client 进行 trace 观察 client 和 EMQX 的交互行为