使用共享主题 和 非共享主题, 收到的消息数量不变. '共享主题未起到作用'

版本: v5.1.3

大体逻辑:
iot设备数在 1.7K左右, 每个设备会向 dev/monitor/${设备编号} 发消息.
平台端使用 java-mqtt-client 对 dev/monitor/+ 进行订阅, 收到消息后, 就转发到 kafka. 订阅本身没有过多业务逻辑

使用 非共享主题订阅时, 每分钟收到的消息量在 1.2W 到 1.4W之间.
使用 三个容器进行共享主题订阅, 每个容器的数量在 4K左右. 接收总量还是在 1.2W到1.4W之间, 没有增加.

在两种订阅时, 都能观察到mqtt客户端的 消息队列为 1000/1000, 且 ‘消息流出丢弃数’ 大于0.
从 emq 观察, 发现到: 因为消费缓慢导致client会被强制断开,重连.

java-mqtt-client 与 kafka 及 emq 在同一个局域网(网络不是问题), 且服务器负载未超过 cpu核数.

请指导一下.

你看看 emqx 有什么异常日志不?

然后还可以把传kafka 的代码给注释掉跑一下看看有没有变化 。

emqx 有什么异常日志不
因为消费缓慢导致client会被强制断开,重连.

传kafka 的代码给注释掉跑一下看看有没有变化 。
无变化.

导致哪个客户端断开重连,如果是在发布端(就丢消息了),共享订阅增强订阅端也没用的。


能看到消息流出丢弃数(队列已满)
这是 消息消费端.

发布端会丢消息吗? 它只有发布不成功吧

会,比如说没有人订阅这种最简单的情况。
按你的说法单个客户端能消费一万二,三个客户端每个只消费4000,还是队列满,这个确实很奇怪 我也不知道什么原因

发布端丢消息, 就是 消息流入丢弃数 吧.

发生了异常. 读取失败.

2025-12-29 08:55:33 [scheduling-1] INFO c.q.m.service.MqttToKafkaService - ==========================================
2025-12-29 08:55:48 [biz-worker-21] WARN c.q.m.service.MqttToKafkaService - MQTT connection disconnected, remark: 读数据时发生异常: java.io.IOException
2025-12-29 08:55:48 [tio-worker-3] INFO org.tio.core.task.CloseRunnable - TioClientConfig [name=Mica-Mqtt-Client], server:172.10.1.9:1883, client:172.18.0.11:45749 准备关闭连接, isNeedRemove:false, 读数据时发生异常: java.io.IOException
2025-12-29 08:55:48 [tio-worker-3] INFO org.tio.core.ChannelContext - 关闭前server:172.10.1.9:1883, client:172.18.0.11:45749, 关闭后server:172.10.1.9:1883, client:$UNKNOWN:1
2025-12-29 08:55:53 [tio-group-2] INFO o.t.c.ConnectionCompletionHandler - connected to 172.10.1.9:1883
2025-12-29 08:55:53 [DefaultTimerTaskService] ERROR org.tio.client.task.ClientReConnTask - server:172.10.1.9:1883, client:172.18.0.11:36969, 第1次重连,重连耗时:1 ms
2025-12-29 08:55:53 [tio-group-2] INFO o.d.m.m.c.c.MqttClientAioListener - MqttClient reconnect send connect result:true
2025-12-29 08:55:54 [tio-group-1] INFO o.d.m.m.c.c.DefaultMqttClientProcessor - MqttClient contextId:mqtt2kafka-client-4410945490694820 connection:172.10.1.9:1883 succeeded!
2025-12-29 08:55:54 [biz-worker-30] INFO c.q.m.service.MqttToKafkaService - MQTT reconnected successfully
2025-12-29 08:55:54 [tio-group-1] INFO o.d.m.m.c.c.DefaultMqttClientProcessor - MQTT subscriptionList:[MqttClientSubscription{topicFilter=‘$share/mqtt2kafka/dev/monitor/#’, option=SubscriptionOption[qos=QoS0, noLocal=false, retainAsPublished=false, retainHandli

2025-12-29T17:03:15.848867+08:00 [warning] msg: dropped_msg_due_to_mqueue_is_full, mfa: emqx_session:handle_dropped/3, line: 644, peername: 172.10.1.7:36969, clientid: mqtt2kafka-client-4410945490694820, topic: dev/monitor/301323120124, payload: #{extra => ,flags => #{dup => false,retain => false},from => <<“301323120124-0ed5d6e6”>>,headers => #{peerhost => {172,10,1,7},properties => #{},proto_ver => 4,protocol => mqtt,redispatch_to => {<<“mqtt2kafka”>>,<<“dev/monitor/#”>>},username => <<“xxxxxx”>>},id => <<0,6,71,19,119,140,35,71,67,98,113,0,26,27,18,35>>,qos => 1,timestamp => 1766998795887,topic => <<“dev/monitor/301323120124”>>}, queue: #{dropped => 1502,len => 1000,max_len => 1000,store_qos0 => true}

2025-12-29T17:03:24.288969+08:00 [SOCKET] mqtt2kafka-client-liuwei-4410945490694820@172.10.1.7:36969 msg: socket_force_closed, reason: keepalive_timeout
2025-12-29T17:03:24.302751+08:00 [SOCKET] mqtt2kafka-client-liuwei-4410945490694820@172.10.1.7:36969 msg: emqx_connection_terminated, reason: {shutdown,keepalive_timeout}

虽然我看不懂你客户端的报错,
但我个人经验来看就是你的客户端有问题,keepalive都超时了,按理说,你在压测过程中 这条链路上肯定是一直都有MQTT包,只要有这个包发出去了 就绝对不会keepalive timeout,这个信号就说明他在断线之前有一段时间一个包都没发出去。
如果觉得不是这个问题,建议还是用 tcpdump 验证一下以上猜想(虽然我觉得八九不离十)