MQTT发送消息至EMQX桥接clickhuose 数据丢失未落表

我在网上看到 可用于高并发场景的MQTT平台 有EMQX
但是在测试过程中 500并发的消息处理不掉 有数据丢失
具体操作是本地代码每1秒发送500条消息到emqx emqx配置桥接clickhuose的数据处理 消息接收和发出测试正常 落数据库没有完全落

有数据丢失还是有发送错误?EMQX Dashboard 上数据桥接的部分有一些统计和计数,你看看是 dropp 还是什么,还需要看一下日志文件里是否有错误日志。


@RequestMapping(value = “/publishTopic”)
public String publishTopic(String topic, int sendMessage) {
long start = System.currentTimeMillis();
log.info("task start time " + start);
long end = System.currentTimeMillis(); // 结束时间为当前时间 + 20 秒
while (secondsCount < sendMessage) {
try {
int numTasks = 500; // 并发任务数量
ExecutorService executor = Executors.newFixedThreadPool(numTasks); // 创建固定大小的线程池
for (int i = 0; i < numTasks; i++) {
final int taskId = i;
Runnable task = new Runnable() {
@Override
public void run() {
// 在这里编写并发执行的代码逻辑
String s = TemperatureSensor.readTemperature();
try {
mqttConfig.publish(false,topic,s);
log.info("Task " + taskId + “: Executing”);
} catch (Exception e) {
log.info("Task " + taskId + “: error”);
e.printStackTrace();
}
}
};
executor.execute(task); // 提交任务给线程池执行
}
executor.shutdown(); // 关闭线程池
} catch (Exception e) {
e.printStackTrace();
}
try {
Thread.sleep(1000);
secondsCount++;
} catch (InterruptedException e) {
e.printStackTrace();
}
end = System.currentTimeMillis();
log.info(“Task " + (end - start) + " is delivered.”);
}
secondsCount=0;
System.out.println(“topic:” + topic);
System.out.println(“message:” + sendMessage);
return “topic:” + topic + “\nmessage:” + sendMessage;
}

持续60秒的结果如上图所示 测试500并发 是不是因为
image
这个原因 连接数100??

默认的 license 的确只能允许很少的 MQTT 连接,你如果连接太多日志里会有一个超限制的错误。

然后我看到你的日志里,有很多 takenover 的掉线日志,是因为你使用同样的 clientid 尝试重连,后面的连接会把前面的踢掉(接管老连接的 session)。clientid 是 MQTT 连接的唯一标识,不能重复。

哦 明白了 那我这边MQTT需要怎么处理 才能不一直断线重连 可能是需要配置500个客户端id,模拟500个传感器 每秒或者每多少毫秒发送 这样?

是的。但你的 license 有限制,500 不行。需要减少客户端的数量,最多不能超过100 个。


已经申请试用license 应该可以

谢谢您的技术支持




500并发持续1小时 还在执行中 部分数据失败 emqx报错日志如图


测试时有这些报错是什么原因

看起来是 EMQQX 和 clickhouse 的连接断开了: reason: {error, closed}

日志时间: 02:21:19.566584 这里的资源健康检查也超时了。

这个要怎么解决呢 模拟测试是 500并发持续120秒 发送到EMQX 落clickhouse的buffer
现实场景应该是一直发消息落表的