环境信息
- EMQX 版本:5.0
- 操作系统及版本:Windows 10
- 其他
问题描述
在同一局域网下,PC-A 在 Docker 中以静态配置的方式组建了一个包含五个节点的集群,另一台 PC-B 同样组建了一个包含两个节点的集群。我尝试在 PC-A 上使用规则 + 数据桥接(MQTT Sink)向 PC-B 上的集群转发数据,具体流程为:从 PC-A 的 testtopic/local 主题中筛选出 payload,通过规则 -> 动作 -> MQTT Sink 转发至 testtopic/remote 主题。但配置完成后,当我在 PC-A 某个节点 Dashboard 的 WebSocket 客户端中向 testtopic/local 主题发布消息时,Docker 控制台会报出多个错误,并且 Dashboard 中所建立的数据桥接的资源状态会显示为已断开。
配置文件及日志
配置文件(PC-A 的配置文件与下面的配置基本一致;PC-B 的配置与 PC-A 相比只是没有配置 bridges):
## NOTE:
## Configs in this file might be overridden by:
## 1. Environment variables which start with 'EMQX_' prefix
## 2. File $EMQX_NODE__DATA_DIR/configs/cluster-override.conf
## 3. File $EMQX_NODE__DATA_DIR/configs/local-override.conf
##
## The *-override.conf files are overwritten at runtime when changes
## are made from EMQX dashboard UI, management HTTP API, or CLI.
## All configuration details can be found in emqx.conf.example
node {
name = "emqx@127.0.0.1"
cookie = emqxsecretcookie
data_dir = "data"
etc_dir = "etc"
}
log {
file_handlers.default {
level = warning
file = "log/emqx.log"
}
}
cluster {
name = emqxcl
discovery_strategy = static
static {
seeds = ["emqx@172.18.0.2", "emqx@172.18.0.3", "emqx@172.18.0.4", "emqx@172.18.0.5", "emqx@172.18.0.6"]
}
}
node.name = "emqx@172.18.0.6"
listeners.tcp.default {
bind = "0.0.0.0:1883"
max_connections = 1024000
}
listeners.ssl.default {
bind = "0.0.0.0:8883"
max_connections = 512000
ssl_options {
keyfile = "etc/certs/key.pem"
certfile = "etc/certs/cert.pem"
cacertfile = "etc/certs/cacert.pem"
}
}
listeners.ws.default {
bind = "0.0.0.0:8083"
max_connections = 1024000
websocket.mqtt_path = "/mqtt"
}
listeners.wss.default {
bind = "0.0.0.0:8084"
max_connections = 512000
websocket.mqtt_path = "/mqtt"
ssl_options {
keyfile = "etc/certs/key.pem"
certfile = "etc/certs/cert.pem"
cacertfile = "etc/certs/cacert.pem"
}
}
# listeners.quic.default {
# enabled = true
# bind = "0.0.0.0:14567"
# max_connections = 1024000
# keyfile = "etc/certs/key.pem"
# certfile = "etc/certs/cert.pem"
#}
dashboard {
listeners.http {
bind: 18083
}
default_username: "admin"
default_password: "public"
}
authorization {
deny_action: ignore
no_match: allow
sources: [
{
type: file
path: "etc/acl.conf"
}
]
}
bridges {
mqtt {
mqtt_bridge_egress {
connector {
bridge_mode = false
clean_start = true
keepalive = "60s"
max_inflight = 32
mode = "cluster_shareload"
username = "emqx"
password = "emqx"
proto_ver = "v4"
reconnect_interval = "15s"
retry_interval = "15s"
server = "192.168.0.242:34883"
ssl { enable = false }
}
direction = "egress"
enable = true
local_topic = "testtopic/local"
remote_qos = 0
remote_topic = "testtopic/remote"
payload = "${payload}"
retain = false
}
}
}
include emqx_enterprise.conf
日志:
2022-08-08T06:47:50.790088+00:00 [warning] line: 411, mfa: emqx_alarm:do_actions/3, msg: alarm_is_deactivated, name: <<"bridge:mqtt:mqtt_bridge_egress">>
[rule action] rule_ycvl
Action Data: #{<<"payload">> => <<"{ \"msg\": \"hello\" }">>}
Envs: #{clientid => <<"emqx_NTAzND">>,event => 'message.publish',
flags => #{dup => false,retain => false},
headers =>
#{peerhost => {172,18,0,1},
properties => #{},proto_ver => 4,protocol => mqtt,
username => <<>>},
id => <<"0005E5B5390B0251D7C000000C390004">>,
metadata => #{rule_id => <<"rule_ycvl">>},
node => 'emqx@172.18.0.2',
payload => <<"{ \"msg\": \"hello\" }">>,
peerhost => <<"172.18.0.1">>,pub_props => #{},
publish_received_at => 1659941392417,qos => 0,
timestamp => 1659941392417,topic => <<"testtopic/local">>,
username => <<>>}
2022-08-08T06:49:52.418025+00:00 [error] State machine 'bridge:mqtt:mqtt_bridge_egress:22' terminating. Reason: function_clause. Stack: [{emqx_connector_mqtt_msg,estimate_size,[#{<<"payload">> => <<"{ \"msg\": \"hello\" }">>}],[{file,"emqx_connector_mqtt_msg.erl"},{line,146}]},{replayq,transform,6,[{file,"replayq.erl"},{line,298}]},{replayq,append,2,[{file,"replayq.erl"},{line,189}]},{emqx_connector_mqtt_worker,common,4,[{file,"emqx_connector_mqtt_worker.erl"},{line,369}]},{gen_statem,loop_state_callback,11,[{file,"gen_statem.erl"},{line,1203}]},{proc_lib,init_p_do_apply,3,[{file,"proc_lib.erl"},{line,226}]}]. Last event: {cast,{send_to_remote,#{<<"payload">> => <<"{ \"msg\": \"hello\" }">>}}}. State: {connected,#{connect_opts => #{auto_reconnect => true,bridge_mode => false,clean_start => true,clientid => <<"bridge:mqtt:mqtt_bridge_egress:22:emqx@172.18.0.2">>,connect_timeout => 30,forwards => #{enable => true,local_topic => <<"testtopic/local">>,payload => [{var,{var,<<"payload">>}}],remote_qos => 2,remote_topic => [{str,<<"testtopic/remote">>}],retain => false},if_record_metrics => true,keepalive => 60,max_inflight => 32,name => 'bridge:mqtt:mqtt_bridge_egress:22',password => <<>>,proto_ver => v4,reconnect_interval => 15000,replayq => #{offload => false,seg_bytes => 104857600},retry_interval => 15000,server => {{192,168,0,242},34883},ssl => false,ssl_opts => 
[{ciphers,["TLS_AES_256_GCM_SHA384","TLS_AES_128_GCM_SHA256","TLS_CHACHA20_POLY1305_SHA256","TLS_AES_128_CCM_SHA256","TLS_AES_128_CCM_8_SHA256","ECDHE-ECDSA-AES256-GCM-SHA384","ECDHE-RSA-AES256-GCM-SHA384","ECDHE-ECDSA-AES256-SHA384","ECDHE-RSA-AES256-SHA384","ECDH-ECDSA-AES256-GCM-SHA384","ECDH-RSA-AES256-GCM-SHA384","ECDH-ECDSA-AES256-SHA384","ECDH-RSA-AES256-SHA384","DHE-DSS-AES256-GCM-SHA384","DHE-DSS-AES256-SHA256","AES256-GCM-SHA384","AES256-SHA256","ECDHE-ECDSA-AES128-GCM-SHA256","ECDHE-RSA-AES128-GCM-SHA256","ECDHE-ECDSA-AES128-SHA256","ECDHE-RSA-AES128-SHA256","ECDH-ECDSA-AES128-GCM-SHA256","ECDH-RSA-AES128-GCM-SHA256","ECDH-ECDSA-AES128-SHA256","ECDH-RSA-AES128-SHA256","DHE-DSS-AES128-GCM-SHA256","DHE-DSS-AES128-SHA256","AES128-GCM-SHA256","AES128-SHA256","ECDHE-ECDSA-AES256-SHA","ECDHE-RSA-AES256-SHA","DHE-DSS-AES256-SHA","ECDH-ECDSA-AES256-SHA","ECDH-RSA-AES256-SHA","ECDHE-ECDSA-AES128-SHA","ECDHE-RSA-AES128-SHA","DHE-DSS-AES128-SHA","ECDH-ECDSA-AES128-SHA","ECDH-RSA-AES128-SHA","RSA-PSK-AES256-GCM-SHA384","RSA-PSK-AES256-CBC-SHA384","RSA-PSK-AES128-GCM-SHA256","RSA-PSK-AES128-CBC-SHA256","RSA-PSK-AES256-CBC-SHA","RSA-PSK-AES128-CBC-SHA"]},{depth,10},{reuse_sessions,true},{secure_renegotiate,true},{user_lookup_fun,{fun emqx_tls_psk:lookup/3,undefined}},{verify,verify_none},{versions,['tlsv1.3','tlsv1.2','tlsv1.1',tlsv1]}],subscriptions => undefined,username => <<>>},connection => #{client_pid => <0.3992.0>,subscriptions => undefined},inflight => [],max_inflight => 32,mountpoint => undefined,name => 'bridge:mqtt:mqtt_bridge_egress:22',reconnect_interval => 15000,replayq => #{config => mem_only,in_mem => {[],[]},max_total_bytes => 2000000000,sizer => fun emqx_connector_mqtt_msg:estimate_size/1,stats => #{bytes => 0,count => 0}},start_type => manual}}.
2022-08-08T06:49:52.418593+00:00 [error] crasher: initial call: emqx_connector_mqtt_worker:init/1, pid: <0.3991.0>, registered_name: 'bridge:mqtt:mqtt_bridge_egress:22', error: {function_clause,[{emqx_connector_mqtt_msg,estimate_size,[#{<<"payload">> => <<"{ \"msg\": \"hello\" }">>}],[{file,"emqx_connector_mqtt_msg.erl"},{line,146}]},{replayq,transform,6,[{file,"replayq.erl"},{line,298}]},{replayq,append,2,[{file,"replayq.erl"},{line,189}]},{emqx_connector_mqtt_worker,common,4,[{file,"emqx_connector_mqtt_worker.erl"},{line,369}]},{gen_statem,loop_state_callback,11,[{file,"gen_statem.erl"},{line,1203}]},{proc_lib,init_p_do_apply,3,[{file,"proc_lib.erl"},{line,226}]}]}, ancestors: [emqx_connector_mqtt,emqx_connector_sup,<0.2456.0>], message_queue_len: 2, messages: [{'$gen_cast',{send_to_remote,#{clientid => <<"emqx_NTAzND">>,event => 'message.publish',flags => #{dup => false,retain => false},id => <<"0005E5B5390B0251D7C000000C390004">>,node => 'emqx@172.18.0.2',payload => <<"{ \"msg\": \"hello\" }">>,peerhost => <<"172.18.0.1">>,pub_props => #{},publish_received_at => 1659941392417,qos => 0,timestamp => 1659941392417,topic => <<"testtopic/local">>,username => <<>>}}},{disconnected,<0.3992.0>,normal}], links: [<0.2458.0>], dictionary: [], trap_exit: true, status: running, heap_size: 28690, stack_size: 29, reductions: 43000; neighbours:
2022-08-08T06:49:52.419015+00:00 [error] Supervisor: {local,emqx_connector_mqtt}. Context: child_terminated. Reason: {function_clause,[{emqx_connector_mqtt_msg,estimate_size,[#{<<"payload">> => <<"{ \"msg\": \"hello\" }">>}],[{file,"emqx_connector_mqtt_msg.erl"},{line,146}]},{replayq,transform,6,[{file,"replayq.erl"},{line,298}]},{replayq,append,2,[{file,"replayq.erl"},{line,189}]},{emqx_connector_mqtt_worker,common,4,[{file,"emqx_connector_mqtt_worker.erl"},{line,369}]},{gen_statem,loop_state_callback,11,[{file,"gen_statem.erl"},{line,1203}]},{proc_lib,init_p_do_apply,3,[{file,"proc_lib.erl"},{line,226}]}]}. Offender: id='bridge:mqtt:mqtt_bridge_egress:22',pid=<0.3991.0>.
2022-08-08T06:49:55.206313+00:00 [warning] line: 405, message: <<"resource down: bridge:mqtt:mqtt_bridge_egress">>, mfa: emqx_alarm:do_actions/3, msg: alarm_is_activated, name: <<"bridge:mqtt:mqtt_bridge_egress">>
2022-08-08T06:49:55.206519+00:00 [error] health check for <<"bridge:mqtt:mqtt_bridge_egress">> failed: connecting