飞行窗口的疑问

docker v6.1.0 ,开启了持久化 ,没有修改重试时间 retry_interval


node {
  name = "emqx@127.0.0.1"
  cookie = "emqx50elixir"
  data_dir = "data"
}

cluster {
  name = emqxcl
  discovery_strategy = manual
}
telemetry.enable = false
durable_sessions {
  enable = true
}
dashboard {
    listeners {
        http.bind = 18083
        # https.bind = 18084
        https {
            ssl_options {
                certfile = "${EMQX_ETC_DIR}/certs/cert.pem"
                keyfile = "${EMQX_ETC_DIR}/certs/key.pem"
            }
        }
    }
}

问题:
客户端处理消息失败 , emqx 存在客户端的飞行窗口中.
什么时候emqx会再次推送给客户端?

之前用V5.x版本是很快会再次推送的.

发现飞行窗口一直在增加.

【线程ID:156】【2026-02-27 16:17:37,426】 【INFO】 【消息:】 >>>SyncOrderCreateReceive start
【线程ID:156】【2026-02-27 16:17:37,442】 【INFO】 【消息:】 >>>SyncOrderCreateReceive finish isOk=False
【线程ID:156】【2026-02-27 16:17:37,457】 【INFO】 【消息:】 >> [2026-02-27T08:17:37.4578358Z]  [MqttClient] [Error] [MQTTLogger]: Error while handling application message
System.Exception: 消息处理失败,等待再次推送
   在 System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
   在 MQTTnet.Internal.AsyncEvent`1.<InvokeAsync>d__9.MoveNext()
--- 引发异常的上一位置中堆栈跟踪的末尾 ---
   在 System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
   在 System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   在 MQTTnet.Client.MqttClient.<HandleReceivedApplicationMessage>d__59.MoveNext()
--- 引发异常的上一位置中堆栈跟踪的末尾 ---
   在 System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
   在 MQTTnet.Client.MqttClient.<ProcessReceivedPublishPackets>d__63.MoveNext()

【线程ID:66】【2026-02-27 16:18:28,561】 【INFO】 【消息:】 >> [2026-02-27T08:18:28.5618068Z]  [MqttChannelAdapter] [Verbose] [MQTTLogger]: TX (2 bytes) >>> PingReq

【线程ID:43】【2026-02-27 16:18:28,577】 【INFO】 【消息:】 >> [2026-02-27T08:18:28.5774577Z]  [MqttChannelAdapter] [Verbose] [MQTTLogger]: RX (2 bytes) <<< PingResp

【线程ID:73】【2026-02-27 16:18:29,079】 【INFO】 【消息:】 >> [2026-02-27T08:18:29.0794398Z]  [MqttChannelAdapter] [Verbose] [MQTTLogger]: TX (2 bytes) >>> PingReq

【线程ID:43】【2026-02-27 16:18:29,117】 【INFO】 【消息:】 >> [2026-02-27T08:18:29.1172535Z]  [MqttChannelAdapter] [Verbose] [MQTTLogger]: RX (2 bytes) <<< PingResp

【线程ID:73】【2026-02-27 16:19:28,826】 【INFO】 【消息:】 >> [2026-02-27T08:19:28.8264708Z]  [MqttChannelAdapter] [Verbose] [MQTTLogger]: TX (2 bytes) >>> PingReq

【线程ID:43】【2026-02-27 16:19:28,842】 【INFO】 【消息:】 >> [2026-02-27T08:19:28.8420877Z]  [MqttChannelAdapter] [Verbose] [MQTTLogger]: RX (2 bytes) <<< PingResp

【线程ID:136】【2026-02-27 16:19:29,092】 【INFO】 【消息:】 >> [2026-02-27T08:19:29.0920553Z]  [MqttChannelAdapter] [Verbose] [MQTTLogger]: TX (2 bytes) >>> PingReq

【线程ID:43】【2026-02-27 16:19:29,106】 【INFO】 【消息:】 >> [2026-02-27T08:19:29.1067917Z]  [MqttChannelAdapter] [Verbose] [MQTTLogger]: RX (2 bytes) <<< PingResp

【线程ID:136】【2026-02-27 16:20:28,926】 【INFO】 【消息:】 >> [2026-02-27T08:20:28.9265713Z]  [MqttChannelAdapter] [Verbose] [MQTTLogger]: TX (2 bytes) >>> PingReq

【线程ID:43】【2026-02-27 16:20:28,942】 【INFO】 【消息:】 >> [2026-02-27T08:20:28.9421775Z]  [MqttChannelAdapter] [Verbose] [MQTTLogger]: RX (2 bytes) <<< PingResp

【线程ID:109】【2026-02-27 16:20:29,178】 【INFO】 【消息:】 >> [2026-02-27T08:20:29.1780304Z]  [MqttChannelAdapter] [Verbose] [MQTTLogger]: TX (2 bytes) >>> PingReq

【线程ID:43】【2026-02-27 16:20:29,193】 【INFO】 【消息:】 >> [2026-02-27T08:20:29.1936534Z]  [MqttChannelAdapter] [Verbose] [MQTTLogger]: RX (2 bytes) <<< PingResp```

不会立刻重推。飞行窗口就是 QoS1/2 已发送未确认的 inflight,客户端不 ACK,EMQX 只会按 mqtt.retry_interval 周期重投(带 DUP),或者等客户端断开重连后再投递。默认是 30s。
你现在看到 inflight 一直涨,说明客户端一直在线但没把 ACK 发回来。窗口涨到 mqtt.max_inflight 后,新消息就进 mqueue 了。
先做两件事:

  1. 客户端只在处理成功后 ACK。失败就不要 ACK,并主动断开让它重连,否则只会堆 inflight。
  2. 明确你的重投间隔和窗口上限:
mqtt {
  retry_interval = 30s
  max_inflight = 32
  max_mqueue_len = 1000
}

补充信息我才好定位:

  • 订阅的 QoS 等级?MQTT 3.1.1 还是 5?
  • MQTTnet 版本、是否启用手动 ACK(成功才 ACK)?
  • 现在 mqtt.retry_interval/max_inflight/max_mqueue_len 实际值是多少(贴 emqx.conf 或环境变量)?

v6.1.0 有消息重试间隔配置. 我使用了默认infinity. 现在改成了30s . 保存. 未重启docker

之前飞行窗口里的数据,30s之后是否会重新发送.
QOS 1. mqtt 5
emqx.conf 最上面已经贴了

  1. 改完之后, 发现飞行窗口30s没有再次发送 .需要重启docker 才生效吗?
  2. 有1条消息未返回ack . 堆飞行窗口 . 一段时间有5条消息流入broker, 有2条 未推送客户端,直接堆飞行窗口, 3条推送并消费成功.

这是什么逻辑?使用上有问题吗

这看起来像是 6.x 的 bug。
有完整的 emqx debug 日志么,麻烦提供一下。
我找人复现研究下。

我是生产环境下, 一直监控客户端 消费消息情况. 发现的问题。
比如
A 客户端发送topic ,有记录; 再看客户端B 有飞行窗口堆积.
去B客户端查找是否收到 A的topic.

怎么获取emqx debug 日志?

用 CLI 开 debug 日志就行,Docker 里直接 exec。
临时把日志拉到 debug(立刻生效):
docker exec -it CONTAINER emqx ctl log set-level debug
docker exec -it CONTAINER emqx ctl log handlers set-level console debug
看日志(控制台或文件二选一):
docker logs -f CONTAINER
# 或者
docker exec -it CONTAINER tail -f /opt/emqx/log/emqx.log
只盯 B 客户端就用 trace:
docker exec -it CONTAINER emqx ctl trace start client CLIENT_ID trace.log debug
docker exec -it CONTAINER tail -f /opt/emqx/log/trace.log
# 结束后
docker exec -it CONTAINER emqx ctl trace stop client CLIENT_ID
抓完恢复:
docker exec -it CONTAINER emqx ctl log set-level info
docker exec -it CONTAINER emqx ctl log handlers set-level console info
贴问题时间点前后 1~2 分钟日志,带 B 的 clientId 和时间戳。

生产环境如果流量比较大,开 debug 也不合适,我明天看看能不能复现吧。

我现在在人工补偿,求解决重试问题 :rofl:

还在找问题,没啥办法,要不就降级到 v5

测试环境没开会话持久化, 是能重新发送的 :rofl: 有延迟发布 不好降级.

取消持久化,重启docker , 延迟发布的消息还在不??
反馈的2个问题都是 开启了会话持久化 :rofl:

retry_interval 改成 30s 后,通常不会“追溯处理”你改配置前已经卡住的 inflight,会话重连后最容易观察到新配置效果。
你现在这个现象(开了 durable session 后,inflight 堆积、重推不符合预期)和你配置的 retry_interval = infinity 是对得上的:从 5.8 开始默认就是 infinity,默认不做周期重投。你改成 30s 后,建议先让问题客户端断开并重连,再观察新进入 inflight 的消息是否按 30s 重投。
你问的两个点:

  • 要不要重启 Docker:优先先看生效值,别猜。直接执行:
docker exec -it CONTAINER emqx ctl conf show mqtt.retry_interval

如果输出不是 30s,再重启容器;如果已经是 30s,就重点看“老 inflight 不追溯 + 会话未重建”这个路径。

  • 关掉 durable_sessions 后,延迟发布消息还在不在
    • 延迟发布($delayed/...)和会话持久化是两条链路,不是同一个开关。
    • 只要 data_dir 还在(Docker 挂了持久卷),重启后延迟发布数据通常还在。
    • 但 durable session 的离线会话/排队消息行为会变,尤其是你关掉持久会话后,原来那批持久会话状态不要按“还能恢复”来预期。
      你现在在生产补偿,先用这个临时止血策略:
  1. mqtt.retry_interval 固定成 30s(确认已生效)。
  2. 对异常客户端做一次有序重连(让新会话参数生效)。
  3. 客户端处理失败时不要长期挂在线不 ACK,直接断开重连触发重投路径。
    并行做一个最小复现(单发布端 + 单订阅端 + 固定 QoS1 + durable on/off 对照),我这边可以继续跟你对日志对齐是否命中 6.1 的已知缺陷路径。

emqx.conf 配置里没有 retry_interval. 所以没输出.
Emqx 的mqtt配置 设置了30s 这两应该是同一个意思吧.

我正在用测试环境测试 关掉 durable_sessions的情况
感谢!!!

mqtt {
  await_rel_timeout = "300s"
  client_attrs_init = []
  clientid_override = disabled
  exclusive_subscription = true
  idle_timeout = "15s"
  ignore_loop_deliver = false
  keepalive_check_interval = "30s"
  keepalive_multiplier = 1.5
  max_awaiting_rel = 100
  max_clientid_len = 65535
  max_inflight = 32
  max_mqueue_len = infinity
  max_packet_size = "1MB"
  max_qos_allowed = 2
  max_subscriptions = infinity
  max_topic_alias = 65535
  max_topic_levels = 128
  message_expiry_interval = infinity
  mqueue_default_priority = lowest
  mqueue_priorities = disabled
  mqueue_store_qos0 = true
  namespace_as_mountpoint = false
  peer_cert_as_clientid = disabled
  peer_cert_as_username = disabled
  response_information = ""
  retain_available = true
  retry_interval = "30s"
  server_keepalive = disabled
  session_expiry_interval = "65535h"
  shared_subscription = true
  shared_subscription_initial_sticky_pick = random
  shared_subscription_strategy = round_robin
  strict_mode = false
  upgrade_qos = false
  use_username_as_clientid = false
  wildcard_subscription = true
}

是30s.

是同一个配置项:你在 Dashboard 里改的 MQTT 配置本质上就是 mqtt.retry_interval