emqx 循环发送消息报错

EMQX 版本

4.4.2

EMQX 安装部署方式

emqx-4.4.2-otp24.1.5-3-el7-amd64.zip
直接解压启动

EMQX 集群情况

单节点

服务器(运行 EMQX 的机器)硬件配置

16核 32GB

服务器操作系统和平台

centos7.5

服务端参数优化情况

压力机硬件配置

压力机使用的测试工具

压力机参数优化情况

在 EMQX 中启用的功能

测试场景

循环下发数据,循环一次,pubList大小为100
java代码如下
for (String topic : pubList) {
try {
if (!mqttClient.isConnected()) {
MqttConnectOptions option = SpringIOCUtil.getBean(MqttConnectOptions.class);
mqttClient.connect(option);
}
mqttClient.publish(topic, mqttMessage);
Thread.sleep(50);
log.info(“向主题{}发送消息成功”, topic);
} catch (Exception e) {
log.error(PrintUtils.getTrace(e));
log.error(“[publishBatch]方法异常”);
}
}

具体问题

后台报错,报错日志如下
2025-05-21 17:10:12,447 [async_thread_3] ERROR c.i.i.o.s.mqtt.MqttPublishPlugin - [publishBatch]方法异常
2025-05-21 17:10:12,448 [async_thread_3] ERROR c.i.i.o.s.mqtt.MqttPublishPlugin - Too many publishes in progress (32202)
at org.eclipse.paho.client.mqttv3.internal.ClientState.send(ClientState.java:527)
at org.eclipse.paho.client.mqttv3.internal.ClientComms.internalSend(ClientComms.java:163)
at org.eclipse.paho.client.mqttv3.internal.ClientComms.sendNoWait(ClientComms.java:193)
at org.eclipse.paho.client.mqttv3.MqttAsyncClient.publish(MqttAsyncClient.java:1375)
at org.eclipse.paho.client.mqttv3.MqttAsyncClient.publish(MqttAsyncClient.java:1348)
at com.iflytek.iot.ota.service.mqtt.MqttPublishPlugin.lambda$publishBatch$0(MqttPublishPlugin.java:98)
at java.util.ArrayList.forEach(ArrayList.java:1257)
at com.iflytek.iot.ota.service.mqtt.MqttPublishPlugin.publishBatch(MqttPublishPlugin.java:92)
at com.iflytek.iot.ota.service.mqtt.MqttPublishPlugin$$FastClassBySpringCGLIB$$d072b58a.invoke()
at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:779)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:750)
at org.springframework.aop.interceptor.AsyncExecutionInterceptor.lambda$invoke$0(AsyncExecutionInterceptor.java:115)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
emqx是不是可以调整什么参数解决这个发送数据问题

没有吧,可以看看客户端有没有这种参数

那这种错误有什么解决办法吗,不能就这样一直报错吧

找个 java 高手一起 debug 一下。