错误报告
java后台报错 :
2023-01-03 23:29:36.279 ERROR 1 — [c592ed13ac31281] o.e.p.c.mqttv3.internal.ClientState : iot-983be75ebee366caac592ed13ac31281: Timed out as no activity, keepAlive=100,000 lastOutboundActivity=1,672,759,676,278 lastInboundActivity=1,672,759,578,377 time=1,672,759,776,278 lastPing=1,672,759,676,278
eqmx报错日志:
环境
重现此问题的步骤
- xxx
- xxx
- xxx
预期行为
实际行为
功能请求
描述你需要的功能
为什么你需要这个功能
其他
t1ger
2
首先 Broker 是不会发送心跳的,只会响应客户端的心跳。客户端可能会在超时未收到心跳响应的情况下主动断开连接。
EMQX 的这部分日志表示当前出现了消息堆积,消息队列满,为了保证新消息继续入队,所以丢弃了队列中旧的消息。出现这个情况一般都是订阅端消费能力不足。
心跳响应回不过去可能是因为客户端处理其它消息满,所以网络通道堵住了,但是正常来说,如果有接收到其它消息,就不应该判定为心跳超时,这个需要看你客户端 SDK 的具体实现了。
我该如何提高订阅端消息的消费能力呢,目前业务代码就是在消息回调中使用线程池去处理消息,这个情况也是偶发的,可能是某段时间消息量突然很大没处理过来导致的。
t1ger
5
常见的做法就是像 messageArrived 这类回调不要写成同步形式的,消息接收和业务处理异步进行。
在messageArrived回调结束之和,就是像broker发送qos1消息的接收确认不,我这里在消息回调里面,使用线程池excute任务,不算异步进行嘛。还有别的方式可以提高消息消费速度不,总是偶尔的出现这个问题,而且还无法重连,人都要麻了
t1ger
7
如果消息吞吐变高,线程池没有可用线程这里是不是就会阻塞住了?你可以做下简单的性能测试看看代码上哪里还可以优化的。
另外使用共享订阅和提升消费端硬件水平都是比较有效的办法。
我刚刚测试了一下每条1500条的消息吞吐,我每个消息设置了400ms的处理时间就报错了,消息接收和处理都是异步的,实际项目中确实可能出现这么高的并发,引入一个消息队列,消息处理就是将其放入队列中,随后使用线程池从队列中拿取消息进行处理这样会不会好点?
t1ger
10
如果你的高吞吐是短暂的,这么处理是没问题的。
如果常规情况下一直都是这么高的吞吐,你就需要优化客户端消费逻辑了,队列不能解决问题。
1 个赞
我们把硬件消息质量降低为了qos0,测试了一般消息吞吐量是qos1的4倍,因为比较频繁的消息都不是非丢不可得所以这样改了,重要的消息还是qos1,然后服务端消息都是异步并发处理的,这样一个消费端基本满足了公司的要求,如果还要提高并发量就只能用共享订阅部署多个服务了。
你们原本qos设置的是多少。因为我们的消息是必须到达,且不可丢失的,所以设置的qos2。我们现在使用的就是共享订阅,服务端使用了两个服务进行处理,但是服务里面使用的是单线程处理的,及时这样还是会出现这样的问题。主要是我们目前的设备才11几台。
我这边计算过处理一条消息需要100多ms,这里包含了接收消息和处理消息。我现在接收消息和处理消息是在一个线程里面
t1ger
18
最好优化一下代码逻辑吧,消息不大的话应该这 100 ms 大头还是在处理消息上,服务器单单处理一个消息就需要 100 ms 有点夸张了。
对。时间应该都是花在处理消息业务逻辑上,接收消息本身应该花不了多少时间的。所以计划就是把接收消息和处理消息分开来。
还有一个问题我想请教的就是,emqx是不是会等到客户端那边消费完消息之后,客户端回复emqx了,才会将下一条消息提供给客户端继续消费?