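EMQX version
4.4.2
EMQX installation and deployment method
emqx-4.4.2-otp24.1.5-3-el7-amd64.zip
Unzipped and started directly
EMQX cluster status
Single node
Server (machine running EMQX) hardware configuration
16 cores, 32 GB RAM
Server operating system and platform
CentOS 7.5
Server-side parameter tuning
None
Load generator hardware configuration
Load testing tool used on the load generator
Load generator parameter tuning
Features enabled in EMQX
Test scenario
Publish messages in a loop. The loop runs once over pubList, which contains 100 topics.
The Java code is as follows: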
for (String topic : pubList) {
    try {
        // Reconnect first if the client has lost its connection.
        if (!mqttClient.isConnected()) {
            MqttConnectOptions option = SpringIOCUtil.getBean(MqttConnectOptions.class);
            mqttClient.connect(option);
        }
        mqttClient.publish(topic, mqttMessage);
        // Pause 50 ms between publishes.
        Thread.sleep(50);
        log.info("Message published to topic {}", topic);
    } catch (Exception e) {
        log.error(PrintUtils.getTrace(e));
        log.error("[publishBatch] method exception");
    }
}
Specific problem
The backend reports an error. The error log is as follows:
2025-05-21 17:10:12,447 [async_thread_3] ERROR c.i.i.o.s.mqtt.MqttPublishPlugin - [publishBatch] method exception
2025-05-21 17:10:12,448 [async_thread_3] ERROR c.i.i.o.s.mqtt.MqttPublishPlugin - Too many publishes in progress (32202)
at org.eclipse.paho.client.mqttv3.internal.ClientState.send(ClientState.java:527)
at org.eclipse.paho.client.mqttv3.internal.ClientComms.internalSend(ClientComms.java:163)
at org.eclipse.paho.client.mqttv3.internal.ClientComms.sendNoWait(ClientComms.java:193)
at org.eclipse.paho.client.mqttv3.MqttAsyncClient.publish(MqttAsyncClient.java:1375)
at org.eclipse.paho.client.mqttv3.MqttAsyncClient.publish(MqttAsyncClient.java:1348)
at com.iflytek.iot.ota.service.mqtt.MqttPublishPlugin.lambda$publishBatch$0(MqttPublishPlugin.java:98)
at java.util.ArrayList.forEach(ArrayList.java:1257)
at com.iflytek.iot.ota.service.mqtt.MqttPublishPlugin.publishBatch(MqttPublishPlugin.java:92)
at com.iflytek.iot.ota.service.mqtt.MqttPublishPlugin$$FastClassBySpringCGLIB$$d072b58a.invoke()
at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:779)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:750)
at org.springframework.aop.interceptor.AsyncExecutionInterceptor.lambda$invoke$0(AsyncExecutionInterceptor.java:115)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Is there an EMQX parameter that can be adjusted to resolve this publishing problem?
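
For context, the stack trace shows the exception being thrown inside the Paho Java client (ClientState.send), and 32202 is the reason code Paho uses when the number of unacknowledged publishes reaches the client's max in-flight limit. So the limit being hit appears to be on the client side rather than an EMQX setting. One possible direction is to cap in-flight publishes in the application and wait for a free slot instead of failing. The sketch below illustrates that idea with plain JDK classes only. InflightLimiter, its methods, and the simulated broker are hypothetical names made up for illustration and are not part of Paho or EMQX. The limit of 10 mirrors what I understand to be Paho's default max in-flight value.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper: caps the number of publishes awaiting acknowledgement. */
class InflightLimiter {
    private final int maxInflight;
    private final Semaphore slots;

    InflightLimiter(int maxInflight) {
        if (maxInflight <= 0) {
            throw new IllegalArgumentException("maxInflight must be positive");
        }
        this.maxInflight = maxInflight;
        this.slots = new Semaphore(maxInflight);
    }

    /** Waits up to timeoutMs for a free slot; returns false on timeout or interrupt. */
    boolean tryAcquire(long timeoutMs) {
        try {
            return slots.tryAcquire(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return false;
        }
    }

    /** Call exactly once per acquired slot, when the publish is acknowledged or fails. */
    void release() {
        slots.release();
    }

    /** Number of slots currently held. */
    int inflight() {
        return maxInflight - slots.availablePermits();
    }

    public static void main(String[] args) throws InterruptedException {
        InflightLimiter limiter = new InflightLimiter(10);
        // Stand-in for the broker: acknowledges each message 20 ms after it is sent.
        ScheduledExecutorService fakeBroker = Executors.newSingleThreadScheduledExecutor();
        int sent = 0;
        for (int i = 0; i < 100; i++) {
            if (!limiter.tryAcquire(1000)) {
                System.err.println("No free slot for message " + i + ", skipping");
                continue;
            }
            // Real code would call mqttClient.publish(topic, mqttMessage) here.
            sent++;
            fakeBroker.schedule(limiter::release, 20, TimeUnit.MILLISECONDS);
        }
        fakeBroker.shutdown(); // already scheduled acknowledgements still run
        fakeBroker.awaitTermination(5, TimeUnit.SECONDS);
        System.out.println("sent=" + sent + ", still in flight=" + limiter.inflight());
    }
}
```

In the real publishBatch loop, release() would presumably be called from Paho's MqttCallback.deliveryComplete(IMqttDeliveryToken) callback, and also on any publish failure so slots are not leaked. Alternatively, the limit itself can be raised with MqttConnectOptions.setMaxInflight(int) before connecting. It may also be worth checking why acknowledgements are arriving slowly enough for the limit to be reached with a 50 ms pause between publishes.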