【求助】重复注册clientId导致消息全部发送失败 EMQX 4.3

环境

  • EMQX 版本:4.3

  • 操作系统版本:centos7.9

  • 开发语言:Java

  • mqtt maven依赖:

    org.springframework.integration
    spring-integration-mqtt
    5.2.0.RELEASE


    org.eclipse.paho
    org.eclipse.paho.client.mqttv3
    1.2.1

  • mqtt server端连接配置:
    import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.integration.annotation.ServiceActivator;
    import org.springframework.integration.channel.DirectChannel;
    import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
    import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
    import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
    import org.springframework.messaging.MessageChannel;
    import org.springframework.messaging.MessageHandler;
    import org.springframework.stereotype.Component;

/**

  • @author LIJIYANG001

  • @date 2019/12/30 18:56

  • mqtt sender config
    */
    @ Component
    public class PahoSenderConfig {
    @ Value(“${mqtt.host}”)
    private String hostAndPort;

    @ Value(“${mqtt.username}”)
    private String username;

    @ Value(“${mqtt.password}”)
    private String password;

    @ Bean
    public MqttPahoClientFactory mqttClientFactory() {
    DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
    MqttConnectOptions options = new MqttConnectOptions();
    options.setServerURIs(new String { “tcp://” + hostAndPort});
    options.setUserName(username);
    options.setPassword(password.toCharArray());
    options.setAutomaticReconnect(true);
    options.setMaxInflight(500);
    factory.setConnectionOptions(options);
    return factory;
    }

    @ Bean
    @ ServiceActivator(inputChannel = “mqttOutboundChannel”)
    public MessageHandler mqttOutbound() {
    MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(“mqttController_” + System.currentTimeMillis(), mqttClientFactory());
    messageHandler.setAsync(true);
    messageHandler.setDefaultTopic(“TEST”);
    return messageHandler;
    }

    @ Bean
    public MessageChannel mqttOutboundChannel() {
    return new DirectChannel();
    }

}

重现此问题的步骤

  1. 重启应用
  2. 上游业务流量开始涌入(qps:100~200),emq dashboard监控里开始看到server端、client端出现大量重复的clientId(ip也一样,端口号不一样),且重复数量一直在递增
  3. 初步怀疑是流量上来后,导致server端、client端都在重复创建和mqtt的连接
  4. 为了验证是否是流量激增导致,我们后来在server连接mqtt前,做了限流,且慢慢切流至全量,就恢复了

预期行为

预期不会出现重复的clientId。因为日常都不会出现重复的clientId,一旦出现重复的clientId就会导致消息发送失败

实际行为

mqtt下发指令消息发送失败

异常信息:
json.ext.exception:org.springframework.messaging.MessageHandlingException: Failed to publish to MQTT in the [mqttOutbound]; nested exception is Connect already in progress (32110), failedMessage=GenericMessage [payload={“roomno”:“1_3_3099_0”,“controldevice”:[{“cmddetails”:{“switch”:“1”},“sourcetype”:“20”,“deviceid”:“SCENE-ZONGGUANMOSHI-01830300”,“devicetype”:“Scene”}],“requestId”:“1354ab043f2394028967c2a7aa547183b”,“providerid”:“1”,“msgid”:“11_9000057_724683807854678016”,“hotelid”:“9000057”,“timestamp”:1716681584}, headers={mqtt_qos=0, id=a02517fa-b771-66c8-6ad4-c7bedd14e88f, mqtt_topic=SMARTHOTEL/COMMAND/9000057/RCU/DEVICECONTROL, timestamp=1716681584771}]

这个有两种可能:

  1. 客户端的重连代码没写对。上一个客户端还没断开。又开始重连了。当然第二个连上来的客户端emqx会把第一个客户端挤下线,但是还是有时间差的,会存在客户端 id 同时在线的情况。
  2. 客户端重连太快了(重连的请求太多了)。导致 emqx 踢下线的进程卡住了。我记得 4.3 的某个版本会有这个 bug,后面在 4.4 的最新版本是修复了的。具体是不是这个原因:得看 emqx 的完全日志才能判断。

首先什么场景会触发server端和emq broker一直在重连?而且重复clientId是在server端重启的场景下必现!!!

可能1:在高qps场景下,会导致一个链接未断开又重发了另一个链接的请求,是吗?但是这些本该断开却未来得及断开的链接一直存在的;这个怎么解释呢?
可能2:v4 版本 | EMQX 文档 这里面能标识出来是哪个bug fix点吗?

还有就是为什么出现了重复的clientId,却导致了消息下发全部失败;

重连的情况有很多。得看相关的日志,来分析出是什么问题。没有日志,猜对的概率很小。
我说的可能 1,2 都是 bug来的,我也解释不了,我只是想说,以前有客户遇到过类似的问题,我们优化了很多个版本。关于踢下线的优化,大约可以从这个历史里面找到相关的。History for src/emqx_cm.erl - emqx/emqx · GitHub
因为优化的时间跨度和改动都很大,changelog 里面并没有包括到这个内容,非常抱歉。