emqx5.0.8社区版本,消息丢失

环境信息

  • EMQX 版本:5.0.8
  • 操作系统及版本:Windows11
  • 其他

问题描述

emqx5.0.8社区版本,SpringBoot集成mqtt连接emqx,生产者一次发送1w条消息,消费者只能消费1000多条数据

配置文件及日志

1w消息发送太快,修改为每发一条暂停10毫秒后,消息都能发完和消费完

有可能在发布端就丢了的,可以对比 emqx 的为统计看看。

或者在多检查下客户端日志和 emqx 日志

发送这边试着这样写看看,带同步锁

public void publish(int qos, boolean retained, String topic, String payload) {
        MqttMessage message = new MqttMessage();
        message.setPayload(payload.getBytes());
        message.setQos(qos);
        message.setRetained(retained);
        MqttTopic mqttTopic = client.getTopic(topic);
        if (null == mqttTopic) {
            Log.out().error("topic:" + topic + " 不存在");
        }
        MqttDeliveryToken token;//Delivery:配送
        synchronized (this) {//注意:这里一定要同步,否则,在多线程publish的情况下,线程会发生死锁,分析见文章最后补充
            try {
                Log.out().info("topic:" + topic + "发送");
                token = mqttTopic.publish(message);//也是发送到执行队列中,等待执行线程执行,将消息发送到消息中间件
                token.waitForCompletion(500L);
                if (!token.isComplete()) {
                    Log.out().info("消息发送成功");
                }
            }catch (MqttPersistenceException e) {
                e.printStackTrace();
                Log.out().error("publish异常:" + e.getMessage());
            } catch (MqttException e) {
                Log.out().error("publish异常:" + e.getMessage());
            }
        }
    }