环境
-
EMQX 版本:4.3
-
操作系统版本:centos7.9
-
开发语言:Java
-
mqtt maven依赖:
org.springframework.integration
spring-integration-mqtt
5.2.0.RELEASE
org.eclipse.paho
org.eclipse.paho.client.mqttv3
1.2.1
-
mqtt server端连接配置:
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.stereotype.Component;
/**
-
@author LIJIYANG001
-
@date 2019/12/30 18:56
-
mqtt sender config
*/
@ Component
public class PahoSenderConfig {
@ Value(“${mqtt.host}”)
private String hostAndPort;@ Value(“${mqtt.username}”)
private String username;@ Value(“${mqtt.password}”)
private String password;@ Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
MqttConnectOptions options = new MqttConnectOptions();
options.setServerURIs(new String { “tcp://” + hostAndPort});
options.setUserName(username);
options.setPassword(password.toCharArray());
options.setAutomaticReconnect(true);
options.setMaxInflight(500);
factory.setConnectionOptions(options);
return factory;
}@ Bean
@ ServiceActivator(inputChannel = “mqttOutboundChannel”)
public MessageHandler mqttOutbound() {
MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(“mqttController_” + System.currentTimeMillis(), mqttClientFactory());
messageHandler.setAsync(true);
messageHandler.setDefaultTopic(“TEST”);
return messageHandler;
}@ Bean
public MessageChannel mqttOutboundChannel() {
return new DirectChannel();
}
}
重现此问题的步骤
- 重启应用
- 上游业务流量开始涌入(qps:100~200),emq dashboard监控里开始看到server端、client端出现大量重复的clientId(ip也一样,端口号不一样),且重复数量一直在递增
- 初步怀疑是流量上来后,导致server端、client端都在重复创建和mqtt的连接
- 为了验证是否是流量激增导致,我们后来在server连接mqtt前,做了限流,且慢慢切流至全量,就恢复了
无
预期行为
预期不会出现重复的clientId。因为日常都不会出现重复的clientId,一旦出现重复的clientId就会导致消息发送失败
实际行为
mqtt下发指令消息发送失败
异常信息:
json.ext.exception:org.springframework.messaging.MessageHandlingException: Failed to publish to MQTT in the [mqttOutbound]; nested exception is Connect already in progress (32110), failedMessage=GenericMessage [payload={“roomno”:“1_3_3099_0”,“controldevice”:[{“cmddetails”:{“switch”:“1”},“sourcetype”:“20”,“deviceid”:“SCENE-ZONGGUANMOSHI-01830300”,“devicetype”:“Scene”}],“requestId”:“1354ab043f2394028967c2a7aa547183b”,“providerid”:“1”,“msgid”:“11_9000057_724683807854678016”,“hotelid”:“9000057”,“timestamp”:1716681584}, headers={mqtt_qos=0, id=a02517fa-b771-66c8-6ad4-c7bedd14e88f, mqtt_topic=SMARTHOTEL/COMMAND/9000057/RCU/DEVICECONTROL, timestamp=1716681584771}]
