Environment: local MacBook with an M3 chip
Containers:
- emqx/emqx 5.8.7, default configuration
- emqx/nanomq 0.24.6-full
docker-compose file for NanoMQ:
version: '3.8'
services:
  nanomq:
    image: emqx/nanomq:0.26.4-full
    container_name: nanomq
    restart: always
    ports:
      - "1883:1883" # MQTT over TCP
      - "8083:8083" # MQTT over WebSocket
      - "8081:8081" # HTTP management port
      - "8883:8883" # MQTT over SSL
    volumes:
      - ./config/nanomq.conf:/etc/nanomq.conf
      - ./config/nanomq_local.conf:/usr/local/bin/nanomq.conf
      - ./logs:/var/log/nanomq
      - ./data:/var/lib/nanomq
      - ./tmp/bridge_cache:/tmp/bridge_cache
    environment:
      - NANOMQ_LOG_LEVEL=info
    networks:
      - nanomq-network
networks:
  nanomq-network:
    driver: bridge
NanoMQ configuration (bridge section only; the topic names are placeholders, not the real ones):
bridges.mqtt.emqx1 = {
    server = "mqtt-tcp://emqx:1883"    # address of the remote MQTT server
    proto_ver = 5                      # MQTT protocol version
    clientid = "bridge_client_123"     # client ID used by the bridge
    clean_start = false                # clean start (false keeps the session)
    username = "test"                  # bridge username
    password = "password123"           # bridge password
    forwards = [                       # topics forwarded to the remote MQTT server
        {
            remote_topic = "test1"
            local_topic = "test1"
            qos = 1
        },
        {
            remote_topic = "test2"
            local_topic = "test2"
            qos = 1
        },
        {
            remote_topic = "test3"
            local_topic = "test3"
            qos = 2
        },
        {
            remote_topic = "test4"
            local_topic = "test4"
            qos = 2
        },
        {
            remote_topic = "test5"
            local_topic = "test5"
            qos = 2
        }
    ]
    subscription = [                   # topics subscribed from the remote MQTT server
        {
            remote_topic = "test6"
            local_topic = "test6"
            qos = 2
        }
    ]
    keepalive = "3s"                   # bridge keepalive interval (s)
    backoff_max = "10s"                # maximum reconnect backoff interval (s)
    max_parallel_processes = 32        # maximum number of parallel processes
    max_send_queue_len = 256           # maximum length of the send queue
    max_recv_queue_len = 256           # maximum length of the receive queue
    resend_interval = 50               # resend interval after recovery (ms)
    resend_wait = 65000                # wait before resending a published message (ms)
    cancel_timeout = 5000              # ack timeout for QoS messages (ms), keeps the AIO from being held too long
    retry_qos_0 = true
}
bridges.mqtt.cache {
    disk_cache_size = 102400               # maximum number of cached messages
    mounted_file_path="/tmp/bridge_cache/" # mounted file path
    flush_mem_threshold = 100              # threshold for flushing messages to flash
    resend_interval = 5000                 # resend interval after recovery
}
test1 is published at 5 QPS (messages per second) and every other topic at 1 QPS, which amounts to 9 QPS of QoS 1 or higher traffic.
The network link is cut and restored with the following commands:
# attach emqx to NanoMQ's network
docker network connect nanomq_nanomq-network emqx
# disconnect it again
docker network disconnect nanomq_nanomq-network emqx
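Procedure: start the subscriber, start the publisher, publish normally for 10 s, run docker network disconnect and stay disconnected for a while (100 s), run docker network connect, keep publishing for 20 s, stop the publisher, wait 60 s, then stop the subscriber.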
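To judge how much loss the subscriber should see, it helps to know how many messages the publisher sends in each phase. The following is a minimal sketch of that arithmetic, assuming the publisher holds perfectly constant rates and starts and stops exactly on the phase boundaries. The per-topic split (test1 at 5 msg/s, test2 to test5 at 1 msg/s each) is inferred from the 9 QPS total, and all function and constant names are made up for illustration.

```c
#include <assert.h>
#include <stddef.h>

/* Phase lengths in seconds, taken from the procedure described in the post. */
enum { BEFORE_S = 10, OUTAGE_S = 100, AFTER_S = 20 };

/* Publish rates in msg/s: test1 at 5, test2..test5 at 1 each (inferred split). */
static const int RATE_PER_TOPIC[] = { 5, 1, 1, 1, 1 };

/* Sum of the per-topic publish rates, in msg/s. */
int total_rate(void)
{
    int sum = 0;
    size_t n = sizeof(RATE_PER_TOPIC) / sizeof(RATE_PER_TOPIC[0]);
    for (size_t i = 0; i < n; i++) {
        sum += RATE_PER_TOPIC[i];
    }
    return sum;
}

/* Messages published during a phase lasting `seconds` at the constant total rate. */
long messages_in_phase(int seconds)
{
    assert(seconds >= 0);
    return (long) total_rate() * seconds;
}

/* Messages a lossless bridge would deliver over the whole run. */
long expected_total(void)
{
    return messages_in_phase(BEFORE_S) + messages_in_phase(OUTAGE_S) +
           messages_in_phase(AFTER_S);
}
```

Under these assumptions `messages_in_phase(OUTAGE_S)` is 900 and `expected_total()` is 1170, so anything the subscriber receives below 1170 is loss attributable to the outage window.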
After the link had been down for a while, ctx_msgs filled up:
nanomq | 2025-12-23 11:35:20 [21] INFO /home/runner/work/nanomq/nanomq/nng/src/mqtt/transport/tcp/mqtt_tcp.c:1436 mqtt_tcptran_dial_cb: aio result Address invalid
nanomq | 2025-12-23 11:35:20 [18] WARN /home/runner/work/nanomq/nanomq/nng/src/mqtt/transport/tcp/mqtt_tcp.c:1610 mqtt_tcptran_ep_connect: reconnect to emqx:1883 in 8688 ms
nanomq | 2025-12-23 11:35:29 [21] WARN /home/runner/work/nanomq/nanomq/nanomq/bridge.c:1861 bridge_pub_handler: Cached Message in ctx_msgs is lost!
nanomq | 2025-12-23 11:35:29 [23] WARN /home/runner/work/nanomq/nanomq/nanomq/bridge.c:1861 bridge_pub_handler: Cached Message in ctx_msgs is lost!
The source around bridge.c:1845-1886 looks like this:
if (nng_aio_busy(node->bridge_aio[index])) {
    if (qos == 0) {
        nng_msg_free(bridge_msg);
        log_warn(
            "bridging to %s aio busy! "
            "msg lost! Ctx: %d drop qos 0 msg",
            node->address, work->ctx.id);
    } else if (node->ctx_msgs != NULL) {
        // pass index of aio via timestamp;
        if (nng_lmq_full(node->ctx_msgs)) {
            log_warn("Cached Message in ctx_msgs is lost!");
            nng_msg *tmsg;
            (void) nng_lmq_get(node->ctx_msgs, &tmsg);
            nng_msg_free(tmsg);
        }
        if (nng_lmq_put(node->ctx_msgs, bridge_msg) != 0) {
            log_warn("Msg lost! put msg to ctx_msgs failed!");
            nng_msg_free(bridge_msg);
        }
    }
} else {
    nng_aio_set_timeout(node->bridge_aio[index],
        node->cancel_timeout);
    nng_aio_set_msg(node->bridge_aio[index], bridge_msg);
    // switch to nng_ctx_send!
    nng_send_aio(*socket, node->bridge_aio[index]);
}
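The busy branch above behaves like a bounded FIFO that evicts its oldest entry whenever it is full. The sketch below models only that behaviour, with no dependency on nng, to estimate how many messages would be discarded during the outage. It rests on assumptions: the capacity of ctx_msgs is not stated in the post, so 256 in the usage note is a placeholder, and the model assumes nothing drains the queue while the link is down. `model_queue` and its functions are hypothetical names, not NanoMQ or nng APIs.

```c
#include <assert.h>
#include <stddef.h>

#define MODEL_CAP_MAX 1024 /* upper bound for this sketch only */

/* Hypothetical stand-in for the ctx_msgs queue: a ring buffer of message ids. */
typedef struct {
    long   items[MODEL_CAP_MAX];
    size_t cap;     /* configured capacity (assumed value, see text) */
    size_t head;    /* index of the oldest queued item */
    size_t len;     /* number of items currently queued */
    long   dropped; /* how many old items were evicted */
} model_queue;

void model_init(model_queue *q, size_t cap)
{
    assert(cap > 0 && cap <= MODEL_CAP_MAX);
    q->cap = cap;
    q->head = 0;
    q->len = 0;
    q->dropped = 0;
}

/* Mirrors the busy branch: if the queue is full, evict the oldest, then enqueue. */
void model_put_drop_oldest(model_queue *q, long msg_id)
{
    if (q->len == q->cap) {
        q->head = (q->head + 1) % q->cap;
        q->len--;
        q->dropped++;
    }
    q->items[(q->head + q->len) % q->cap] = msg_id;
    q->len++;
}

/* Enqueue ids 0..total-1 with no consumer and return how many were evicted. */
long model_simulate_outage(size_t cap, long total)
{
    model_queue q;
    model_init(&q, cap);
    for (long id = 0; id < total; id++) {
        model_put_drop_oldest(&q, id);
    }
    return q.dropped;
}
```

With the assumed capacity of 256 and the 900 messages that 9 QPS produces over 100 s, `model_simulate_outage(256, 900)` returns 644: every message beyond the newest 256 is lost, and each eviction corresponds to one "Cached Message in ctx_msgs is lost!" warning.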
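My question: with the network down, why does nng_aio_busy report the AIO as busy, so that messages pile up in ctx_msgs?
Shouldn't the ideal path be that the AIO is not busy, the message enters nng_send_aio at the nng layer, mqtt_ctx_send finds that s->mqtt_pipe does not exist, and, because the bridge has SQLite enabled, the message goes into offline_cache, which is flushed to SQLite once it fills up, after which the AIO is released?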
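The two paths contrasted in the question can be written down as one decision function, which makes it easier to state exactly where the observed behaviour diverges from the expected one. This is only a model of the expectation described above, not of what nng actually does. The flush rule (flush once the in-memory count reaches flush_mem_threshold) is an assumption, the case where ctx_msgs is NULL is ignored, and every name here is hypothetical.

```c
#include <assert.h>
#include <stdbool.h>

typedef enum {
    ROUTE_DROP_QOS0,       /* AIO busy, QoS 0: message freed */
    ROUTE_CTX_MSGS,        /* AIO busy, QoS 1/2: queued in ctx_msgs (observed path) */
    ROUTE_SEND_TO_PIPE,    /* AIO free and pipe connected: normal send */
    ROUTE_OFFLINE_CACHE,   /* AIO free, no pipe: kept in the in-memory cache */
    ROUTE_FLUSH_TO_SQLITE, /* AIO free, no pipe, cache at threshold: flushed */
    ROUTE_UNSPECIFIED      /* case the post does not describe */
} model_route_t;

/* Where one outgoing bridge message ends up under the expectation in the post. */
model_route_t model_route(bool aio_busy, int qos, bool pipe_connected,
                          bool sqlite_enabled, int cached, int flush_threshold)
{
    assert(qos >= 0 && qos <= 2);
    if (aio_busy) {
        return qos == 0 ? ROUTE_DROP_QOS0 : ROUTE_CTX_MSGS;
    }
    if (pipe_connected) {
        return ROUTE_SEND_TO_PIPE;
    }
    if (!sqlite_enabled) {
        return ROUTE_UNSPECIFIED;
    }
    /* Assumed rule: adding this message reaches the threshold, so flush. */
    return (cached + 1 >= flush_threshold) ? ROUTE_FLUSH_TO_SQLITE
                                           : ROUTE_OFFLINE_CACHE;
}
```

In these terms the expectation during the outage is `model_route(false, 1, false, true, n, 100)`, which yields the offline cache or SQLite, while the logs correspond to `model_route(true, 1, false, true, n, 100)`, which yields `ROUTE_CTX_MSGS`. The open question is therefore why the first argument is true while the link is down.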