共享订阅bug(严重)

具体请参考附件,有环境介绍,详细步骤,以及截图
使用的是EMQX 4.4.16开源版本
5a4edacf73cadb08a03ed667-337510421-190423-0632.zip (757.5 KB)

感谢反馈,我会根据描述尽快进行问题复现。

您好,根据您提供文档中的复现步骤未能复现
(使用 mosquitto 2.0.15 linux 版及 mqttx-cli linux版客户端)

“Topic” 列表中有多个相同主题的情况,只能在以下情况下触发:

  • 一个客户端订阅 $queue/test
  • 另一个客户端订阅 test

但截图中 mosquitto_sub 显示两个客户端都有订阅 $queue/test,目前不清楚是 mosquitto(windows版) 的问题还是 EMQX 的问题,您可以在 debug 日志等级下再次尝试进行复现,并且把日志贴上来进行进一步分析。

  1. “Topic” 列表中有多个相同主题的情况,只能在以下情况下触发:
  • 一个客户端订阅 $queue/test
  • 另一个客户端订阅 test
  1. 但截图中 mosquitto_sub 显示两个客户端都有订阅 $queue/test,目前不清楚是 mosquitto(windows版) 的问题还是 EMQX 的问题

第一点是不成立的,两个mosqustto_sub已经发送了$queue/test并且被EMQX接收到,所以截图中才会显示两个客户端都订阅了$queue/test。第一点只是理想情况,实际并不一定是这样。

你们做了多少次测试,我在100次左右的测试中,至少遇到过4次。

开启EMQX debug日志是个好思路,我回头测一下再跟进。

Topics 列表中有多个相同主题

指的是这一步
这个情况是必现的么,还是说只有这四次测试才会遇到?

我手动测试的次数不多,大概 50 次左右,后面会用脚本重新测一下。

我在本地复现了,这次打开debug日志,抓了一些数据,
bug.zip (1.4 MB)
。请参见附件和文档最后2023/4/24更新部分。

指的是这一步
这个情况是必现的么,还是说只有这四次测试才会遇到?

不是必现的,在平均25次测试中会出现一次,但是复现靠运气。

日志 172,173 行,看起来 client2 发来的 SUBSCRIBE 报文中,只订阅了 test 主题。

2023-04-24T02:06:14.244816+00:00 [debug] client2@172.17.0.1:51854 [MQTT] RECV <<130,9,0,1,0,4,116,101,115,116,2>>
2023-04-24T02:06:14.244923+00:00 [debug] client2@172.17.0.1:51854 [MQTT] RECV SUBSCRIBE(Q1, R0, D0, PacketId=1, TopicFilters=[{<<"test">>,#{nl => 0,qos => 2,rap => 0,rh => 0}}])

对比如下 client1 订阅的 $queue/test 主题

2023-04-24T01:41:05.916491+00:00 [debug] client1@172.17.0.1:51288 [MQTT] RECV <<130,16,0,1,0,11,36,113,117,101,117,101,47,116,101,115,116,2>>
2023-04-24T01:41:05.916686+00:00 [debug] client1@172.17.0.1:51288 [MQTT] RECV SUBSCRIBE(Q1, R0, D0, PacketId=1, TopicFilters=[{<<"$queue/test">>,#{nl => 0,qos => 2,rap => 0,rh => 0}}])

这行日志忽略,通过共享订阅$share/$queue/test,问题没复现出来。我是后来通过订阅$share/g1/test复现的。你可以只关注$share/g1/test这个topic订阅之后的日志

2023-04-24T06:10:08 client1连接并订阅

2023-04-24T06:10:08.716971+00:00 [debug] 172.17.0.1:58220 [MQTT] RECV <<16,19,0,4,77,81,84,84,4,0,0,60,0,7,99,108,105,101,110,116,49>>
2023-04-24T06:10:08.717084+00:00 [debug] 172.17.0.1:58220 [MQTT] RECV CONNECT(Q0, R0, D0, ClientId=client1, ProtoName=MQTT, ProtoVsn=4, CleanStart=false, KeepAlive=60, Username=undefined, Password=undefined)
2023-04-24T06:10:08.717163+00:00 [debug] client1@172.17.0.1:58220 [Channel] RECV CONNECT(Q0, R0, D0, ClientId=client1, ProtoName=MQTT, ProtoVsn=4, CleanStart=false, KeepAlive=60, Username=undefined, Password=undefined)
2023-04-24T06:10:08.717302+00:00 [debug] client1@172.17.0.1:58204 clientid: <<“client1”>>, file: emqx_channel.erl, line: 1005, mfa: {emqx_channel,handle_call,2}, msg: emqx_channel_takeover_begin, pid: <0.4165.0>
2023-04-24T06:10:08.717567+00:00 [info] client1@172.17.0.1:58220 client1 SUBSCRIBE test: Options: #{nl => 0,qos => 2,rap => 0,rh => 0,share => <<“$queue”>>,sub_props => #{}}
2023-04-24T06:10:08.717676+00:00 [info] client1@172.17.0.1:58204 client1 UNSUBSCRIBE test: Options: #{nl => 0,qos => 2,rap => 0,rh => 0,share => <<“$queue”>>,sub_props => #{},subid => <<“client1”>>}
2023-04-24T06:10:08.717801+00:00 [debug] client1@172.17.0.1:58204 clientid: <<“client1”>>, file: emqx_channel.erl, line: 1017, mfa: {emqx_channel,handle_call,2}, msg: emqx_channel_takeover_end, pid: <0.4165.0>
2023-04-24T06:10:08.717967+00:00 [info] client1@172.17.0.1:58204 file: emqx_connection.erl, line: 544, mfa: {emqx_connection,terminate,2}, msg: terminate, pid: <0.4165.0>, reason: {shutdown,takeovered}
2023-04-24T06:10:08.718102+00:00 [info] [Shared Sub] Shared subscriber down: <0.4165.0>
2023-04-24T06:10:08.718100+00:00 [debug] client1@172.17.0.1:58220 client_id: <<“client1”>>, file: emqx_cm.erl, line: 128, mfa: {emqx_cm,insert_channel_info,3}, msg: insert_channel_info, pid: <0.4201.0>
2023-04-24T06:10:08.718214+00:00 [debug] client1@172.17.0.1:58220 [MQTT] SEND CONNACK(Q0, R0, D0, AckFlags=1, ReasonCode=0)
2023-04-24T06:10:08.718167+00:00 [debug] client_id: <<“client1”>>, file: emqx_cm.erl, line: 562, mfa: {emqx_cm,clean_down,1}, msg: emqx_cm_clean_down, pid: <0.1772.0>
2023-04-24T06:10:08.720728+00:00 [debug] client1@172.17.0.1:58220 [MQTT] RECV <<130,19,0,1,0,14,36,115,104,97,114,101,47,103,49,47,116,101,115,116,2>>
2023-04-24T06:10:08.720867+00:00 [debug] client1@172.17.0.1:58220 [MQTT] RECV SUBSCRIBE(Q1, R0, D0, PacketId=1, TopicFilters=[{<<“$share/g1/test”>>,#{nl => 0,qos => 2,rap => 0,rh => 0}}])
2023-04-24T06:10:08.720995+00:00 [info] client1@172.17.0.1:58220 client1 SUBSCRIBE test: Options: #{nl => 0,qos => 2,rap => 0,rh => 0,share => <<“g1”>>,sub_props => #{}}
2023-04-24T06:10:08.721119+00:00 [debug] client1@172.17.0.1:58220 [MQTT] SEND SUBACK(Q0, R0, D0, PacketId=1, ReasonCodes=[2])

2023-04-24T06:10:16 client2连接并订阅

2023-04-24T06:10:16.173348+00:00 [debug] 172.17.0.1:58224 [MQTT] RECV <<16,19,0,4,77,81,84,84,4,0,0,60,0,7,99,108,105,101,110,116,50>>
2023-04-24T06:10:16.173480+00:00 [debug] 172.17.0.1:58224 [MQTT] RECV CONNECT(Q0, R0, D0, ClientId=client2, ProtoName=MQTT, ProtoVsn=4, CleanStart=false, KeepAlive=60, Username=undefined, Password=undefined)
2023-04-24T06:10:16.173658+00:00 [debug] client2@172.17.0.1:58224 [Channel] RECV CONNECT(Q0, R0, D0, ClientId=client2, ProtoName=MQTT, ProtoVsn=4, CleanStart=false, KeepAlive=60, Username=undefined, Password=undefined)
2023-04-24T06:10:16.173888+00:00 [debug] client2@172.17.0.1:58208 clientid: <<“client2”>>, file: emqx_channel.erl, line: 1005, mfa: {emqx_channel,handle_call,2}, msg: emqx_channel_takeover_begin, pid: <0.4173.0>
2023-04-24T06:10:16.174127+00:00 [info] client2@172.17.0.1:58224 client2 SUBSCRIBE test: Options: #{nl => 0,qos => 2,rap => 0,rh => 0,share => <<“$queue”>>,sub_props => #{}}
2023-04-24T06:10:16.174276+00:00 [info] client2@172.17.0.1:58208 client2 UNSUBSCRIBE test: Options: #{nl => 0,qos => 2,rap => 0,rh => 0,share => <<“$queue”>>,sub_props => #{},subid => <<“client2”>>}
2023-04-24T06:10:16.174396+00:00 [debug] client2@172.17.0.1:58208 clientid: <<“client2”>>, file: emqx_channel.erl, line: 1017, mfa: {emqx_channel,handle_call,2}, msg: emqx_channel_takeover_end, pid: <0.4173.0>
2023-04-24T06:10:16.174548+00:00 [info] client2@172.17.0.1:58208 file: emqx_connection.erl, line: 544, mfa: {emqx_connection,terminate,2}, msg: terminate, pid: <0.4173.0>, reason: {shutdown,takeovered}
2023-04-24T06:10:16.174753+00:00 [info] [Shared Sub] Shared subscriber down: <0.4173.0>
2023-04-24T06:10:16.174693+00:00 [debug] client2@172.17.0.1:58224 client_id: <<“client2”>>, file: emqx_cm.erl, line: 128, mfa: {emqx_cm,insert_channel_info,3}, msg: insert_channel_info, pid: <0.4208.0>
2023-04-24T06:10:16.174837+00:00 [debug] client2@172.17.0.1:58224 [MQTT] SEND CONNACK(Q0, R0, D0, AckFlags=1, ReasonCode=0)
2023-04-24T06:10:16.174812+00:00 [debug] client_id: <<“client2”>>, file: emqx_cm.erl, line: 562, mfa: {emqx_cm,clean_down,1}, msg: emqx_cm_clean_down, pid: <0.1776.0>
2023-04-24T06:10:16.177286+00:00 [debug] client2@172.17.0.1:58224 [MQTT] RECV <<130,19,0,1,0,14,36,115,104,97,114,101,47,103,49,47,116,101,115,116,2>>
2023-04-24T06:10:16.177369+00:00 [debug] client2@172.17.0.1:58224 [MQTT] RECV SUBSCRIBE(Q1, R0, D0, PacketId=1, TopicFilters=[{<<“$share/g1/test”>>,#{nl => 0,qos => 2,rap => 0,rh => 0}}])
2023-04-24T06:10:16.177468+00:00 [info] client2@172.17.0.1:58224 client2 SUBSCRIBE test: Options: #{nl => 0,qos => 2,rap => 0,rh => 0,share => <<“g1”>>,sub_props => #{}}
2023-04-24T06:10:16.177550+00:00 [debug] client2@172.17.0.1:58224 [MQTT] SEND SUBACK(Q0, R0, D0, PacketId=1, ReasonCodes=[2])

2023-04-24T06:23:35 a发送1atest,client1和client2都收到

2023-04-24T06:23:35.964840+00:00 [debug] 172.17.0.1:58284 [MQTT] RECV <<16,13,0,4,77,81,84,84,4,2,0,60,0,1,97>>
2023-04-24T06:23:35.964953+00:00 [debug] 172.17.0.1:58284 [MQTT] RECV CONNECT(Q0, R0, D0, ClientId=a, ProtoName=MQTT, ProtoVsn=4, CleanStart=true, KeepAlive=60, Username=undefined, Password=undefined)
2023-04-24T06:23:35.965040+00:00 [debug] a@172.17.0.1:58284 [Channel] RECV CONNECT(Q0, R0, D0, ClientId=a, ProtoName=MQTT, ProtoVsn=4, CleanStart=true, KeepAlive=60, Username=undefined, Password=undefined)
2023-04-24T06:23:35.965317+00:00 [debug] a@172.17.0.1:58284 client_id: <<“a”>>, file: emqx_cm.erl, line: 128, mfa: {emqx_cm,insert_channel_info,3}, msg: insert_channel_info, pid: <0.4391.0>
2023-04-24T06:23:35.965476+00:00 [debug] a@172.17.0.1:58284 [MQTT] SEND CONNACK(Q0, R0, D0, AckFlags=0, ReasonCode=0)
2023-04-24T06:23:35.967821+00:00 [debug] a@172.17.0.1:58284 [MQTT] RECV <<52,14,0,4,116,101,115,116,0,1,49,97,116,101,115,116>>
2023-04-24T06:23:35.968006+00:00 [debug] a@172.17.0.1:58284 [MQTT] RECV PUBLISH(Q2, R0, D0, Topic=test, PacketId=1, Payload=<<“1atest”>>)
2023-04-24T06:23:35.968147+00:00 [info] a@172.17.0.1:58284 PUBLISH to test: <<“1atest”>>
2023-04-24T06:23:35.968262+00:00 [debug] client2@172.17.0.1:58224 [MQTT] SEND PUBLISH(Q2, R0, D0, Topic=test, PacketId=54, Payload=<<“1atest”>>)
2023-04-24T06:23:35.968411+00:00 [debug] a@172.17.0.1:58284 [MQTT] SEND PUBREC(Q0, R0, D0, PacketId=1, ReasonCode=0)
2023-04-24T06:23:35.968332+00:00 [debug] client1@172.17.0.1:58236 [MQTT] SEND PUBLISH(Q2, R0, D0, Topic=test, PacketId=51, Payload=<<“1atest”>>)
2023-04-24T06:23:35.970839+00:00 [debug] client1@172.17.0.1:58236 [MQTT] RECV <<80,2,0,51>>
2023-04-24T06:23:35.970841+00:00 [debug] a@172.17.0.1:58284 [MQTT] RECV <<98,2,0,1>>
2023-04-24T06:23:35.970943+00:00 [debug] a@172.17.0.1:58284 [MQTT] RECV PUBREL(Q1, R0, D0, PacketId=1, ReasonCode=0)
2023-04-24T06:23:35.970994+00:00 [debug] a@172.17.0.1:58284 [MQTT] SEND PUBCOMP(Q0, R0, D0, PacketId=1, ReasonCode=0)
2023-04-24T06:23:35.970935+00:00 [debug] client1@172.17.0.1:58236 [MQTT] RECV PUBREC(Q0, R0, D0, PacketId=51, ReasonCode=0)
2023-04-24T06:23:35.971083+00:00 [debug] client1@172.17.0.1:58236 [MQTT] SEND PUBREL(Q1, R0, D0, PacketId=51, ReasonCode=0)
2023-04-24T06:23:35.971114+00:00 [debug] client2@172.17.0.1:58224 [MQTT] RECV <<80,2,0,54>>
2023-04-24T06:23:35.971208+00:00 [debug] client2@172.17.0.1:58224 [MQTT] RECV PUBREC(Q0, R0, D0, PacketId=54, ReasonCode=0)
2023-04-24T06:23:35.971266+00:00 [debug] client2@172.17.0.1:58224 [MQTT] SEND PUBREL(Q1, R0, D0, PacketId=54, ReasonCode=0)
2023-04-24T06:23:35.972885+00:00 [debug] a@172.17.0.1:58284 [MQTT] RECV <<224,0>>
2023-04-24T06:23:35.973043+00:00 [debug] a@172.17.0.1:58284 [MQTT] RECV DISCONNECT(Q0, R0, D0, ReasonCode=0)
2023-04-24T06:23:35.973159+00:00 [debug] a@172.17.0.1:58284 [MQTT] Force to close the socket due to normal
2023-04-24T06:23:35.973199+00:00 [debug] client1@172.17.0.1:58236 [MQTT] RECV <<112,2,0,51>>
2023-04-24T06:23:35.973305+00:00 [debug] client1@172.17.0.1:58236 [MQTT] RECV PUBCOMP(Q0, R0, D0, PacketId=51, ReasonCode=0)
2023-04-24T06:23:35.973356+00:00 [debug] client2@172.17.0.1:58224 [MQTT] RECV <<112,2,0,54>>
2023-04-24T06:23:35.973439+00:00 [debug] client2@172.17.0.1:58224 [MQTT] RECV PUBCOMP(Q0, R0, D0, PacketId=54, ReasonCode=0)
2023-04-24T06:23:35.973497+00:00 [info] a@172.17.0.1:58284 file: emqx_connection.erl, line: 544, mfa: {emqx_connection,terminate,2}, msg: terminate, pid: <0.4391.0>, reason: {shutdown,normal}
2023-04-24T06:23:35.973854+00:00 [debug] client_id: <<“a”>>, file: emqx_cm.erl, line: 562, mfa: {emqx_cm,clean_down,1}, msg: emqx_cm_clean_down, pid: <0.1770.0>

2023-04-24T06:23:40 aa发送2atest,client1和client2都收到

2023-04-24T06:23:40.852821+00:00 [debug] 172.17.0.1:58288 [MQTT] RECV <<16,14,0,4,77,81,84,84,4,2,0,60,0,2,97,97>>
2023-04-24T06:23:40.852934+00:00 [debug] 172.17.0.1:58288 [MQTT] RECV CONNECT(Q0, R0, D0, ClientId=aa, ProtoName=MQTT, ProtoVsn=4, CleanStart=true, KeepAlive=60, Username=undefined, Password=undefined)
2023-04-24T06:23:40.853068+00:00 [debug] aa@172.17.0.1:58288 [Channel] RECV CONNECT(Q0, R0, D0, ClientId=aa, ProtoName=MQTT, ProtoVsn=4, CleanStart=true, KeepAlive=60, Username=undefined, Password=undefined)
2023-04-24T06:23:40.853570+00:00 [debug] aa@172.17.0.1:58288 client_id: <<“aa”>>, file: emqx_cm.erl, line: 128, mfa: {emqx_cm,insert_channel_info,3}, msg: insert_channel_info, pid: <0.4399.0>
2023-04-24T06:23:40.853739+00:00 [debug] aa@172.17.0.1:58288 [MQTT] SEND CONNACK(Q0, R0, D0, AckFlags=0, ReasonCode=0)
2023-04-24T06:23:40.855535+00:00 [debug] aa@172.17.0.1:58288 [MQTT] RECV <<52,14,0,4,116,101,115,116,0,1,50,97,116,101,115,116>>
2023-04-24T06:23:40.855628+00:00 [debug] aa@172.17.0.1:58288 [MQTT] RECV PUBLISH(Q2, R0, D0, Topic=test, PacketId=1, Payload=<<“2atest”>>)
2023-04-24T06:23:40.855821+00:00 [info] aa@172.17.0.1:58288 PUBLISH to test: <<“2atest”>>
2023-04-24T06:23:40.856113+00:00 [debug] client2@172.17.0.1:58224 [MQTT] SEND PUBLISH(Q2, R0, D0, Topic=test, PacketId=55, Payload=<<“2atest”>>)
2023-04-24T06:23:40.856265+00:00 [debug] client1@172.17.0.1:58236 [MQTT] SEND PUBLISH(Q2, R0, D0, Topic=test, PacketId=52, Payload=<<“2atest”>>)
2023-04-24T06:23:40.856357+00:00 [debug] aa@172.17.0.1:58288 [MQTT] SEND PUBREC(Q0, R0, D0, PacketId=1, ReasonCode=0)
2023-04-24T06:23:40.858314+00:00 [debug] client1@172.17.0.1:58236 [MQTT] RECV <<80,2,0,52>>
2023-04-24T06:23:40.858442+00:00 [debug] aa@172.17.0.1:58288 [MQTT] RECV <<98,2,0,1>>
2023-04-24T06:23:40.858427+00:00 [debug] client1@172.17.0.1:58236 [MQTT] RECV PUBREC(Q0, R0, D0, PacketId=52, ReasonCode=0)
2023-04-24T06:23:40.858556+00:00 [debug] aa@172.17.0.1:58288 [MQTT] RECV PUBREL(Q1, R0, D0, PacketId=1, ReasonCode=0)
2023-04-24T06:23:40.858602+00:00 [debug] client1@172.17.0.1:58236 [MQTT] SEND PUBREL(Q1, R0, D0, PacketId=52, ReasonCode=0)
2023-04-24T06:23:40.858652+00:00 [debug] aa@172.17.0.1:58288 [MQTT] SEND PUBCOMP(Q0, R0, D0, PacketId=1, ReasonCode=0)
2023-04-24T06:23:40.858879+00:00 [debug] client2@172.17.0.1:58224 [MQTT] RECV <<80,2,0,55>>
2023-04-24T06:23:40.858993+00:00 [debug] client2@172.17.0.1:58224 [MQTT] RECV PUBREC(Q0, R0, D0, PacketId=55, ReasonCode=0)
2023-04-24T06:23:40.859125+00:00 [debug] client2@172.17.0.1:58224 [MQTT] SEND PUBREL(Q1, R0, D0, PacketId=55, ReasonCode=0)
2023-04-24T06:23:40.860859+00:00 [debug] client1@172.17.0.1:58236 [MQTT] RECV <<112,2,0,52>>
2023-04-24T06:23:40.861106+00:00 [debug] aa@172.17.0.1:58288 [MQTT] RECV <<224,0>>
2023-04-24T06:23:40.861272+00:00 [debug] aa@172.17.0.1:58288 [MQTT] RECV DISCONNECT(Q0, R0, D0, ReasonCode=0)
2023-04-24T06:23:40.861241+00:00 [debug] client1@172.17.0.1:58236 [MQTT] RECV PUBCOMP(Q0, R0, D0, PacketId=52, ReasonCode=0)
2023-04-24T06:23:40.861390+00:00 [debug] aa@172.17.0.1:58288 [MQTT] Force to close the socket due to normal
2023-04-24T06:23:40.861582+00:00 [info] aa@172.17.0.1:58288 file: emqx_connection.erl, line: 544, mfa: {emqx_connection,terminate,2}, msg: terminate, pid: <0.4399.0>, reason: {shutdown,normal}
2023-04-24T06:23:40.862028+00:00 [debug] client_id: <<“aa”>>, file: emqx_cm.erl, line: 562, mfa: {emqx_cm,clean_down,1}, msg: emqx_cm_clean_down, pid: <0.1780.0>
2023-04-24T06:23:40.862789+00:00 [debug] client2@172.17.0.1:58224 [MQTT] RECV <<112,2,0,55>>
2023-04-24T06:23:40.862937+00:00 [debug] client2@172.17.0.1:58224 [MQTT] RECV PUBCOMP(Q0, R0, D0, PacketId=55, ReasonCode=0)

这个问题还有在追吗?

好吧,使用Java程序和Python程序测试多次,没有发现这个问题。初步怀疑是使用mosquitto_pub和mosquitto_sub造成的。