EMQX5.0 delayed 延迟发布不生效

环境信息

  • EMQX 版本:5.0 开源版
  • 操作系统及版本:宿主机:macOS Monterey,Docker image:emqx/emqx:5.0.0
  • 其他

问题描述

测试 delayed 延迟发布,subscriber 收不到消息。

  1. 使用 MQTTX 1.8.2,为 x/y 主题发布消息,但是订阅了 $delayed/5/x/y 的 subscriber 在 5s 后没有收到消息:

    说明:Topic 那里不论是填写 x/y 还是 $delayed/5/x/y,都收不到消息:

    使用 $delayed/5/x/y 作为 Topic 发布时,MQTTX 的日志会出现:

delayed publish 选项已经打开:

  1. 使用 php-mqtt 测试也是一样的情况,收不到延时消息

配置文件及日志

配置 warning 级别的日志,服务端没有生成日志

推送参数和订阅参数写反了;

延迟发布机制是通过往哪个主题推送消息的主题前面加上 “$delayed” 关键字再加上延迟时间来触发延迟发布机制的。

推送格式:
$delayed/5/x/y;
订阅格式:
x/y;

1 个赞

见文档 延迟发布

如果是这样配置,似乎没有延迟,消息是立即收到的

我这里无法复现,发布和收到消息中间差 10s,是正常的。
你用的具体是哪个小版本呢

昨天测试的 EMQX5.0,用的是 docker 镜像:emqx/emqx:5.0.0。今天用 EMQX4.4.6(emqx/emqx:4.4.6)测试:


跟你截图测试方法一样,但是消息是立即收到的,没有延迟,不知道是不是我哪里操作或者设置有问题

emqx_mod_delayed 已经开启了:

请问下你用的是哪个版本,我也试一下

v5.0.4 和 v4.4.6 都可以的

也是用的 emqx docker 镜像吗?我现在可以怎样继续排查呢?

开下 debug 日志看看呢?

我修改了一下 emqx.conf,把 log.level 修改成 debug,然后重启容器,发现正常了可以收到延迟消息了…
后来再重新把 emqx.conf 还原,重起容器后发现也正常了,也可以收到延迟的消息,只不过都没有生成日志文件(把 log.to 设置成功 both 能在 console 中看到日志,但是没有生成文件日志)。

之前通过 docker-compose 构建容器,构建成功后直接使用 emqx5.0.0 和 emqx4.4.6 服务,延迟发布都有问题,所以说难道构建完后还要重启下服务吗。

我也再继续观察下,谢谢各位。

看起来我又遇到这个问题了,MQTTX 1.8.2 在一段时间没有使用后,重新连接 broker,又重现了立即收到延迟的消息,debug 日志显示:

2022-08-19T17:00:18.834402+00:00 [debug] mqttx_e2d00d84@172.21.0.1:59376 [MQTT] RECV <<50,64,0,14,36,100,101,108,97,121,101,100,47,53,47,120,47,121,141,62,0,123,10,32,32,34,109,115,103,34,58,32,34,104,101,108,108,111,34,44,10,32,32,34,116,111,34,58,32,34,100,101,108,97,121,101,100,32,116,111,112,105,99,34,10,125>>
2022-08-19T17:00:18.834741+00:00 [debug] mqttx_e2d00d84@172.21.0.1:59376 [MQTT] RECV PUBLISH(Q1, R0, D0Topic=$delayed/5/x/y, PacketId=36158, Payload={, "msg": "hello", "to": "delayed topic", })
2022-08-19T17:00:18.835053+00:00 [info] mqttx_e2d00d84@172.21.0.1:59376 PUBLISH to $delayed/5/x/y: <<"{\n  \"msg\": \"hello\",\n  \"to\": \"delayed topic\"\n}">>
2022-08-19T17:00:18.835472+00:00 [notice] mqttx_e2d00d84@172.21.0.1:59376 [Broker] Stop publishing: Message(Id=0005E69AD0E9758F398900000CC70002, QoS=1, Topic=$delayed/5/x/y, From=<<"mqttx_e2d00d84">>, Flags=[], Headers=#{peerhost => {172,21,0,1}, properties => #{},proto_ver => 5,protocol => mqtt, username => <<"emqx_user">>})
2022-08-19T17:00:18.837385+00:00 [debug] mqttx_e2d00d84@172.21.0.1:59376 [MQTT] SEND PUBACK(Q0, R0, D0PacketId=36158, ReasonCode=16)
2022-08-19T17:00:19.827693+00:00 [info] PUBLISH to x/y: <<"{\n  \"msg\": \"hello\",\n  \"to\": \"delayed topic\"\n}">>
2022-08-19T17:00:19.828016+00:00 [debug] mqttx_e2d00d84@172.21.0.1:59376 [MQTT] SEND PUBLISH(Q0, R0, D0Topic=x/y, PacketId=undefined, Payload={, "msg": "hello", "to": "delayed topic", })

在重启 EMQX(4.4.6)后延迟发布正常,此时的日志:

2022-08-19T17:02:23.844454+00:00 [debug] mqttx_e2d00d84@172.21.0.1:59406 [MQTT] RECV <<50,64,0,14,36,100,101,108,97,121,101,100,47,53,47,120,47,121,117,66,0,123,10,32,32,34,109,115,103,34,58,32,34,104,101,108,108,111,34,44,10,32,32,34,116,111,34,58,32,34,100,101,108,97,121,101,100,32,116,111,112,105,99,34,10,125>>
2022-08-19T17:02:23.844728+00:00 [debug] mqttx_e2d00d84@172.21.0.1:59406 [MQTT] RECV PUBLISH(Q1, R0, D0Topic=$delayed/5/x/y, PacketId=30018, Payload={, "msg": "hello", "to": "delayed topic", })
2022-08-19T17:02:23.845606+00:00 [info] mqttx_e2d00d84@172.21.0.1:59406 PUBLISH to $delayed/5/x/y: <<"{\n  \"msg\": \"hello\",\n  \"to\": \"delayed topic\"\n}">>
2022-08-19T17:02:23.847143+00:00 [notice] mqttx_e2d00d84@172.21.0.1:59406 [Broker] Stop publishing: Message(Id=0005E69B0FDA6BA839880000081E0001, QoS=1, Topic=$delayed/5/x/y, From=<<"mqttx_e2d00d84">>, Flags=[], Headers=#{peerhost => {172,21,0,1}, properties => #{},proto_ver => 5,protocol => mqtt, username => <<"emqx_user">>})
2022-08-19T17:02:23.847790+00:00 [debug] mqttx_e2d00d84@172.21.0.1:59406 [MQTT] SEND PUBACK(Q0, R0, D0PacketId=30018, ReasonCode=16)
2022-08-19T17:02:29.849144+00:00 [info] PUBLISH to x/y: <<"{\n  \"msg\": \"hello\",\n  \"to\": \"delayed topic\"\n}">>
2022-08-19T17:02:29.849712+00:00 [debug] mqttx_e2d00d84@172.21.0.1:59406 [MQTT] SEND PUBLISH(Q0, R0, D0Topic=x/y, PacketId=undefined, Payload={, "msg": "hello", "to": "delayed topic", })