关于 Paho Mqtt Java 偶尔出现 checkForActivity Timed out as no activity 情况的排查

我在官方 issue 提交了一个关于"死锁" 导致 checkForActivity 不通过的 bug

下面简单描述下这个 bug:

复现

  1. 启动一个 sub client(clean session false),并且在 connectComplete 进行同步订阅 topic,一个 pub client
  2. pub client 一直发送消息 到 topic,sub client 正常接收消息
  3. sub client 主动关掉,pub client 继续发送超过10个以上的消息
  4. pub client 可以先停掉了,此时 sub client session 里面有超过 10个未被 publish 的消息
  5. sub client 重新连接,即可重现
 11, 2023 5:22:46  org.eclipse.paho.client.mqttv3.internal.ClientState checkForActivity
: client1: Timed out as no activity, keepAlive=15,000,000,000 lastOutboundActivity=202,748,527,866,900 lastInboundActivity=202,733,513,164,800 time=202,763,529,941,000 lastPing=202,748,527,900,000
on connectionLost

分析

线程模型

  • CommsReceiver(接收消息线程)
  • CommsCallback(回调线程,connectCompletemessageArrived 都是它回调)
  • CommsSender(发布消息线程)

原因

    1. sub client 重连后,会从 broker 接收到 session 保留的消息(此时肯定比 subscribe topic 的 ack 要快,因为连接成功,broker 就会发下来了)
    1. sub client 接收后会放在 CommsCallback.messageQueue , **CommsCallback.messageQueue**是由 CommsCallback 线程消费回调的
    1. sub client 在接收到 connect ack 后,即连接成功,CommsCallback 回调 connectComplete 方法,connectComplete 里面进行同步订阅,然后 CommsCallback 线程会被 wait() 等待,直到 subscribe ack 唤醒
  • 4.此时 CommsReceiver 线程在接收到 10 个消息放在 CommsCallback.messageQueue 后,会 一直自旋 wait() 等待 CommsCallback.messageQueue size < 10

    1. CommsReceiver 线程就这样一直处于 wait 状态,不可能会接收 ping ack 和 subscribe ack。
    1. CommsCallback 线程又处于 subscribe 等待中,也是处于 wait 状态,不可能从 CommsCallback.messageQueue 消费消息。
    1. 最后心跳检测( PingTask)发现检测不通过,关闭连接

解决

  • 如果是要有需求在 connectComplete 进行订阅操作,可以使用异步操作

以上是我遇到 checkForActivity Timed out as no activity 问题的场景。
EMQX 好像也有不少这个问题的帖子,可以参考下我这个 case,希望对你们有帮助

1 个赞

感谢分享!

你好,我也遇到了相同的问题,在解决方法里 在 connectComplete 进行订阅操作,可以使用异步操作,有实例代码吗?感谢分享下

再次请问,和gitHub上这个issues是相同问题吗?

1.只要异步进行,不堵塞住 connectComplete,能正常处理 PingResponse 即可,目前我采取了线程池进行异步重新订阅。

2.这个 issues 不确定是不是跟我同一个场景,如果你想认证你的情况跟我的是不是一样,可以参考描述的复现步骤去验证下。

我的情况有些不一样,我的发送端发生了这个情况Timed out as no activity, keepAlive。这个也不存在消费的问题

排查下网络问题看看,结合双端的日志