客户端拥有订阅和发布两个client,订阅端接收消息并后发布端发送消息,订阅端会自动断开,订阅端和发布端是不同的clientID

环境

  • EMQX 版本:4.4.17
  • 操作系统版本:Windows10

重现此问题的步骤

/sys/login/register
ffff
2023-06-07 08:54:18.574 INFO 9856 — [_serve_subsribe] c.g.m.d.service.PushCallbackService : ---------------------连接断开,可以做重连
2023-06-07 08:54:48.621 INFO 9856 — [_serve_subsribe] c.g.m.d.service.impl.MQTTServiceImpl : 已断开连接

org.eclipse.paho.client.mqttv3.MqttException: 已断开连接
at org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run(CommsReceiver.java:197)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.EOFException: null
at java.io.DataInputStream.readByte(DataInputStream.java:267)
at org.eclipse.paho.client.mqttv3.internal.wire.MqttInputStream.readMqttWireMessage(MqttInputStream.java:92)
at org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run(CommsReceiver.java:137)
… 1 common frames omitted

/sys/login/register
ffff
2023-06-07 08:54:50.949 INFO 9856 — [_serve_subsribe] c.g.m.d.service.PushCallbackService : ---------------------连接断开,可以做重连
2023-06-07 08:54:50.949 INFO 9856 — [_serve_subsribe] c.g.m.d.service.impl.MQTTServiceImpl : 客户机正在断开连接

org.eclipse.paho.client.mqttv3.MqttException: 客户机正在断开连接
at org.eclipse.paho.client.mqttv3.MqttAsyncClient.connect(MqttAsyncClient.java:729)
at org.eclipse.paho.client.mqttv3.MqttClient.connect(MqttClient.java:331)
at com.goodcom.mdm.device.service.impl.MQTTServiceImpl.subsribeConnect(MQTTServiceImpl.java:132)
at com.goodcom.mdm.device.service.impl.PushCallbackServiceImpl.connectionLost(PushCallbackServiceImpl.java:46)
at org.eclipse.paho.client.mqttv3.internal.CommsCallback.connectionLost(CommsCallback.java:304)
at org.eclipse.paho.client.mqttv3.internal.ClientComms.shutdownConnection(ClientComms.java:441)
at org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run(CommsReceiver.java:197)
at java.lang.Thread.run(Thread.java:748)

/sys/login/register
ffff
2023-06-07 08:54:53.265 INFO 9856 — [_serve_subsribe] c.g.m.d.service.PushCallbackService : ---------------------连接断开,可以做重连
2023-06-07 08:55:23.279 INFO 9856 — [_serve_subsribe] c.g.m.d.service.impl.MQTTServiceImpl : 已断开连接

预期行为

实际行为

能提供下 EMQX 端的 debug 日志么,客户端的日志帮助不大

2023-06-07T10:07:58.471000+08:00 [debug] 127.0.0.1:62088 [Channel] RECV CONNECT(Q0, R0, D0, ClientId=mqtt_serve_subsribe, ProtoName=MQTT, ProtoVsn=4, CleanStart=false, KeepAlive=10, Username=admin, Password=)
2023-06-07T10:07:58.471000+08:00 [debug] 127.0.0.1:61310 clientid: <<“mqtt_serve_subsribe”>>
msg: emqx_channel_takeover_begin
2023-06-07T10:07:58.471000+08:00 [info] 127.0.0.1:62088 mqtt_serve_subsribe SUBSCRIBE /sys/login/+: Options: #{nl => 0,qos => 2,rap => 0,rh => 0,sub_props => #{}}
2023-06-07T10:07:58.471000+08:00 [info] 127.0.0.1:62088 mqtt_serve_subsribe SUBSCRIBE /sys/status/+/+/control: Options: #{nl => 0,qos => 2,rap => 0,rh => 0,sub_props => #{}}
2023-06-07T10:07:58.472000+08:00 [info] 127.0.0.1:61310 mqtt_serve_subsribe UNSUBSCRIBE /sys/login/+: Options: #{nl => 0,qos => 2,rap => 0,rh => 0,sub_props => #{},subid => <<“mqtt_serve_subsribe”>>}
2023-06-07T10:07:58.472000+08:00 [info] 127.0.0.1:61310 mqtt_serve_subsribe UNSUBSCRIBE /sys/status/+/+/control: Options: #{nl => 0,qos => 2,rap => 0,rh => 0,sub_props => #{},subid => <<“mqtt_serve_subsribe”>>}
2023-06-07T10:07:58.472000+08:00 [debug] 127.0.0.1:61310 clientid: <<“mqtt_serve_subsribe”>>
msg: emqx_channel_takeover_end
2023-06-07T10:07:58.472000+08:00 [info] 127.0.0.1:61310 msg: terminate
reason: {shutdown,takeovered}
2023-06-07T10:07:58.472000+08:00 [debug] 127.0.0.1:62088 client_id: <<“mqtt_serve_subsribe”>>
msg: insert_channel_info
2023-06-07T10:07:58.472000+08:00 [debug] 127.0.0.1:62088 [MQTT] SEND CONNACK(Q0, R0, D0, AckFlags=1, ReasonCode=0)
2023-06-07T10:07:58.472000+08:00 [debug] 127.0.0.1:62088 [MQTT] SEND PUBREL(Q1, R0, D0, PacketId=1, ReasonCode=0)
2023-06-07T10:07:58.480000+08:00 [debug] 127.0.0.1:62088 [MQTT] RECV <<112,2,0,1>>
2023-06-07T10:07:58.480000+08:00 [debug] 127.0.0.1:62088 [MQTT] RECV PUBCOMP(Q0, R0, D0, PacketId=1, ReasonCode=0)
2023-06-07T10:07:58.481000+08:00 [debug] 127.0.0.1:62088 [MQTT] RECV <<130,17,0,1,0,12,47,115,121,115,47,108,111,103,105,110,47,43,2>>
2023-06-07T10:07:58.481000+08:00 [debug] 127.0.0.1:62088 [MQTT] RECV SUBSCRIBE(Q1, R0, D0, PacketId=1, TopicFilters=[{<<“/sys/login/+”>>,#{nl => 0,qos => 2,rap => 0,rh => 0}}])
2023-06-07T10:07:58.481000+08:00 [info] 127.0.0.1:62088 mqtt_serve_subsribe SUBSCRIBE /sys/login/+: Options: #{nl => 0,qos => 2,rap => 0,rh => 0,sub_props => #{}}
2023-06-07T10:07:58.481000+08:00 [debug] 127.0.0.1:62088 [MQTT] SEND SUBACK(Q0, R0, D0, PacketId=1, ReasonCodes=[2])
2023-06-07T10:07:58.486000+08:00 [debug] 127.0.0.1:62088 [MQTT] RECV <<224,0>>
2023-06-07T10:07:58.486000+08:00 [debug] 127.0.0.1:62088 [MQTT] RECV DISCONNECT(Q0, R0, D0, ReasonCode=0)
2023-06-07T10:07:58.486000+08:00 [debug] 127.0.0.1:62088 [MQTT] Force to close the socket due to normal
2023-06-07T10:07:58.820000+08:00 [debug] 127.0.0.1:62089 [Channel] RECV CONNECT(Q0, R0, D0, ClientId=mqtt_serve_subsribe, ProtoName=MQTT, ProtoVsn=4, CleanStart=false, KeepAlive=10, Username=admin, Password=
)
2023-06-07T10:07:58.821000+08:00 [debug] 127.0.0.1:62088 clientid: <<“mqtt_serve_subsribe”>>
msg: emqx_channel_takeover_begin
2023-06-07T10:07:58.821000+08:00 [info] 127.0.0.1:62089 mqtt_serve_subsribe SUBSCRIBE /sys/login/+: Options: #{nl => 0,qos => 2,rap => 0,rh => 0,sub_props => #{}}
2023-06-07T10:07:58.822000+08:00 [info] 127.0.0.1:62089 mqtt_serve_subsribe SUBSCRIBE /sys/status/+/+/control: Options: #{nl => 0,qos => 2,rap => 0,rh => 0,sub_props => #{}}
2023-06-07T10:07:58.822000+08:00 [info] 127.0.0.1:62088 mqtt_serve_subsribe UNSUBSCRIBE /sys/login/+: Options: #{nl => 0,qos => 2,rap => 0,rh => 0,sub_props => #{},subid => <<“mqtt_serve_subsribe”>>}
2023-06-07T10:07:58.822000+08:00 [info] 127.0.0.1:62088 mqtt_serve_subsribe UNSUBSCRIBE /sys/status/+/+/control: Options: #{nl => 0,qos => 2,rap => 0,rh => 0,sub_props => #{},subid => <<“mqtt_serve_subsribe”>>}
2023-06-07T10:07:58.823000+08:00 [debug] 127.0.0.1:62088 clientid: <<“mqtt_serve_subsribe”>>
msg: emqx_channel_takeover_end
2023-06-07T10:07:58.823000+08:00 [info] 127.0.0.1:62088 msg: terminate
reason: {shutdown,takeovered}
2023-06-07T10:07:58.824000+08:00 [debug] 127.0.0.1:62089 client_id: <<“mqtt_serve_subsribe”>>
msg: insert_channel_info
2023-06-07T10:07:58.824000+08:00 [debug] 127.0.0.1:62089 [MQTT] SEND CONNACK(Q0, R0, D0, AckFlags=1, ReasonCode=0)
2023-06-07T10:07:58.824000+08:00 [debug] 127.0.0.1:62089 [MQTT] RECV <<130,28,0,2,0,23,47,115,121,115,47,115,116,97,116,117,115,47,43,47,43,47,99,111,110,116,114,111,108,2>>
2023-06-07T10:07:58.824000+08:00 [debug] 127.0.0.1:62089 [MQTT] RECV SUBSCRIBE(Q1, R0, D0, PacketId=2, TopicFilters=[{<<“/sys/status/+/+/control”>>,#{nl => 0,qos => 2,rap => 0,rh => 0}}])
2023-06-07T10:07:58.825000+08:00 [info] 127.0.0.1:62089 mqtt_serve_subsribe SUBSCRIBE /sys/status/+/+/control: Options: #{nl => 0,qos => 2,rap => 0,rh => 0,sub_props => #{}}
2023-06-07T10:07:58.825000+08:00 [debug] 127.0.0.1:62089 [MQTT] SEND SUBACK(Q0, R0, D0, PacketId=2, ReasonCodes=[2])
2023-06-07T10:08:04.156000+08:00 [debug] 127.0.0.1:62089 [MQTT] SEND PUBLISH(Q2, R0, D0, Topic=/sys/login/register, PacketId=2, Payload=<<“{"msg":"register","content":[{"imei":"868133040305062","aid":"3e8febe6bb7af125","dt":"20230606075005","key":"f866b39a0bca9ad01dcef5647f64f332"}]}”>>)
2023-06-07T10:08:04.161000+08:00 [debug] 127.0.0.1:62089 [MQTT] RECV <<80,2,0,2>>
2023-06-07T10:08:04.161000+08:00 [debug] 127.0.0.1:62089 [MQTT] RECV PUBREC(Q0, R0, D0, PacketId=2, ReasonCode=0)
2023-06-07T10:08:04.161000+08:00 [debug] 127.0.0.1:62089 [MQTT] SEND PUBREL(Q1, R0, D0, PacketId=2, ReasonCode=0)

这是我发布端的

2023-06-07T10:09:54.732000+08:00 [debug] 127.0.0.1:62097 [MQTT] RECV <<192,0>>
2023-06-07T10:09:54.732000+08:00 [debug] 127.0.0.1:62097 [MQTT] RECV PINGREQ(Q0, R0, D0)
2023-06-07T10:09:54.733000+08:00 [debug] 127.0.0.1:62097 [MQTT] SEND PINGRESP(Q0, R0, D0)
2023-06-07T10:10:03.709000+08:00 [debug] 127.0.0.1:62228 [Channel] RECV CONNECT(Q0, R0, D0, ClientId=mqtt_serve_publish, ProtoName=MQTT, ProtoVsn=4, CleanStart=false, KeepAlive=10, Username=admin, Password=******)
2023-06-07T10:10:03.710000+08:00 [debug] 127.0.0.1:62097     clientid: <<"mqtt_serve_publish">>
    msg: emqx_channel_takeover_begin
2023-06-07T10:10:03.710000+08:00 [debug] 127.0.0.1:62097     clientid: <<"mqtt_serve_publish">>
    msg: emqx_channel_takeover_end
2023-06-07T10:10:03.710000+08:00 [info] 127.0.0.1:62097     msg: terminate
    reason: {shutdown,takeovered}
2023-06-07T10:10:03.711000+08:00 [debug] 127.0.0.1:62228     client_id: <<"mqtt_serve_publish">>
    msg: insert_channel_info
2023-06-07T10:10:03.712000+08:00 [debug] 127.0.0.1:62228 [MQTT] SEND CONNACK(Q0, R0, D0, AckFlags=1, ReasonCode=0)
2023-06-07T10:10:03.717000+08:00 [debug] 127.0.0.1:62228 [MQTT] RECV <<52,165,1,0,29,47,115,121,115,47,56,54,56,49,51,51,48,52,48,51,48,53,48,54,50,47,114,101,103,105,115,116,101,114,0,1,123,34,115,116,97,116,117,115,34,58,34,50,48,48,34,44,32,34,109,115,103,34,58,34,115,117,99,99,101,115,115,34,44,32,34,99,111,110,116,101,110,116,34,58,91,123,34,100,101,118,105,100,34,58,49,44,34,114,101,115,112,111,110,115,101,34,58,34,114,101,103,105,115,116,101,114,34,44,34,116,111,107,101,110,99,34,58,34,102,54,101,101,55,97,49,101,54,51,49,52,52,52,51,55,56,102,102,56,49,100,101,57,98,97,98,102,52,48,99,56,34,44,34,114,105,100,34,58,49,125,93,125>>
2023-06-07T10:10:03.717000+08:00 [debug] 127.0.0.1:62228 [MQTT] RECV PUBLISH(Q2, R0, D0, Topic=/sys/868133040305062/register, PacketId=1, Payload=<<"{\"status\":\"200\", \"msg\":\"success\", \"content\":[{\"devid\":1,\"response\":\"register\",\"tokenc\":\"f6ee7a1e631444378ff81de9babf40c8\",\"rid\":1}]}">>)
2023-06-07T10:10:03.718000+08:00 [info] 127.0.0.1:62228 PUBLISH to /sys/868133040305062/register: <<"{\"status\":\"200\", \"msg\":\"success\", \"content\":[{\"devid\":1,\"response\":\"register\",\"tokenc\":\"f6ee7a1e631444378ff81de9babf40c8\",\"rid\":1}]}">>
2023-06-07T10:10:03.718000+08:00 [debug] 127.0.0.1:62228 [MQTT] SEND PUBREC(Q0, R0, D0, PacketId=1, ReasonCode=0)
2023-06-07T10:10:03.719000+08:00 [debug] 127.0.0.1:62228 [MQTT] RECV <<98,2,0,1>>
2023-06-07T10:10:03.719000+08:00 [debug] 127.0.0.1:62228 [MQTT] RECV PUBREL(Q1, R0, D0, PacketId=1, ReasonCode=0)
2023-06-07T10:10:03.719000+08:00 [debug] 127.0.0.1:62228 [MQTT] SEND PUBCOMP(Q0, R0, D0, PacketId=1, ReasonCode=0)
2023-06-07T10:10:13.725000+08:00 [debug] 127.0.0.1:62228 [MQTT] RECV <<192,0>>
2023-06-07T10:10:13.726000+08:00 [debug] 127.0.0.1:62228 [MQTT] RECV PINGREQ(Q0, R0, D0)
2023-06-07T10:10:13.726000+08:00 [debug] 127.0.0.1:62228 [MQTT] SEND PINGRESP(Q0, R0, D0)

发布端能发送消息成功,但回调的时候返回为null

MqttException (0) - java.lang.NumberFormatException: null
2023-06-07 10:10:03.730  INFO 7604 --- [_serve_subsribe] c.g.m.d.service.PushCallbackService      : ---------------------连接断开,可以做重连
2023-06-07 10:10:33.764  INFO 7604 --- [_serve_subsribe] c.g.m.d.service.impl.MQTTServiceImpl     : 已断开连接

从 EMQX 日志来看,你一直在使用同一个 clientid mqtt_serve_subsribe 在进行登录,建议检查下你的客户端代码。

clientid `mqtt_serve_subsribe是我的订阅端,是因为断开之后重连导致的,我想应该关注在:发布端发布消息订阅端会自动断开,发布的消息能够成功接收,但回调的时候是null,是因为传输过程的某个关键点有问题吗