When NanoMQ suddenly loses network connectivity, messages pile up in ctx_msgs instead of going into offline_cache and being persisted to SQLite as expected

Environment: local MacBook, Apple M3 chip
Containers:

  • emqx/emqx 5.8.7, default configuration
  • emqx/nanomq 0.24.6-full

docker-compose for NanoMQ:

version: '3.8'

services:
  nanomq:
    image: emqx/nanomq:0.26.4-full
    container_name: nanomq
    restart: always
    ports:
      - "1883:1883"          # MQTT TCP端口
      - "8083:8083"          # MQTT WebSocket端口
      - "8081:8081"          # HTTP管理端口
      - "8883:8883"          # MQTT SSL端口
    volumes:
      - ./config/nanomq.conf:/etc/nanomq.conf
      - ./config/nanomq_local.conf:/usr/local/bin/nanomq.conf
      - ./logs:/var/log/nanomq
      - ./data:/var/lib/nanomq
      - ./tmp/bridge_cache:/tmp/bridge_cache
    environment:
      - NANOMQ_LOG_LEVEL=info
    networks:
      - nanomq-network

networks:
  nanomq-network:
    driver: bridge

NanoMQ configuration (only the bridge section is shown; the topics are not the original ones):

bridges.mqtt.emqx1 = {
    server = "mqtt-tcp://emqx:1883"  # MQTT 服务器地址
    proto_ver = 5                                        # MQTT 协议版本
    clientid = "bridge_client_123"                       # 桥接的客户端 ID
    clean_start = false                                   # 清除会话
    username = "test"                                     # 桥接用户名
    password = "password123"                               # 桥接密码
  

    forwards = [  # topics forwarded to the remote MQTT broker
        {
            remote_topic = "test1"
            local_topic = "test1"
            qos = 1
        },
        {
            remote_topic = "test2"
            local_topic = "test2"
            qos = 1
        },
        {
            remote_topic = "test3"
            local_topic = "test3"
            qos = 2
        },
        {
            remote_topic = "test4"
            local_topic = "test4"
            qos = 2
        },
        {
            remote_topic = "test5"
            local_topic = "test5"
            qos = 2
        }
    ]
    subscription = [  # topics subscribed to on the remote MQTT broker
        {
            remote_topic = "test6"
            local_topic = "test6"
            qos = 2
        }
    ]

    keepalive = "3s"                                     # 桥接的保活间隔时间(s)
    backoff_max = "10s"                                   # 最大重试间隔时间(s)

    max_parallel_processes = 32  # maximum number of parallel processes
    max_send_queue_len = 256     # maximum length of the send queue
    max_recv_queue_len = 256     # maximum length of the receive queue

    resend_interval = 50     # resend interval after recovery (ms)
    resend_wait = 65000      # wait time before resending a published message (ms)
    cancel_timeout = 5000    # QoS ack timeout (ms), so an AIO is not held for too long

    retry_qos_0 = true
}

bridges.mqtt.cache {
    disk_cache_size = 102400                # maximum number of cached messages
    mounted_file_path="/tmp/bridge_cache/"  # mounted file path
    flush_mem_threshold = 100               # threshold for flushing in-memory messages to disk
    resend_interval = 5000                  # resend interval for cached messages after recovery
}

test1 is published at 5 qps and each of the other topics at 1 qps, i.e. roughly 9 qps of QoS >= 1 traffic in total.

Network connectivity is toggled with the following commands:

# attach emqx to nanomq's network
docker network connect nanomq_nanomq-network emqx

# disconnect it
docker network disconnect nanomq_nanomq-network emqx

Procedure: start the subscriber, start the publisher, publish normally for 10 s, docker network disconnect and keep the link down for a while (100 s), docker network connect, keep publishing for 20 s, stop the publisher, wait 60 s, stop the subscriber.
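For reference, a minimal driver for this timeline, assuming the publisher and subscriber run as standalone commands (./publisher and ./subscriber below are placeholders for whatever MQTT clients are actually used) and that the compose network is named nanomq_nanomq-network as in the commands above:

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

int main(void)
{
    // Start the subscriber and publisher in the background
    // (placeholder commands, replace with the real clients).
    system("./subscriber &");
    system("./publisher &");

    sleep(10);   // publish normally for 10 s
    system("docker network disconnect nanomq_nanomq-network emqx");
    sleep(100);  // keep the bridge link down for 100 s
    system("docker network connect nanomq_nanomq-network emqx");
    sleep(20);   // keep publishing for 20 s after recovery
    system("pkill -f publisher");   // stop the publisher
    sleep(60);   // give the bridge time to drain and resend
    system("pkill -f subscriber");  // stop the subscriber
    return 0;
}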

After the link has been down for a while, ctx_msgs fills up:

nanomq  | 2025-12-23 11:35:20 [21] INFO  /home/runner/work/nanomq/nanomq/nng/src/mqtt/transport/tcp/mqtt_tcp.c:1436 mqtt_tcptran_dial_cb: aio result Address invalid
nanomq  | 2025-12-23 11:35:20 [18] WARN  /home/runner/work/nanomq/nanomq/nng/src/mqtt/transport/tcp/mqtt_tcp.c:1610 mqtt_tcptran_ep_connect: reconnect to emqx:1883 in 8688 ms
nanomq  | 2025-12-23 11:35:29 [21] WARN  /home/runner/work/nanomq/nanomq/nanomq/bridge.c:1861 bridge_pub_handler: Cached Message in ctx_msgs is lost!
nanomq  | 2025-12-23 11:35:29 [23] WARN  /home/runner/work/nanomq/nanomq/nanomq/bridge.c:1861 bridge_pub_handler: Cached Message in ctx_msgs is lost!

The relevant source, roughly bridge.c:1845-1886:

if (nng_aio_busy(node->bridge_aio[index])) {
	if (qos == 0) {
		nng_msg_free(bridge_msg);
		log_warn(
			"bridging to %s aio busy! "
			"msg lost! Ctx: %d drop qos 0 msg",
			node->address, work->ctx.id);
	} else if (node->ctx_msgs != NULL) {
		// pass index of aio via timestamp;
		if (nng_lmq_full(node->ctx_msgs)) {
			log_warn("Cached Message in ctx_msgs is lost!");
			nng_msg *tmsg;
			(void) nng_lmq_get(node->ctx_msgs, &tmsg);
			nng_msg_free(tmsg);
		}
		if (nng_lmq_put(node->ctx_msgs, bridge_msg) != 0) {
			log_warn("Msg lost! put msg to ctx_msgs failed!");
			nng_msg_free(bridge_msg);
		}
	}
} else {
	nng_aio_set_timeout(node->bridge_aio[index],
						node->cancel_timeout);
	nng_aio_set_msg(node->bridge_aio[index], bridge_msg);
	// switch to nng_ctx_send!
	nng_send_aio(*socket, node->bridge_aio[index]);
}

My question is: while the network is down, why does nng_aio_busy report busy, so that messages end up piling up in ctx_msgs?

What I expected is: the aio is not busy, so the message goes down into nng_send_aio at the NNG layer; in mqtt_ctx_send, s->mqtt_pipe no longer exists, and since the bridge has SQLite enabled, the message is put into offline_cache and flushed to SQLite once that is full, after which the aio is released. Is that the intended behavior?
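As background for the question, here is a minimal, self-contained sketch (not NanoMQ code) of the nng_aio lifecycle, assuming the NNG headers bundled with NanoMQ (which provide nng_aio_busy and nng_sleep_aio). An aio reports busy from the moment an asynchronous operation is started on it until its completion callback has run, so if the send previously submitted with nng_send_aio on bridge_aio[index] does not complete quickly during the outage (for example it only finishes once cancel_timeout expires), every later publish sees busy == true and takes the ctx_msgs branch instead of ever reaching nng_send_aio and the offline cache:

#include <nng/nng.h>
#include <stdio.h>

// Completion callback: runs when the asynchronous operation finishes.
static void done_cb(void *arg)
{
    (void) arg;
    printf("aio completed\n");
}

int main(void)
{
    nng_aio *aio;
    if (nng_aio_alloc(&aio, done_cb, NULL) != 0) {
        return 1;
    }

    // Start an asynchronous operation that takes 2 seconds to finish,
    // standing in for a send that is stuck while the bridge is down.
    nng_sleep_aio(2000, aio);

    // While the operation is outstanding the aio reports busy,
    // just like bridge_aio[index] with an unfinished send on it.
    printf("busy right after start: %d\n", nng_aio_busy(aio));

    nng_aio_wait(aio);  // block until the completion callback has run
    printf("busy after completion:  %d\n", nng_aio_busy(aio));

    nng_aio_free(aio);
    return 0;
}

Built against the bundled NNG (e.g. cc aio_busy.c -lnng -lpthread), this should print 1 and then 0, which is why the answer seems to hinge on when the completion callback of the pending bridge send actually fires during the outage.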