springboot mqtt 接收不到信息怎么排查

emqx 结合springboot,有时候偶尔收不到订阅的消息

针对 EMQX 结合 Spring Boot 偶尔收不到订阅消息的问题,以下是分步排查和解决方案:


1. 检查客户端连接稳定性

  • 现象:网络波动或防火墙拦截导致连接断开。
  • 解决方案
    • 重连机制:在 Spring Boot 客户端中实现 MqttConnectOptions 的自动重连:
      MqttConnectOptions options = new MqttConnectOptions();
      options.setAutomaticReconnect(true); // 开启自动重连
      options.setConnectionTimeout(10);    // 超时时间(秒)
      
    • 心跳检测:合理设置心跳间隔(Keep Alive):
      options.setKeepAliveInterval(60); // 单位:秒
      

2. 确认 QoS 级别设置

  • 现象:低 QoS 导致消息在传输层丢失。
  • 解决方案
    • 发布端和订阅端使用 QoS 1 或 2(至少一次/精确一次):
      // 订阅时指定 QoS
      client.subscribe("topic/example", 1);
      
      // 发布时指定 QoS
      MqttMessage message = new MqttMessage();
      message.setQos(1);
      client.publish("topic/example", message);
      

3. 确保订阅时机正确

  • 现象:订阅发生在连接完成前,导致消息丢失。
  • 解决方案:在连接成功回调中执行订阅操作:
    client.setCallback(new MqttCallback() {
        @Override
        public void connectComplete(boolean reconnect, String serverURI) {
            if (!reconnect) {
                client.subscribe("topic/example", 1);
            }
        }
    });
    

4. 检查消息发布与订阅的时序

  • 现象:消息发布时订阅者尚未在线。
  • 解决方案
    • 使用 EMQX 的保留消息(Retained Message)功能,确保新订阅者能立即获取最新消息:
      MqttMessage message = new MqttMessage();
      message.setRetained(true); // 设置为保留消息
      client.publish("topic/example", message);
      

5. 排查多实例负载均衡问题

  • 现象:多个 Spring Boot 实例导致订阅分散。
  • 解决方案
    • 共享订阅:使用 EMQX 的共享订阅功能($share/group/topic),将消息均匀分发到多个消费者:
      client.subscribe("$share/group1/topic/example", 1);
      
    • 唯一 Client ID:确保每个实例的 clientId 唯一,避免冲突:
      String clientId = "client-" + UUID.randomUUID().toString();
      client.connect(new MqttConnectOptions(), clientId);
      

6. 检查 EMQX 服务端限制

  • 现象:服务端配置限制导致消息被丢弃。
  • 解决方案
    • 检查 EMQX 的 emqx.conf 配置文件,调整以下参数:
      zone.external.max_packet_size = 10MB       # 允许的最大消息大小
      zone.external.rate_limit.conn_messages_in = "1000/s"  # 连接消息接收速率
      
    • 监控 EMQX 资源使用情况(通过 Dashboard 或命令行):
      emqx_ctl status          # 查看节点状态
      emqx_ctl metrics monitor # 实时监控指标
      

7. 客户端日志与调试

  • 现象:客户端代码存在异常或消息未正确处理。
  • 解决方案
    • 开启 DEBUG 日志:在 application.properties 中配置:
      logging.level.org.eclipse.paho.client.mqttv3=DEBUG
      
    • 消息到达回调检查:确保 messageArrived 方法无阻塞:
      @Override
      public void messageArrived(String topic, MqttMessage message) {
          try {
              // 处理消息(避免长时间阻塞)
          } catch (Exception e) {
              logger.error("消息处理异常", e);
          }
      }
      

8. 网络与 EMQX 集群健康检查

  • 现象:网络延迟或 EMQX 集群节点故障。
  • 解决方案
    • 网络诊断:使用 pingtraceroute 检查客户端到 EMQX 的网络延迟。
    • 集群状态检查
      emqx_ctl cluster status  # 查看集群节点状态
      

9. 持久化会话配置

  • 现象:客户端离线时消息丢失。
  • 解决方案:启用持久会话(Clean Session = false),并设置会话过期时间:
    MqttConnectOptions options = new MqttConnectOptions();
    options.setCleanSession(false); // 禁用 Clean Session
    // EMQX 默认会话过期时间为 2 小时,可通过 emqx.conf 调整
    

总结步骤

  1. 开启客户端和服务端日志,定位消息丢失的具体环节。
  2. 逐步验证 QoS、订阅时机、连接稳定性 等关键配置。
  3. 通过 EMQX Dashboard 监控消息流(发布/订阅的速率、客户端状态)。
  4. 压力测试:使用工具(如 JMeter)模拟高并发场景,验证系统瓶颈。

通过上述步骤,通常可以解决 90% 的偶发消息丢失问题。如果问题仍存在,建议提供客户端和服务端的 DEBUG 日志片段,以便进一步分析。

这是我录制的视频:https://59.110.12.7:34829/down/4EAS3lLiwa44.mp4

这是我录制的视频:https://59.110.12.7:34829/down/4EAS3lLiwa44.mp4,
我的配置:
EmqClient.zip (1.8 KB)
,订阅的是device/#

抱歉,您发的 java 代码,我看不懂的(我不会 java)
建议你找专业的人看看。
PS: