规则中设置 消息重发布失效

环境

  • EMQX 版本:5.3.0
  • 操作系统版本:Debian GNU/Linux 11 (bullseye)

重现此问题的步骤

  1. 规则中SQL编辑器设置如下
SELECT
  *
FROM
  "$events/message_delivered",
  "$events/message_acked",
  "$events/message_dropped",
  "$events/delivery_dropped"
WHERE topic != 'client_action'

重发布设置如下:

预期行为

当订阅message_action时,应该收到所有重发布的消息。

实际行为

在对消息条数进行统计时发现,有部分消息没有进行重发布,6000余条需要重发布的消息中有246条未进行发布。

数据流
桥接器将作为客户端登录到emqx并保持连接,同时订阅某个rocketmq的topic,mq下发的消息体中带有payload和topic,桥接器模块将逐条进行处理,将每一条mq中的消息丢到对应的topic中,在这个过程中,根据上述配置的重发布规则,桥接器模块每次转发将产生一些事件被丢到message_action topic中。

对于成功重发布的消息所产生的日志如下:

{
    "message": "2024-05-16T11:13:11.630319+00:00 [error] msg: recursive_republish_detected, mfa: emqx_rule_actions:republish/3(111), peername: xx.xxx.x.x:xxxx, clientid: consumer-biz001_1713174988356, topic: message_action\n[rule action] message_action_xxxxx\n\tAction Data: #{clientid => <<\"consumer-biz001_1713174988356\">>,\n event => 'message.acked',\n flags => #{dup => false,retain => false},\n from_clientid => <<\"message_action_xxxxx\">>,\n from_username => undefined,\n id => <<\"000618905213742F9BE53B023A7F0005\">>,\n metadata => #{rule_id => <<\"message_action_xxxxx\">>},\n node =>\n 'emqx@emqx-2.emqx-headless.mqttmp.svc.cluster.local',\n payload =>\n <<xxxxxxx,\n xxxxxxxxxxxxx,\n xxxxxx"
}

没有重发布的消息的时间段产生的日志如下:

{
    "message": "2024-05-16T13:40:17.956800+00:00 [info] msg: terminate, mfa: emqx_connection:terminate/2(673), peername: xx.xxx.x.x:xxxx, clientid: 1491, reason: {shutdown,not_authorized}"
}
{
    "message": "2024-05-16T13:40:20.413679+00:00 [info] msg: authorization_permission_allowed, mfa: emqx_authz:log_allowed/1(438), peername: xx.xxx.x.x:xxxx, clientid: bridge-adapter_1715864958418, topic: XXXXX/CMD/154d4a4697a5efaa7dc2fc041df51ce3, ipaddr: {xx.xxx.x.x:xxxx}, source: file, username: <<\"bridge-adapter\">>"
}
{
    "message": "2024-05-16T13:40:21.102351+00:00 [info] msg: terminate, mfa: emqx_connection:terminate/2(673), peername:xx.xxx.x.x:xxxx, clientid: 55, reason: {shutdown,tcp_closed}"
}
{
    "message": "2024-05-16T13:40:21.644536+00:00 [info] msg: authorization_permission_allowed, mfa: emqx_authz:log_allowed/1(438), peername: xx.xxx.x.x:xxxx, clientid: 55, topic: XXXXX/CMD/2c8fbb552593327450ce12d28773462e, ipaddr: {xx.xxx.x.x:xxxx}, source: file, username: <<\"XXXXXXXXXXXXXXXXXXXXXX\">>"
}

其中的2024-05-16T13:40:20.413679+00:00 这条,桥接器模块日志显示,收到了一条业务mq,并进行处理后丢到了对应的emqx topic中,另一个订阅此topic的mqtt client也成功的收到了这条消息,但是并没有进行重发布到message_action,请问能帮忙分析下为什么没有进行重发布吗?以及 msg: authorization_permission_allowed, mfa: emqx_authz:log_allowed/1(438)这个msg表示什么含义?

还有两个需要咨询的问题是规则的dashboard中
有几类指标:命中,通过,失败,无结果

为什么无结果全部集中在最后一个节点?
无结果在emqx中表示什么含义?

对于成功重发布的消息所产生的日志如下:

其中的 msg: recursive_republish_detected 表示你的规则也会把这条重发布给查询到,从而形成了循环发布。建议该下规则和republish 的主题。

没有重发布的消息的时间段产生的日志如下:

其中,有俩条 msg: terminate 都是表示客户端断开链接了:

  • reason: {shutdown,not_authorized} 表示认证失败了,需要检查下客户端认证是否正确
  • reason: {shutdown,tcp_closed} 这种比较难讲;一般来说是由于 socket 突然关了导致的,需要排查下客户端、和中间的负载均衡是否存在这样的可能

msg: authorization_permission_allowed 这个是指消息发布授权成功,没关系的。

建议先把循环发布解决了,然后在测试试试看

感谢您的解答,我打算将规则改为

SELECT
  *
FROM
  "$events/message_delivered",
  "$events/message_acked",
  "$events/message_dropped",
  "$events/delivery_dropped"
WHERE topic !='message_action' or topic != 'client_action' 

来尝试解决循环发布的问题。

但是我觉得没有解决最关键的问题,有部分消息没有进行重发布

因为形成循环发布的前提条件是进行了重发布,成功重发布的消息产生的error日志表明确实进行了重发布且造成了循环发布,没有重发布的消息的时间段没有这个error日志,说明没有进行重发布,但是消息确实投递到了客户端,按理说会产生一个message.delieved事件触发这个重发布规则。

请问有什么可能的原因造成这种现象嘛?
排除设备未上线等情况,异常时段的日志里的连接断开和认证失败日志是别的客户端的日志,理论上来说不会影响broker自身的消息重发布机制。

经过一天的测试,循环发布的问题好像无法得到解决
不管通过什么筛选条件来过滤,规则本身是无法分辨是第一次重发布还是第二次重发布的。
比如下面是message.delivered的事件报文,username、topic、qos、timestamp、payload、node、id、from_username、from_clientid、flags、event、clienid等字段都是会随时变化的,无法用来做过滤,唯一一个可用的就是metadata.rule_id可以用,但是如果用这个来过滤的话,那就一次重转发都不会做。

{
  "username": "bridge-receiver2",
  "topic": "mocktopic/1",
  "timestamp": 1716432928875,
  "qos": 0,
  "payload": "{\n  \"msg\": \"kkkkexamplekk\"\n}",
  "node": "emqx@emqx-1.emqx-headless.mqtt.svc.cluster.local",
  "metadata": {
    "rule_id": "message_action_test"
  },
  "id": "000619162F0198279BE5F3007C830007",
  "from_username": "bridge-publisher1",
  "from_clientid": "4",
  "flags": {
    "retain": false,
    "dup": false
  },
  "event": "message.delivered",
  "clientid": "5"
}

也尝试过在报文里加一些flag来标记是第几次重转发,但是payload中并不会进行数学处理,republish_flag只能拿到一个字符串“0 + 1”

目前看只要规则中配置了message_delivered事件,就一定会触发[error] msg: recursive_republish_detected的报错。
请问还有其他解决方法吗?

问题原因:
客户端下线后,处于会话保持时间内的消息投递不会产生任何事件。
会话保持过期后,消息就消失在emqx内部了,不会再产生事件。