客户端收不到broker心跳包断开连接且无法重连

错误报告

java后台报错 :
2023-01-03 23:29:36.279 ERROR 1 — [c592ed13ac31281] o.e.p.c.mqttv3.internal.ClientState : iot-983be75ebee366caac592ed13ac31281: Timed out as no activity, keepAlive=100,000 lastOutboundActivity=1,672,759,676,278 lastInboundActivity=1,672,759,578,377 time=1,672,759,776,278 lastPing=1,672,759,676,278

eqmx报错日志:

环境

  • EMQX 版本:
  • 操作系统版本:

重现此问题的步骤

  1. xxx
  2. xxx
  3. xxx

预期行为

实际行为


功能请求

描述你需要的功能

为什么你需要这个功能


其他

首先 Broker 是不会发送心跳的,只会响应客户端的心跳。客户端可能会在超时未收到心跳响应的情况下主动断开连接。

EMQX 的这部分日志表示当前出现了消息堆积,消息队列满,为了保证新消息继续入队,所以丢弃了队列中旧的消息。出现这个情况一般都是订阅端消费能力不足。

心跳响应回不过去可能是因为客户端处理其它消息满,所以网络通道堵住了,但是正常来说,如果有接收到其它消息,就不应该判定为心跳超时,这个需要看你客户端 SDK 的具体实现了。

我该如何提高订阅端消息的消费能力呢,目前业务代码就是在消息回调中使用线程池去处理消息,这个情况也是偶发的,可能是某段时间消息量突然很大没处理过来导致的。


这是我追踪的消费端的日志,有问题不?

常见的做法就是像 messageArrived 这类回调不要写成同步形式的,消息接收和业务处理异步进行。

在messageArrived回调结束之和,就是像broker发送qos1消息的接收确认不,我这里在消息回调里面,使用线程池excute任务,不算异步进行嘛。还有别的方式可以提高消息消费速度不,总是偶尔的出现这个问题,而且还无法重连,人都要麻了

如果消息吞吐变高,线程池没有可用线程这里是不是就会阻塞住了?你可以做下简单的性能测试看看代码上哪里还可以优化的。

另外使用共享订阅和提升消费端硬件水平都是比较有效的办法。

我刚刚测试了一下每条1500条的消息吞吐,我每个消息设置了400ms的处理时间就报错了,消息接收和处理都是异步的,实际项目中确实可能出现这么高的并发,引入一个消息队列,消息处理就是将其放入队列中,随后使用线程池从队列中拿取消息进行处理这样会不会好点?

每秒钟1500条 说错啦

如果你的高吞吐是短暂的,这么处理是没问题的。

如果常规情况下一直都是这么高的吞吐,你就需要优化客户端消费逻辑了,队列不能解决问题。

1 个赞

使用共享订阅的话 我是不是要部署多个后台服务

是的

你好。你目前这个问题解决了吗

我们把硬件消息质量降低为了qos0,测试了一般消息吞吐量是qos1的4倍,因为比较频繁的消息都不是非丢不可得所以这样改了,重要的消息还是qos1,然后服务端消息都是异步并发处理的,这样一个消费端基本满足了公司的要求,如果还要提高并发量就只能用共享订阅部署多个服务了。

你们原本qos设置的是多少。因为我们的消息是必须到达,且不可丢失的,所以设置的qos2。我们现在使用的就是共享订阅,服务端使用了两个服务进行处理,但是服务里面使用的是单线程处理的,及时这样还是会出现这样的问题。主要是我们目前的设备才11几台。

你的服务器正常来讲每秒可以处理多少条消息?

我这边计算过处理一条消息需要100多ms,这里包含了接收消息和处理消息。我现在接收消息和处理消息是在一个线程里面

最好优化一下代码逻辑吧,消息不大的话应该这 100 ms 大头还是在处理消息上,服务器单单处理一个消息就需要 100 ms 有点夸张了。

对。时间应该都是花在处理消息业务逻辑上,接收消息本身应该花不了多少时间的。所以计划就是把接收消息和处理消息分开来。

还有一个问题我想请教的就是,emqx是不是会等到客户端那边消费完消息之后,客户端回复emqx了,才会将下一条消息提供给客户端继续消费?