积压的消息,没有消费到,主题消息监控显示被消费

开始先把消费者服务停掉,使用JMeter推送了100条数据,消息队列也显示积压了100条,启动消费者后并没有收到队列中的消息,主题监控显示消息被接收了。


检查一下 clean start 为 false。

java代码中直接写死的false

开emqx的debug日志看看

debug日志显示被接收了,代码中没接收到,感觉像是被spring-integration-mqtt依赖包丢弃了,使用V3的通信协议,就可以消费到积压的消息,使用V5通信协议的就消费不到。

找到原因了,使用spring-integration-mqtt依赖包,
错误写法:


使用了ClientManager<IMqttAsyncClient, MqttConnectionOptions>管理器,在声明对象的时候需要start启动,启动时无法添加defaultMessageHandler消息处理,因为Mqttv5PahoMessageDrivenChannelAdapter还没有声明,所以消息找不到处理的地方,当Mqttv5PahoMessageDrivenChannelAdapter声明时会将当前处理器添加到ClientManager中,所以项目启动后,可以接收到新的消息,这里依赖包中也没有添加日志。

正确写法:
舍弃ClientManager<IMqttAsyncClient, MqttConnectionOptions>


Mqttv5PahoMessageDrivenChannelAdapter在初始化的时候会判断如果使用的不是ClientManager,会声明MqttAsyncClient,并将当前对象设置为回调消息处理对象,在doStart方法中才进行连接,所以不会丢失消息。

1 个赞

优秀