环境信息
- EMQX 版本:5.0.8
- 操作系统及版本:Windows11
- 其他
问题描述
emqx5.0.8社区版本,SpringBoot集成mqtt连接emqx,生产者一次发送1w条消息,消费者只能消费1000多条数据
emqx5.0.8社区版本,SpringBoot集成mqtt连接emqx,生产者一次发送1w条消息,消费者只能消费1000多条数据
1w消息发送太快,修改为每发一条暂停10毫秒后,消息都能发完和消费完
有可能在发布端就丢了的,可以对比 emqx 的为统计看看。
或者在多检查下客户端日志和 emqx 日志
发送这边试着这样写看看,带同步锁
public void publish(int qos, boolean retained, String topic, String payload) {
MqttMessage message = new MqttMessage();
message.setPayload(payload.getBytes());
message.setQos(qos);
message.setRetained(retained);
MqttTopic mqttTopic = client.getTopic(topic);
if (null == mqttTopic) {
Log.out().error("topic:" + topic + " 不存在");
}
MqttDeliveryToken token;//Delivery:配送
synchronized (this) {//注意:这里一定要同步,否则,在多线程publish的情况下,线程会发生死锁,分析见文章最后补充
try {
Log.out().info("topic:" + topic + "发送");
token = mqttTopic.publish(message);//也是发送到执行队列中,等待执行线程执行,将消息发送到消息中间件
token.waitForCompletion(500L);
if (!token.isComplete()) {
Log.out().info("消息发送成功");
}
}catch (MqttPersistenceException e) {
e.printStackTrace();
Log.out().error("publish异常:" + e.getMessage());
} catch (MqttException e) {
Log.out().error("publish异常:" + e.getMessage());
}
}
}