emqx 结合springboot,有时候偶尔收不到订阅的消息
针对 EMQX 结合 Spring Boot 偶尔收不到订阅消息的问题,以下是分步排查和解决方案:
1. 检查客户端连接稳定性
- 现象:网络波动或防火墙拦截导致连接断开。
- 解决方案:
- 重连机制:在 Spring Boot 客户端中实现
MqttConnectOptions
的自动重连:MqttConnectOptions options = new MqttConnectOptions(); options.setAutomaticReconnect(true); // 开启自动重连 options.setConnectionTimeout(10); // 超时时间(秒)
- 心跳检测:合理设置心跳间隔(
Keep Alive
):options.setKeepAliveInterval(60); // 单位:秒
- 重连机制:在 Spring Boot 客户端中实现
2. 确认 QoS 级别设置
- 现象:低 QoS 导致消息在传输层丢失。
- 解决方案:
- 发布端和订阅端使用 QoS 1 或 2(至少一次/精确一次):
// 订阅时指定 QoS client.subscribe("topic/example", 1); // 发布时指定 QoS MqttMessage message = new MqttMessage(); message.setQos(1); client.publish("topic/example", message);
- 发布端和订阅端使用 QoS 1 或 2(至少一次/精确一次):
3. 确保订阅时机正确
- 现象:订阅发生在连接完成前,导致消息丢失。
- 解决方案:在连接成功回调中执行订阅操作:
client.setCallback(new MqttCallback() { @Override public void connectComplete(boolean reconnect, String serverURI) { if (!reconnect) { client.subscribe("topic/example", 1); } } });
4. 检查消息发布与订阅的时序
- 现象:消息发布时订阅者尚未在线。
- 解决方案:
- 使用 EMQX 的保留消息(Retained Message)功能,确保新订阅者能立即获取最新消息:
MqttMessage message = new MqttMessage(); message.setRetained(true); // 设置为保留消息 client.publish("topic/example", message);
- 使用 EMQX 的保留消息(Retained Message)功能,确保新订阅者能立即获取最新消息:
5. 排查多实例负载均衡问题
- 现象:多个 Spring Boot 实例导致订阅分散。
- 解决方案:
- 共享订阅:使用 EMQX 的共享订阅功能(
$share/group/topic
),将消息均匀分发到多个消费者:client.subscribe("$share/group1/topic/example", 1);
- 唯一 Client ID:确保每个实例的
clientId
唯一,避免冲突:String clientId = "client-" + UUID.randomUUID().toString(); client.connect(new MqttConnectOptions(), clientId);
- 共享订阅:使用 EMQX 的共享订阅功能(
6. 检查 EMQX 服务端限制
- 现象:服务端配置限制导致消息被丢弃。
- 解决方案:
- 检查 EMQX 的
emqx.conf
配置文件,调整以下参数:zone.external.max_packet_size = 10MB # 允许的最大消息大小 zone.external.rate_limit.conn_messages_in = "1000/s" # 连接消息接收速率
- 监控 EMQX 资源使用情况(通过 Dashboard 或命令行):
emqx_ctl status # 查看节点状态 emqx_ctl metrics monitor # 实时监控指标
- 检查 EMQX 的
7. 客户端日志与调试
- 现象:客户端代码存在异常或消息未正确处理。
- 解决方案:
- 开启 DEBUG 日志:在
application.properties
中配置:logging.level.org.eclipse.paho.client.mqttv3=DEBUG
- 消息到达回调检查:确保
messageArrived
方法无阻塞:@Override public void messageArrived(String topic, MqttMessage message) { try { // 处理消息(避免长时间阻塞) } catch (Exception e) { logger.error("消息处理异常", e); } }
- 开启 DEBUG 日志:在
8. 网络与 EMQX 集群健康检查
- 现象:网络延迟或 EMQX 集群节点故障。
- 解决方案:
- 网络诊断:使用
ping
和traceroute
检查客户端到 EMQX 的网络延迟。 - 集群状态检查:
emqx_ctl cluster status # 查看集群节点状态
- 网络诊断:使用
9. 持久化会话配置
- 现象:客户端离线时消息丢失。
- 解决方案:启用持久会话(Clean Session = false),并设置会话过期时间:
MqttConnectOptions options = new MqttConnectOptions(); options.setCleanSession(false); // 禁用 Clean Session // EMQX 默认会话过期时间为 2 小时,可通过 emqx.conf 调整
总结步骤
- 开启客户端和服务端日志,定位消息丢失的具体环节。
- 逐步验证 QoS、订阅时机、连接稳定性 等关键配置。
- 通过 EMQX Dashboard 监控消息流(发布/订阅的速率、客户端状态)。
- 压力测试:使用工具(如 JMeter)模拟高并发场景,验证系统瓶颈。
通过上述步骤,通常可以解决 90% 的偶发消息丢失问题。如果问题仍存在,建议提供客户端和服务端的 DEBUG 日志片段,以便进一步分析。
这是我录制的视频:https://59.110.12.7:34829/down/4EAS3lLiwa44.mp4,
我的配置:
EmqClient.zip (1.8 KB)
,订阅的是device/#