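Published only once, but the rule was hit hundreds of thousands of times

Environment

  • EMQX version: 5.0
  • Operating system and version: Ubuntu 22.04.1 LTS
  • Other

Problem description

I use MQTTX as a client to publish a message to the topic phone/el. A rule's republish action forwards the message to the topic phone/1. MQTTX subscribes to phone/1, but it received hundreds of thousands of messages. Why does this happen?

Configuration files and logs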

2022-08-15T17:27:30.436401+08:00 [error] line: 26, mfa: emqx_rule_api_schema:check_params/2, msg: check_rule_params_failed, reason: {emqx_rule_api_schema,[#{kind => validation_error,mismatches => #{<<"ctx_acked">> => #{kind => validation_error,path => "rule_test.context",reason => unknown_fields,unknown => <<"sensors">>,unmatched => <<"clientid,from_clientid...">>},<<"ctx_bridge_mqtt">> => #{kind => validation_error,path => "rule_test.context",reason => unknown_fields,unknown => <<"sensors">>,unmatched => <<"dup,id...">>},<<"ctx_check_authz_complete">> => #{kind => validation_error,path => "rule_test.context",reason => unknown_fields,unknown => <<"sensors">>,unmatched => <<"action,authz_source...">>},<<"ctx_connack">> => #{kind => validation_error,path => "rule_test.context",reason => unknown_fields,unknown => <<"sensors">>,unmatched => <<"clean_start,clientid...">>},<<"ctx_connected">> => #{kind => validation_error,path => "rule_test.context",reason => unknown_fields,unknown => <<"sensors">>,unmatched => <<"clean_start,clientid...">>},<<"ctx_delivered">> => #{kind => validation_error,path => "rule_test.context",reason => unknown_fields,unknown => <<"sensors">>,unmatched => <<"clientid,from_clientid...">>},<<"ctx_disconnected">> => #{kind => validation_error,path => "rule_test.context",reason => unknown_fields,unknown => <<"sensors">>,unmatched => <<"clientid,disconnected_at...">>},<<"ctx_dropped">> => #{kind => validation_error,path => "rule_test.context",reason => unknown_fields,unknown => <<"sensors">>,unmatched => <<"clientid,id...">>},<<"ctx_pub">> => #{kind => validation_error,path => "rule_test.context",reason => unknown_fields,unknown => <<"sensors">>,unmatched => <<"clientid,id...">>},<<"ctx_sub">> => #{kind => validation_error,path => "rule_test.context",reason => unknown_fields,unknown => <<"sensors">>,unmatched => <<"clientid,payload...">>},<<"ctx_unsub">> => #{kind => validation_error,path => "rule_test.context",reason => unknown_fields,unknown => 
<<"sensors">>,unmatched => <<"clientid,payload...">>}},path => "rule_test.context",reason => matched_no_union_member}]}
2022-08-15T17:29:15.220391+08:00 [warning] entrypoint: <<"emqx:update_config/3">>, kind: initiate, line: 492, mfa: emqx_cluster_rpc:log_and_alarm/3, msg: cluster_rpc_apply_result, result: {error,{post_config_update,emqx_modules_conf,bad_topic}}, tnx_id: 81
2022-08-15T17:40:51.757855+08:00 [warning] entrypoint: <<"emqx:update_config/3">>, kind: initiate, line: 492, mfa: emqx_cluster_rpc:log_and_alarm/3, msg: cluster_rpc_apply_result, result: {error,{post_config_update,emqx_modules_conf,bad_topic}}, tnx_id: 81
2022-08-15T17:41:48.410340+08:00 [warning] entrypoint: <<"emqx:update_config/3">>, kind: initiate, line: 492, mfa: emqx_cluster_rpc:log_and_alarm/3, msg: cluster_rpc_apply_result, result: {error,{post_config_update,emqx_modules_conf,bad_topic}}, tnx_id: 83
2022-08-15T17:42:41.187256+08:00 [warning] line: 95, mfa: emqx_rule_runtime:apply_rule/3, msg: FOREACH_clause_exception, reason: {error,function_clause,[{emqx_rule_runtime,eval,['*',#{clientid => <<"c_emqx">>,dup => false,event => <<"$bridges/mqtt:phone">>,event_type => message_publish,id => <<"0005E64473F58103F4420600688C0000">>,message_received_at => 1660556561187,metadata => #{rule_id => <<"sql_tester:767b062c908fcd23">>},node => 'emqx@127.0.0.1',payload => <<"{\"msg\": \"hello\"}">>,pub_props => #{'Message-Expiry-Interval' => 30,'Payload-Format-Indicator' => 0,'User-Property' => #{foo => <<"bar">>},'User-Property-Pairs' => [#{key => <<"foo">>},#{value => <<"bar">>}]},qos => 1,retain => false,server => <<"192.168.0.10:1883">>,timestamp => 1660556561187,topic => <<"t/a">>,username => <<"u_emqx">>}],[{file,"emqx_rule_runtime.erl"},{line,358}]},{emqx_rule_runtime,select_and_collect,3,[{file,"emqx_rule_runtime.erl"},{line,227}]},{emqx_rule_runtime,'-do_apply_rule/3-fun-0-',2,[{file,"emqx_rule_runtime.erl"},{line,131}]},{emqx_rule_runtime,do_apply_rule,3,[{file,"emqx_rule_runtime.erl"},{line,130}]},{emqx_rule_runtime,apply_rule,3,[{file,"emqx_rule_runtime.erl"},{line,70}]},{emqx_rule_sqltester,test_rule,4,[{file,"emqx_rule_sqltester.erl"},{line,64}]},{emqx_rule_engine_api,'/rule_test',2,[{file,"emqx_rule_engine_api.erl"},{line,318}]},{minirest_handler,apply_callback,3,[{file,"minirest_handler.erl"},{line,111}]},{minirest_handler,handle,2,[{file,"minirest_handler.erl"},{line,44}]},{minirest_handler,init,2,[{file,"minirest_handler.erl"},{line,27}]},{cowboy_handler,execute,2,[{file,"cowboy_handler.erl"},{line,41}]},{cowboy_stream_h,execute,3,[{file,"cowboy_stream_h.erl"},{line,318}]},{cowboy_stream_h,request_process,3,[{file,"cowboy_stream_h.erl"},{line,302}]},{proc_lib,init_p_do_apply,3,[{file,"proc_lib.erl"},{line,226}]}]}, rule_id: <<"sql_tester:767b062c908fcd23">>
2022-08-15T18:02:03.148946+08:00 [warning] line: 405, message: <<"resource down: bridge:mqtt:sendMessage">>, mfa: emqx_alarm:do_actions/3, msg: alarm_is_activated, name: <<"bridge:mqtt:sendMessage">>
2022-08-15T18:02:03.149634+08:00 [error] id: <<"bridge:mqtt:sendMessage">>, line: 491, mfa: emqx_resource_manager:handle_connected_health_check/1, msg: health_check_failed, status: connecting
2022-08-15T18:02:38.837033+08:00 [error] clientid: push_1, line: 95, mfa: emqx_rule_actions:republish/3, msg: recursive_republish_detected, peername: 192.168.1.117:59749, topic: phone/#
2022-08-15T18:03:43.174609+08:00 [error] clientid: push_1, line: 95, mfa: emqx_rule_actions:republish/3, msg: recursive_republish_detected, peername: 192.168.1.117:59749, topic: phone/#
2022-08-15T18:06:38.420581+08:00 [error] clientid: push_1, line: 95, mfa: emqx_rule_actions:republish/3, msg: recursive_republish_detected, peername: 192.168.1.117:59749, topic: phone/3
2022-08-15T18:06:39.126969+08:00 [error] clientid: push_1, line: 95, mfa: emqx_rule_actions:republish/3, msg: recursive_republish_detected, peername: 192.168.1.117:59749, topic: phone/3
2022-08-15T18:06:39.862967+08:00 [error] clientid: push_1, line: 95, mfa: emqx_rule_actions:republish/3, msg: recursive_republish_detected, peername: 192.168.1.117:59749, topic: phone/3
2022-08-15T18:09:17.050974+08:00 [error] clientid: push_1, line: 95, mfa: emqx_rule_actions:republish/3, msg: recursive_republish_detected, peername: 192.168.1.117:59749, topic: phone/3
2022-08-15T18:12:14.606259+08:00 [error] clientid: push_1, line: 95, mfa: emqx_rule_actions:republish/3, msg: recursive_republish_detected, peername: 192.168.1.117:59749, topic: phone/1
2022-08-15T18:12:15.359095+08:00 [error] clientid: push_1, line: 95, mfa: emqx_rule_actions:republish/3, msg: recursive_republish_detected, peername: 192.168.1.117:59749, topic: phone/1
2022-08-15T18:12:34.308223+08:00 [error] clientid: push_1, line: 95, mfa: emqx_rule_actions:republish/3, msg: recursive_republish_detected, peername: 192.168.1.117:59749, topic: phone/1
2022-08-15T18:12:35.191869+08:00 [error] clientid: push_1, line: 95, mfa: emqx_rule_actions:republish/3, msg: recursive_republish_detected, peername: 192.168.1.117:59749, topic: phone/1
2022-08-15T18:12:42.989792+08:00 [error] clientid: push_1, line: 95, mfa: emqx_rule_actions:republish/3, msg: recursive_republish_detected, peername: 192.168.1.117:59749, topic: phone/1
2022-08-15T18:12:44.124138+08:00 [error] clientid: push_1, line: 95, mfa: emqx_rule_actions:republish/3, msg: recursive_republish_detected, peername: 192.168.1.117:59749, topic: phone/1
2022-08-15T18:12:44.886460+08:00 [error] clientid: push_1, line: 95, mfa: emqx_rule_actions:republish/3, msg: recursive_republish_detected, peername: 192.168.1.117:59749, topic: phone/1
2022-08-15T18:12:45.589781+08:00 [error] clientid: push_1, line: 95, mfa: emqx_rule_actions:republish/3, msg: recursive_republish_detected, peername: 192.168.1.117:59749, topic: phone/1
2022-08-15T18:12:46.318450+08:00 [error] clientid: push_1, line: 95, mfa: emqx_rule_actions:republish/3, msg: recursive_republish_detected, peername: 192.168.1.117:59749, topic: phone/1
2022-08-15T18:13:37.563176+08:00 [error] clientid: push_1, line: 95, mfa: emqx_rule_actions:republish/3, msg: recursive_republish_detected, peername: 192.168.1.117:59749, topic: phone/1
2022-08-15T18:17:13.259135+08:00 [warning] line: 411, mfa: emqx_alarm:do_actions/3, msg: alarm_is_deactivated, name: <<"bridge:mqtt:sendMessage">>
2022-08-15T18:27:48.733058+08:00 [warning] line: 405, message: <<"connection congested: #{buffer => 4096,clientid => <<\"sub_1\">>,conn_state => connected,connected_at => 1660557717651,high_msgq_watermark => 8192,high_watermark => 1048576,memory => 29720,message_queue_len => 0,peername => <<\"192.168.1.117:59756\">>,pid => <<\"<0.28207.6>\">>,proto_name => <<\"MQTT\">>,proto_ver => 5,recbuf => 131072,recv_cnt => 29,recv_oct => 453,reductions => 9858333,send_cnt => 1"...>>, mfa: emqx_alarm:do_actions/3, msg: alarm_is_activated, name: <<"conn_congestion/sub_1/admin">>
2022-08-15T18:27:51.840594+08:00 [warning] line: 178, mfa: emqx_sys_mon:handle_info/2, msg: busy_port, portinfo: [{port,#Port<0.15633>},{name,"tcp_inet"},{links,[<0.28207.6>]},{id,125064},{connected,<0.28207.6>},{input,0},{output,6905090},{os_pid,undefined}], procinfo: [{pid,<0.28207.6>},{memory,42776},{total_heap_size,5172},{heap_size,2586},{stack_size,34},{min_heap_size,233},{proc_lib_initial_call,{emqx_connection,init,['Argument__1','Argument__2','Argument__3','Argument__4']}},{initial_call,{proc_lib,init_p,5}},{current_stacktrace,[{erlang,port_command,3,[]},{esockd_transport,async_send,3,[{file,"esockd_transport.erl"},{line,153}]},{emqx_connection,send,2,[{file,"emqx_connection.erl"},{line,868}]},{emqx_connection,process_msg,2,[{file,"emqx_connection.erl"},{line,466}]},{emqx_connection,process_msg,2,[{file,"emqx_connection.erl"},{line,472}]},{emqx_connection,handle_recv,3,[{file,"emqx_connection.erl"},{line,428}]},{proc_lib,wake_up,3,[{file,"proc_lib.erl"},{line,236}]}]},{registered_name,[]},{status,suspended},{message_queue_len,3},{group_leader,<0.1894.0>},{priority,normal},{trap_exit,false},{reductions,26130177},{last_calls,false},{catchlevel,4},{trace,0},{suspending,[]},{sequential_trace_token,[]},{error_handler,error_handler}]
2022-08-15T18:27:57.700869+08:00 [error] Process:            <0.28207.6> on node 'emqx@127.0.0.1', Context:            maximum heap size reached, Max Heap Size:      6291456, Total Heap Size:    6924764, Kill:               true, Error Logger:       true, Message Queue Len:  8, GC Info:            [{old_heap_block_size,1439468},{heap_block_size,5472277},{mbuf_size,13019},{recent_size,1540505},{stack_size,23},{old_heap_size,663141},{heap_size,2134532},{bin_vheap_size,596052},{bin_vheap_block_size,832883},{bin_old_vheap_size,254952},{bin_old_vheap_block_size,514761}]
2022-08-15T18:40:30.709872+08:00 [warning] line: 405, message: <<"connection congested: #{buffer => 4096,clientid => <<\"mqttx_c7850542\">>,conn_state => connected,connected_at => 1660559812851,high_msgq_watermark => 8192,high_watermark => 1048576,memory => 29720,message_queue_len => 0,peername => <<\"192.168.1.117:61240\">>,pid => <<\"<0.31167.6>\">>,proto_name => <<\"MQTT\">>,proto_ver => 5,recbuf => 131072,recv_cnt => 5,recv_oct => 66,reductions => 303192,send_cn"...>>, mfa: emqx_alarm:do_actions/3, msg: alarm_is_activated, name: <<"conn_congestion/mqttx_c7850542/admin">>
2022-08-15T18:40:35.475336+08:00 [warning] line: 178, mfa: emqx_sys_mon:handle_info/2, msg: busy_port, portinfo: [{port,#Port<0.15917>},{name,"tcp_inet"},{links,[<0.31167.6>]},{id,127336},{connected,<0.31167.6>},{input,0},{output,6087604},{os_pid,undefined}], procinfo: [{pid,<0.31167.6>},{memory,26792},{total_heap_size,3196},{heap_size,610},{stack_size,34},{min_heap_size,233},{proc_lib_initial_call,{emqx_connection,init,['Argument__1','Argument__2','Argument__3','Argument__4']}},{initial_call,{proc_lib,init_p,5}},{current_stacktrace,[{erlang,port_command,3,[]},{esockd_transport,async_send,3,[{file,"esockd_transport.erl"},{line,153}]},{emqx_connection,send,2,[{file,"emqx_connection.erl"},{line,868}]},{emqx_connection,process_msg,2,[{file,"emqx_connection.erl"},{line,466}]},{emqx_connection,process_msg,2,[{file,"emqx_connection.erl"},{line,472}]},{emqx_connection,handle_recv,3,[{file,"emqx_connection.erl"},{line,428}]},{proc_lib,wake_up,3,[{file,"proc_lib.erl"},{line,236}]}]},{registered_name,[]},{status,suspended},{message_queue_len,1},{group_leader,<0.1894.0>},{priority,normal},{trap_exit,false},{reductions,22531962},{last_calls,false},{catchlevel,4},{trace,0},{suspending,[]},{sequential_trace_token,[]},{error_handler,error_handler}]
2022-08-15T18:40:39.718951+08:00 [error] Process:            <0.31167.6> on node 'emqx@127.0.0.1', Context:            maximum heap size reached, Max Heap Size:      6291456, Total Heap Size:    9067521, Kill:               true, Error Logger:       true, Message Queue Len:  5, GC Info:            [{old_heap_block_size,3581853},{heap_block_size,5472277},{mbuf_size,13583},{recent_size,1619458},{stack_size,188},{old_heap_size,0},{heap_size,2487203},{bin_vheap_size,624492},{bin_vheap_block_size,1347604},{bin_old_vheap_size,0},{bin_old_vheap_block_size,1347604}]
2022-08-15T19:40:12.352850+08:00 [warning] line: 405, message: <<"connection congested: #{buffer => 4096,clientid => <<\"mqttx_5c9434cd\">>,conn_state => connected,connected_at => 1660562643923,high_msgq_watermark => 8192,high_watermark => 1048576,memory => 34608,message_queue_len => 0,peername => <<\"192.168.1.117:51344\">>,pid => <<\"<0.782.7>\">>,proto_name => <<\"MQTT\">>,proto_ver => 5,recbuf => 131072,recv_cnt => 20,recv_oct => 283,reductions => 6398503,send_c"...>>, mfa: emqx_alarm:do_actions/3, msg: alarm_is_activated, name: <<"conn_congestion/mqttx_5c9434cd/admin">>
2022-08-15T19:40:16.350966+08:00 [warning] line: 178, mfa: emqx_sys_mon:handle_info/2, msg: busy_port, portinfo: [{port,#Port<0.15971>},{name,"tcp_inet"},{links,[<0.782.7>]},{id,127768},{connected,<0.782.7>},{input,0},{output,5960134},{os_pid,undefined}], procinfo: [{pid,<0.782.7>},{memory,34696},{total_heap_size,4184},{heap_size,1598},{stack_size,34},{min_heap_size,233},{proc_lib_initial_call,{emqx_connection,init,['Argument__1','Argument__2','Argument__3','Argument__4']}},{initial_call,{proc_lib,init_p,5}},{current_stacktrace,[{erlang,port_command,3,[]},{esockd_transport,async_send,3,[{file,"esockd_transport.erl"},{line,153}]},{emqx_connection,send,2,[{file,"emqx_connection.erl"},{line,868}]},{emqx_connection,process_msg,2,[{file,"emqx_connection.erl"},{line,466}]},{emqx_connection,process_msg,2,[{file,"emqx_connection.erl"},{line,472}]},{emqx_connection,handle_recv,3,[{file,"emqx_connection.erl"},{line,428}]},{proc_lib,wake_up,3,[{file,"proc_lib.erl"},{line,236}]}]},{registered_name,[]},{status,suspended},{message_queue_len,1},{group_leader,<0.1894.0>},{priority,normal},{trap_exit,false},{reductions,22034787},{last_calls,false},{catchlevel,4},{trace,0},{suspending,[]},{sequential_trace_token,[]},{error_handler,error_handler}]
2022-08-15T19:41:33.948003+08:00 [warning] line: 411, mfa: emqx_alarm:do_actions/3, msg: alarm_is_deactivated, name: <<"conn_congestion/mqttx_5c9434cd/admin">>
2022-08-15T19:56:27.376732+08:00 [warning] line: 405, message: <<"connection congested: #{buffer => 4096,clientid => <<\"mqttx_c2e329b3\">>,conn_state => connected,connected_at => 1660564558073,high_msgq_watermark => 8192,high_watermark => 1048576,memory => 29744,message_queue_len => 0,peername => <<\"192.168.1.117:52489\">>,pid => <<\"<0.3619.7>\">>,proto_name => <<\"MQTT\">>,proto_ver => 5,recbuf => 131072,recv_cnt => 2,recv_oct => 60,reductions => 1708955,send_cn"...>>, mfa: emqx_alarm:do_actions/3, msg: alarm_is_activated, name: <<"conn_congestion/mqttx_c2e329b3/admin">>
2022-08-15T19:56:32.783140+08:00 [warning] line: 178, mfa: emqx_sys_mon:handle_info/2, msg: busy_port, portinfo: [{port,#Port<0.16132>},{name,"tcp_inet"},{links,[<0.3619.7>]},{id,129056},{connected,<0.3619.7>},{input,0},{output,6080997},{os_pid,undefined}], procinfo: [{pid,<0.3619.7>},{memory,34696},{total_heap_size,4184},{heap_size,2586},{stack_size,34},{min_heap_size,233},{proc_lib_initial_call,{emqx_connection,init,['Argument__1','Argument__2','Argument__3','Argument__4']}},{initial_call,{proc_lib,init_p,5}},{current_stacktrace,[{erlang,port_command,3,[]},{esockd_transport,async_send,3,[{file,"esockd_transport.erl"},{line,153}]},{emqx_connection,send,2,[{file,"emqx_connection.erl"},{line,868}]},{emqx_connection,process_msg,2,[{file,"emqx_connection.erl"},{line,466}]},{emqx_connection,process_msg,2,[{file,"emqx_connection.erl"},{line,472}]},{emqx_connection,handle_recv,3,[{file,"emqx_connection.erl"},{line,428}]},{proc_lib,wake_up,3,[{file,"proc_lib.erl"},{line,236}]}]},{registered_name,[]},{status,suspended},{message_queue_len,1},{group_leader,<0.1894.0>},{priority,normal},{trap_exit,false},{reductions,21818220},{last_calls,false},{catchlevel,4},{trace,0},{suspending,[]},{sequential_trace_token,[]},{error_handler,error_handler}]
2022-08-15T19:56:40.309053+08:00 [error] Process:            <0.3619.7> on node 'emqx@127.0.0.1', Context:            maximum heap size reached, Max Heap Size:      6291456, Total Heap Size:    10887613, Kill:               true, Error Logger:       true, Message Queue Len:  6, GC Info:            [{old_heap_block_size,4298223},{heap_block_size,6566731},{mbuf_size,22684},{recent_size,2450554},{stack_size,18},{old_heap_size,0},{heap_size,2984853},{bin_vheap_size,944725},{bin_vheap_block_size,1347604},{bin_old_vheap_size,0},{bin_old_vheap_block_size,1347604}]

Client screenshot:

Data source screenshot:

Rule overview screenshot:

Rule processing and actions screenshot:

It is because you entered phone/# here.

After I changed it to phone/el, messages are received normally. What was causing this?

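The phone/# topic used in the MQTT bridge matches every topic that starts with phone/. After you publish a message to phone/el, EMQX picks it up through the bridge and republishes it as phone/1, and the client receives the phone/1 message. Normally that would be fine. The key point is that the broker your MQTT bridge connects to is this same EMQX instance. When phone/1 is republished, EMQX effectively receives phone/1 again, so it is matched by the bridge topic phone/# again, passed into the rule, and republished again. The result is an endless loop: publish -> data bridge -> rule -> republish -> data bridge -> rule -> republish ... On every republish in this loop, the subscribed client receives another message.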
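To make the loop concrete, here is a minimal Python sketch of the wildcard matching involved. It is not EMQX code: `topic_matches` and `count_republishes` are hypothetical helpers written for illustration, the matcher is simplified (it ignores special handling of topics that start with `$`), and the `max_hops` cap exists only so the simulation stops.

```python
def topic_matches(topic_filter: str, topic: str) -> bool:
    """Return True if an MQTT topic filter ('+' and '#' wildcards) matches a topic."""
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    for i, level in enumerate(filter_levels):
        if level == "#":
            # Multi-level wildcard: matches the parent level and everything below it.
            return True
        if i >= len(topic_levels):
            return False
        if level != "+" and level != topic_levels[i]:
            return False
    return len(filter_levels) == len(topic_levels)


def count_republishes(source_filter: str, first_topic: str,
                      republish_topic: str, max_hops: int = 10) -> int:
    """Count how often the rule fires for one client publish (capped at max_hops)."""
    hops = 0
    topic = first_topic
    while topic_matches(source_filter, topic) and hops < max_hops:
        hops += 1
        # The republished message re-enters the same broker on the new topic.
        topic = republish_topic
    return hops


# Source filter phone/# also matches the republished phone/1, so the loop never ends
# on its own; the count simply reaches the cap.
print(count_republishes("phone/#", "phone/el", "phone/1"))
# Source filter phone/el does not match phone/1, so the rule fires exactly once.
print(count_republishes("phone/el", "phone/el", "phone/1"))
```

The general takeaway is that the republish target should not match the topic filter that feeds the rule, which is why switching the source to phone/el stopped the flood.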
