I filed a bug in the official issue tracker about a "deadlock" that makes `checkForActivity` fail:
opened 01:37AM - 12 May 23 UTC
- [x] Bug exists in Release Version 1.2.5 (Master Branch)
- [x] Bug exists in MQTTv3 Client on Snapshot Version 1.2.6-SNAPSHOT (Develop Branch)
- [ ] Bug exists in MQTTv5 Client on Snapshot Version 1.2.6-SNAPSHOT (Develop Branch)
## Reproduction steps
1. Create a sub client that subscribes to a topic in the `connectComplete` callback:
   ```java
   @Override
   public void connectComplete(boolean reconnect, String serverURI) {
       // Synchronous subscribe: blocks the callback thread until the SUBACK arrives
       try { myClient.subscribe(this.topic); } catch (MqttException e) { e.printStackTrace(); }
   }
   ```
2. Create a pub client.
3. The pub client publishes messages continuously.
4. The sub client receives the messages.
5. Close the sub client while the pub client keeps publishing, and make sure more than 10 messages are published after the sub client disconnects.
6. The sub client's session on the broker now holds more than 10 undelivered messages.
7. Close the pub client and reconnect the sub client (the sub client ID must stay the same).
8. The sub client now blocks until the `checkForActivity` method closes it.
```
11, 2023 5:22:46 org.eclipse.paho.client.mqttv3.internal.ClientState checkForActivity
client1: Timed out as no activity, keepAlive=15,000,000,000 lastOutboundActivity=202,748,527,866,900 lastInboundActivity=202,733,513,164,800 time=202,763,529,941,000 lastPing=202,748,527,900,000
on connectionLost
```
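The timestamps in this log line already tell most of the story. The sketch below just does the subtraction; treating the values as nanosecond readings (keepAlive=15,000,000,000 being 15 s) is an assumption, and `KeepAliveTimeline` is a hypothetical helper name, not part of Paho.

```java
// Worked arithmetic on the timestamps from the checkForActivity log line.
// Values are copied verbatim from the log; reading them as nanoseconds is an assumption.
final class KeepAliveTimeline {
    static final long KEEP_ALIVE = 15_000_000_000L;         // keepAlive (15 s)
    static final long LAST_INBOUND = 202_733_513_164_800L;  // lastInboundActivity
    static final long TIME = 202_763_529_941_000L;          // time of the check
    static final long LAST_PING = 202_748_527_900_000L;     // lastPing (PINGREQ sent)

    /** Silence since the client last read anything from the broker. */
    static long sinceLastInbound() { return TIME - LAST_INBOUND; }

    /** Time since the PINGREQ was sent; nothing was read in this window. */
    static long sinceLastPing() { return TIME - LAST_PING; }

    /** Delay between the last inbound packet and the PINGREQ. */
    static long inboundToPing() { return LAST_PING - LAST_INBOUND; }

    static double seconds(long nanos) { return nanos / 1e9; }

    public static void main(String[] args) {
        System.out.printf("since last inbound: %.3f s%n", seconds(sinceLastInbound()));
        System.out.printf("since PINGREQ:      %.3f s%n", seconds(sinceLastPing()));
        System.out.printf("inbound -> PINGREQ: %.3f s%n", seconds(inboundToPing()));
        System.out.println("silent for >= 2 keepAlive periods: "
                + (sinceLastInbound() >= 2 * KEEP_ALIVE));
    }
}
```

The gaps come out at roughly 30.0 s, 15.0 s and 15.0 s: the client read nothing for about two keep-alive periods, sent its PINGREQ after the first one, and timed out after the second. That fits a PINGRESP that the broker sent but the client never read.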
## More Information
1. The `checkForActivity` method closes the client because the client never received the MqttPingResp (ping response).
2. I also found that the subscribe acknowledgement (MqttSuback) was never received.
3. However, the broker (EMQX) had sent both the ping response and the subscribe acknowledgement:
![image](https://github.com/eclipse/paho.mqtt.java/assets/39401478/eb27b83f-5f02-4242-94f6-54f20b9ea0e8)
![image](https://github.com/eclipse/paho.mqtt.java/assets/39401478/57c07cc1-27b0-43e7-8ebb-bd7fdb6ccabd)
4. On reconnect, the sub client does receive 10 messages from the broker (visible in the debugger), but it never acknowledges them.
## Further speculation
- The sub client never received the ping response or the subscribe acknowledgement.
- The Rec Thread (CommsReceiver) may be stuck in the WAITING state.
## Reason analysis
### Paho MQTT threads
- CommsReceiver (receives messages from the broker)
- CommsCallback (consumes messages)
- CommsSender (publishes messages)
### Client startup process (coarse-grained)
1. The CONNECT packet is sent.
2. The CONNACK is received.
3. Queued messages, if any, are received from the broker.
4. `connectComplete` is called.
5. `connectComplete` subscribes to a topic, and the Call Thread (CommsCallback) then waits until it is notified that the SUBACK has arrived.
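Step 5 is one half of the cycle. Below is a hypothetical, simplified model (not Paho's classes): a synchronous subscribe parks the calling thread on a token that only the receiver thread can complete, when it reads the SUBACK. `AckToken` and `subscribeAndWait` are made-up names, and the timeout exists only so the demo terminates. As far as I know, Paho's synchronous `MqttClient.subscribe` waits on its token in a broadly similar way.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical model of a synchronous subscribe (not Paho's real classes).
final class AckToken {
    private final CountDownLatch done = new CountDownLatch(1);

    /** Receiver side: called once the matching SUBACK has been read. */
    void complete() { done.countDown(); }

    /** Subscriber side: blocks until complete() or the timeout; false on timeout. */
    boolean waitForCompletion(long timeoutMillis) {
        try {
            return done.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /**
     * "Subscribes" and waits for the ack. If receiverIsFree is false, nothing
     * ever completes the token, as when the receiver thread is itself stuck.
     */
    static boolean subscribeAndWait(boolean receiverIsFree, long timeoutMillis) {
        AckToken token = new AckToken();
        if (receiverIsFree) {
            new Thread(token::complete, "receiver").start(); // reads the SUBACK
        }
        return token.waitForCompletion(timeoutMillis);     // the callback thread parks here
    }

    public static void main(String[] args) {
        System.out.println("receiver free:  " + subscribeAndWait(true, 1_000));
        System.out.println("receiver stuck: " + subscribeAndWait(false, 200));
    }
}
```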
### Rec Thread (CommsReceiver)
1. The CONNACK is received.
2. Messages are received from the broker (more than 10).
3. The messages are put into `CommsCallback.messageQueue`.
![image](https://github.com/eclipse/paho.mqtt.java/assets/39401478/f073f2da-24ec-4fb3-9160-d9f7a319a867)
4. The key step is `clientState.notifyReceivedMsg(message)`:
**The Rec Thread (CommsReceiver) waits whenever `messageQueue.size()` >= 10.**
![image](https://github.com/eclipse/paho.mqtt.java/assets/39401478/9b7f2a91-3e7e-4e6c-a71b-bea0e924eb4c)
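The back-pressure rule can be modelled as a bounded queue whose producer waits while 10 or more messages are queued and whose consumer frees space and notifies. This is a simplified stand-in for the wait loop reached through `notifyReceivedMsg`, not the actual Paho source; `InboundQueue`, `put`, `poll` and `fillWithoutConsumer` are hypothetical names, and the timeout on `put` only exists so the demo can report "still full" instead of waiting forever.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical model of the inbound back-pressure (not the Paho source).
final class InboundQueue<T> {
    static final int LIMIT = 10; // receiver waits while size() >= LIMIT

    private final Deque<T> items = new ArrayDeque<>();

    /** Receiver side: waits up to timeoutMillis for space; false if the queue stays full. */
    synchronized boolean put(T item, long timeoutMillis) {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        try {
            while (items.size() >= LIMIT) {
                long remaining = deadline - System.currentTimeMillis();
                if (remaining <= 0) return false; // the real receiver would keep waiting
                wait(remaining);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        items.addLast(item);
        notifyAll(); // wake the consumer: work is available
        return true;
    }

    /** Callback side: takes one message (null if empty) and signals free space. */
    synchronized T poll() {
        T item = items.pollFirst();
        if (item != null) notifyAll();
        return item;
    }

    synchronized int size() { return items.size(); }

    /** Offers n messages with nobody consuming; returns how many were accepted. */
    static int fillWithoutConsumer(int n, long timeoutMillis) {
        InboundQueue<Integer> queue = new InboundQueue<>();
        int accepted = 0;
        for (int i = 0; i < n; i++) {
            if (!queue.put(i, timeoutMillis)) break;
            accepted++;
        }
        return accepted;
    }

    public static void main(String[] args) {
        System.out.println("accepted with no consumer: " + fillWithoutConsumer(11, 200));
    }
}
```

With no consumer, 10 messages are accepted and the 11th `put` stays blocked, which is where the Rec Thread sits in this scenario.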
## Conclusion
**1. The Call Thread (CommsCallback) is in the WAITING state after subscribing in `connectComplete`.
2. The Rec Thread (CommsReceiver) is in the WAITING state once `CommsCallback.messageQueue` holds 10 or more messages.
3. `CommsCallback.messageQueue` is drained only by the Call Thread (CommsCallback), which is itself WAITING.
4. Because the Rec Thread (CommsReceiver) is WAITING, the ping ack and the sub ack are never read.
5. Consequently, the Call Thread (CommsCallback) is never notified.
6. Finally, the `PingTask` closes the client.**
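The whole cycle fits in a small, hypothetical model built on plain `java.util.concurrent` (no Paho classes): the receiver must queue the session backlog before it can read the SUBACK, while the callback thread only drains the queue after `connectComplete` returns. The `ArrayBlockingQueue` capacity of 10 stands in for the threshold above, and the timeout plays the role of `PingTask`; both are modelling assumptions, and `DeadlockModel`/`subackRead` are made-up names.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical two-thread model of the deadlock described above (not Paho code).
final class DeadlockModel {
    static final int QUEUE_LIMIT = 10; // capacity of the modelled messageQueue

    /**
     * backlog: PUBLISH packets the broker sends before the SUBACK.
     * syncSubscribe: true if connectComplete waits for the SUBACK on the callback thread.
     * Returns true if the receiver reaches the SUBACK within timeoutMillis.
     */
    static boolean subackRead(int backlog, boolean syncSubscribe, long timeoutMillis) {
        BlockingQueue<Integer> messageQueue = new ArrayBlockingQueue<>(QUEUE_LIMIT);
        CountDownLatch suback = new CountDownLatch(1);

        // Rec Thread: reads packets in wire order; put() blocks while the queue is full.
        Thread receiver = new Thread(() -> {
            try {
                for (int i = 0; i < backlog; i++) messageQueue.put(i);
                suback.countDown(); // the SUBACK is only read after the backlog
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "receiver");

        // Call Thread: runs connectComplete, then drains the queue (messageArrived).
        Thread callback = new Thread(() -> {
            try {
                if (syncSubscribe) suback.await(); // blocks before draining anything
                while (true) messageQueue.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "callback");

        receiver.setDaemon(true);
        callback.setDaemon(true);
        receiver.start();
        callback.start();
        try {
            return suback.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            receiver.interrupt(); // stand-in for PingTask tearing the client down
            callback.interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("sync,  11 queued: " + subackRead(11, true, 500));
        System.out.println("sync,  10 queued: " + subackRead(10, true, 500));
        System.out.println("async, 11 queued: " + subackRead(11, false, 500));
    }
}
```

With 11 queued messages and a synchronous subscribe, `subackRead` should return false: the receiver is parked on the 11th `put` while the callback thread waits for a SUBACK that is never read. With only 10 messages, or when the callback thread does not wait, it should return true.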
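Here is a brief description of the bug:
Reproduction
- Start a sub client (clean session false) that subscribes to a topic synchronously in connectComplete, plus a pub client.
- The pub client keeps publishing to the topic, and the sub client receives the messages normally.
- Shut down the sub client; the pub client keeps going and publishes more than 10 further messages.
- The pub client can now be stopped. The sub client's session holds more than 10 messages that have not yet been delivered to it.
- Reconnect the sub client and the problem reproduces.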
```
11, 2023 5:22:46 org.eclipse.paho.client.mqttv3.internal.ClientState checkForActivity
client1: Timed out as no activity, keepAlive=15,000,000,000 lastOutboundActivity=202,748,527,866,900 lastInboundActivity=202,733,513,164,800 time=202,763,529,941,000 lastPing=202,748,527,900,000
on connectionLost
```
Analysis
Thread model
- CommsReceiver (receives messages)
- CommsCallback (callback thread; both connectComplete and messageArrived are invoked on it)
- CommsSender (publishes messages)
Cause
Solution
If you need to subscribe in connectComplete, perform the subscription asynchronously.
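A minimal sketch of the asynchronous approach, assuming the subscription itself is a blocking call such as Paho's `MqttClient.subscribe(String)`: `connectComplete` only hands that call to an executor and returns, so the callback thread stays free to drain `messageQueue` and the receiver is never starved. `BlockingSubscriber` and `ResubscribeOnConnect` are hypothetical names used so the sketch compiles without Paho, and `connectComplete` returns a `Future` here only to make it testable; the real `MqttCallbackExtended.connectComplete` returns `void`.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a blocking client call such as MqttClient.subscribe(String).
interface BlockingSubscriber {
    void subscribe(String topic) throws Exception;
}

// Sketch: connectComplete only schedules the subscription and returns at once,
// so the callback thread can keep consuming queued messages.
final class ResubscribeOnConnect {
    private final BlockingSubscriber client;
    private final String topic;
    private final ExecutorService pool = Executors.newSingleThreadExecutor();

    ResubscribeOnConnect(BlockingSubscriber client, String topic) {
        this.client = client;
        this.topic = topic;
    }

    /** Shaped like MqttCallbackExtended.connectComplete; returns a Future only for testing. */
    Future<?> connectComplete(boolean reconnect, String serverURI) {
        return pool.submit(() -> {
            try {
                client.subscribe(topic); // blocks a pool thread, not the callback thread
            } catch (Exception e) {
                e.printStackTrace();     // real code would log and possibly retry
            }
        });
    }

    /** Stops the pool and waits for pending subscriptions; false on timeout. */
    boolean shutdownAndWait(long timeoutMillis) {
        pool.shutdown();
        try {
            return pool.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

With Paho, the task body would simply call `myClient.subscribe(topic)` on the pool thread. If you use `MqttAsyncClient` instead, `subscribe(topic, qos)` returns an `IMqttToken` without waiting; the important part is not to call `waitForCompletion()` on that token from inside `connectComplete`.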
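That is the scenario in which I ran into the `checkForActivity` "Timed out as no activity" problem.
There seem to be quite a few posts about this problem on the EMQX forum as well; feel free to use my case as a reference. I hope it helps.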
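Hi, I've run into the same problem. Your solution says to subscribe asynchronously in connectComplete; do you have any example code? Thanks for sharing.
One more question: is this the same problem as the following GitHub issue?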
opened 02:27AM - 23 Aug 18 UTC
More Information Required
how to fix this issues! when i use the paho.mqtt.3.1.2.0 java.jar
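1. As long as the subscription runs asynchronously and does not block connectComplete, the PingResponse is handled normally. For now I resubscribe asynchronously using a thread pool.
2. I'm not sure whether that issue is the same scenario as mine. If you want to check whether your case matches mine, you can verify it with the reproduction steps described above.
My situation is somewhat different: the "Timed out as no activity, keepAlive" error happens on my publishing side, so no message consumption is involved.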