数据桥接失败

环境信息

  • EMQX 版本:5.0
  • 操作系统及版本:windows10
  • 其他

问题描述

在同一局域网下,一台PC-A在docker里,以静态配置的方式组建了包含五个节点的集群,另一台PC-B同样组建了包含两个节点的集群。尝试在PC-A上使用规则+数据桥接mqtt sink去往PC-B上的集群转发数据,具体流程为,从PC-A的testtopic/local主题里筛选出payload,通过规则->动作->mqtt sink 转发至testtopic/remote主题上,但当我配置好,在PC-A某个节点dashboard里的websocket客户端发布主题为testtopic/local的消息时,docker控制台会报好几个错误,并且dashboard里的建立的数据桥接的资源状态就会显示已断开。

配置文件及日志

配置文件(PC-A的配置文件和下面的配置基本一致,PC-B和PC-A相比没有配置bridges):

## NOTE:
## Configs in this file might be overridden by:
## 1. Environment variables which start with 'EMQX_' prefix
## 2. File $EMQX_NODE__DATA_DIR/configs/cluster-override.conf
## 3. File $EMQX_NODE__DATA_DIR/configs/local-override.conf
##
## The *-override.conf files are overwritten at runtime when changes
## are made from EMQX dashboard UI, management HTTP API, or CLI.
## All configuration details can be found in emqx.conf.example

node {
  name = "emqx@127.0.0.1"
  cookie = emqxsecretcookie
  data_dir = "data"
  etc_dir = "etc"
}

log {
  file_handlers.default {
    level = warning
    file = "log/emqx.log"
  }
}

cluster {
	name = emqxcl 
    discovery_strategy = static
    static {
        seeds = ["emqx@172.18.0.2", "emqx@172.18.0.3", "emqx@172.18.0.4", "emqx@172.18.0.5", "emqx@172.18.0.6"] 
    }
}

node.name = !emqx@172.18.0.6

listeners.tcp.default {
  bind = "0.0.0.0:1883"
  max_connections = 1024000
}

listeners.ssl.default {
  bind = "0.0.0.0:8883"
  max_connections = 512000
  ssl_options {
    keyfile = "etc/certs/key.pem"
    certfile = "etc/certs/cert.pem"
    cacertfile = "etc/certs/cacert.pem"
  }
}

listeners.ws.default {
  bind = "0.0.0.0:8083"
  max_connections = 1024000
  websocket.mqtt_path = "/mqtt"
}

listeners.wss.default {
  bind = "0.0.0.0:8084"
  max_connections = 512000
  websocket.mqtt_path = "/mqtt"
  ssl_options {
    keyfile = "etc/certs/key.pem"
    certfile = "etc/certs/cert.pem"
    cacertfile = "etc/certs/cacert.pem"
  }
}

# listeners.quic.default {
#  enabled = true
#  bind = "0.0.0.0:14567"
#  max_connections = 1024000
#  keyfile = "etc/certs/key.pem"
#  certfile = "etc/certs/cert.pem"
#}

dashboard {
    listeners.http {
        bind: 18083
    }
    default_username: "admin"
    default_password: "public"
}

authorization {
  deny_action: ignore
  no_match: allow
  sources: [
    {
      type: file
      path: "etc/acl.conf"
    }
  ]
}


bridges {
  mqtt {
    mqtt_bridge_egress {
      connector {
        bridge_mode = false
        clean_start = true
        keepalive = "60s"
        max_inflight = 32
        mode = "cluster_shareload"
        username = "emqx"
        password = "emqx"
        proto_ver = "v4"
        reconnect_interval = "15s"
        retry_interval = "15s"
        server = "192.168.0.242:34883"
        ssl { enable = false }
      }
      direction = "egress"
      enable = true
      local_topic = "testtopic/local"
      remote_qos = 0
      remote_topic = "testtopic/remote"
      payload = "${payload}"
      retain = false
    }
  }
}
include emqx_enterprise.conf

日志:

2022-08-08T06:47:50.790088+00:00 [warning] line: 411, mfa: emqx_alarm:do_actions/3, msg: alarm_is_deactivated, name: <<"bridge:mqtt:mqtt_bridge_egress">>

[rule action] rule_ycvl

Action Data: #{<<"payload">> => <<"{ \"msg\": \"hello\" }">>}

Envs: #{clientid => <<"emqx_NTAzND">>,event => 'message.publish',

                flags => #{dup => false,retain => false},

                headers =>

                    #{peerhost => {172,18,0,1},

                      properties => #{},proto_ver => 4,protocol => mqtt,

                      username => <<>>},

                id => <<"0005E5B5390B0251D7C000000C390004">>,

                metadata => #{rule_id => <<"rule_ycvl">>},

                node => 'emqx@172.18.0.2',

                payload => <<"{ \"msg\": \"hello\" }">>,

                peerhost => <<"172.18.0.1">>,pub_props => #{},

                publish_received_at => 1659941392417,qos => 0,

                timestamp => 1659941392417,topic => <<"testtopic/local">>,

                username => <<>>}

2022-08-08T06:49:52.418025+00:00 [error] State machine 'bridge:mqtt:mqtt_bridge_egress:22' terminating. Reason: function_clause. Stack: [{emqx_connector_mqtt_msg,estimate_size,[#{<<"payload">> => <<"{ \"msg\": \"hello\" }">>}],[{file,"emqx_connector_mqtt_msg.erl"},{line,146}]},{replayq,transform,6,[{file,"replayq.erl"},{line,298}]},{replayq,append,2,[{file,"replayq.erl"},{line,189}]},{emqx_connector_mqtt_worker,common,4,[{file,"emqx_connector_mqtt_worker.erl"},{line,369}]},{gen_statem,loop_state_callback,11,[{file,"gen_statem.erl"},{line,1203}]},{proc_lib,init_p_do_apply,3,[{file,"proc_lib.erl"},{line,226}]}]. Last event: {cast,{send_to_remote,#{<<"payload">> => <<"{ \"msg\": \"hello\" }">>}}}. State: {connected,#{connect_opts => #{auto_reconnect => true,bridge_mode => false,clean_start => true,clientid => <<"bridge:mqtt:mqtt_bridge_egress:22:emqx@172.18.0.2">>,connect_timeout => 30,forwards => #{enable => true,local_topic => <<"testtopic/local">>,payload => [{var,{var,<<"payload">>}}],remote_qos => 2,remote_topic => [{str,<<"testtopic/remote">>}],retain => false},if_record_metrics => true,keepalive => 60,max_inflight => 32,name => 'bridge:mqtt:mqtt_bridge_egress:22',password => <<>>,proto_ver => v4,reconnect_interval => 15000,replayq => #{offload => false,seg_bytes => 104857600},retry_interval => 15000,server => {{192,168,0,242},34883},ssl => false,ssl_opts => [{ciphers,["TLS_AES_256_GCM_SHA384","TLS_AES_128_GCM_SHA256","TLS_CHACHA20_POLY1305_SHA256","TLS_AES_128_CCM_SHA256","TLS_AES_128_CCM_8_SHA256","ECDHE-ECDSA-AES256-GCM-SHA384","ECDHE-RSA-AES256-GCM-SHA384","ECDHE-ECDSA-AES256-SHA384","ECDHE-RSA-AES256-SHA384","ECDH-ECDSA-AES256-GCM-SHA384","ECDH-RSA-AES256-GCM-SHA384","ECDH-ECDSA-AES256-SHA384","ECDH-RSA-AES256-SHA384","DHE-DSS-AES256-GCM-SHA384","DHE-DSS-AES256-SHA256","AES256-GCM-SHA384","AES256-SHA256","ECDHE-ECDSA-AES128-GCM-SHA256","ECDHE-RSA-AES128-GCM-SHA256","ECDHE-ECDSA-AES128-SHA256","ECDHE-RSA-AES128-SHA256","ECDH-ECDSA-AES128-GCM-SHA256","ECDH-RSA-AES128-GCM-SHA256","ECDH-ECDSA-AES128-SHA256","ECDH-RSA-AES128-SHA256","DHE-DSS-AES128-GCM-SHA256","DHE-DSS-AES128-SHA256","AES128-GCM-SHA256","AES128-SHA256","ECDHE-ECDSA-AES256-SHA","ECDHE-RSA-AES256-SHA","DHE-DSS-AES256-SHA","ECDH-ECDSA-AES256-SHA","ECDH-RSA-AES256-SHA","ECDHE-ECDSA-AES128-SHA","ECDHE-RSA-AES128-SHA","DHE-DSS-AES128-SHA","ECDH-ECDSA-AES128-SHA","ECDH-RSA-AES128-SHA","RSA-PSK-AES256-GCM-SHA384","RSA-PSK-AES256-CBC-SHA384","RSA-PSK-AES128-GCM-SHA256","RSA-PSK-AES128-CBC-SHA256","RSA-PSK-AES256-CBC-SHA","RSA-PSK-AES128-CBC-SHA"]},{depth,10},{reuse_sessions,true},{secure_renegotiate,true},{user_lookup_fun,{fun emqx_tls_psk:lookup/3,undefined}},{verify,verify_none},{versions,['tlsv1.3','tlsv1.2','tlsv1.1',tlsv1]}],subscriptions => undefined,username => <<>>},connection => #{client_pid => <0.3992.0>,subscriptions => undefined},inflight => [],max_inflight => 32,mountpoint => undefined,name => 'bridge:mqtt:mqtt_bridge_egress:22',reconnect_interval => 15000,replayq => #{config => mem_only,in_mem => {[],[]},max_total_bytes => 2000000000,sizer => fun emqx_connector_mqtt_msg:estimate_size/1,stats => #{bytes => 0,count => 0}},start_type => manual}}.

2022-08-08T06:49:52.418593+00:00 [error] crasher: initial call: emqx_connector_mqtt_worker:init/1, pid: <0.3991.0>, registered_name: 'bridge:mqtt:mqtt_bridge_egress:22', error: {function_clause,[{emqx_connector_mqtt_msg,estimate_size,[#{<<"payload">> => <<"{ \"msg\": \"hello\" }">>}],[{file,"emqx_connector_mqtt_msg.erl"},{line,146}]},{replayq,transform,6,[{file,"replayq.erl"},{line,298}]},{replayq,append,2,[{file,"replayq.erl"},{line,189}]},{emqx_connector_mqtt_worker,common,4,[{file,"emqx_connector_mqtt_worker.erl"},{line,369}]},{gen_statem,loop_state_callback,11,[{file,"gen_statem.erl"},{line,1203}]},{proc_lib,init_p_do_apply,3,[{file,"proc_lib.erl"},{line,226}]}]}, ancestors: [emqx_connector_mqtt,emqx_connector_sup,<0.2456.0>], message_queue_len: 2, messages: [{'$gen_cast',{send_to_remote,#{clientid => <<"emqx_NTAzND">>,event => 'message.publish',flags => #{dup => false,retain => false},id => <<"0005E5B5390B0251D7C000000C390004">>,node => 'emqx@172.18.0.2',payload => <<"{ \"msg\": \"hello\" }">>,peerhost => <<"172.18.0.1">>,pub_props => #{},publish_received_at => 1659941392417,qos => 0,timestamp => 1659941392417,topic => <<"testtopic/local">>,username => <<>>}}},{disconnected,<0.3992.0>,normal}], links: [<0.2458.0>], dictionary: [], trap_exit: true, status: running, heap_size: 28690, stack_size: 29, reductions: 43000; neighbours:

2022-08-08T06:49:52.419015+00:00 [error] Supervisor: {local,emqx_connector_mqtt}. Context: child_terminated. Reason: {function_clause,[{emqx_connector_mqtt_msg,estimate_size,[#{<<"payload">> => <<"{ \"msg\": \"hello\" }">>}],[{file,"emqx_connector_mqtt_msg.erl"},{line,146}]},{replayq,transform,6,[{file,"replayq.erl"},{line,298}]},{replayq,append,2,[{file,"replayq.erl"},{line,189}]},{emqx_connector_mqtt_worker,common,4,[{file,"emqx_connector_mqtt_worker.erl"},{line,369}]},{gen_statem,loop_state_callback,11,[{file,"gen_statem.erl"},{line,1203}]},{proc_lib,init_p_do_apply,3,[{file,"proc_lib.erl"},{line,226}]}]}. Offender: id='bridge:mqtt:mqtt_bridge_egress:22',pid=<0.3991.0>.

2022-08-08T06:49:55.206313+00:00 [warning] line: 405, message: <<"resource down: bridge:mqtt:mqtt_bridge_egress">>, mfa: emqx_alarm:do_actions/3, msg: alarm_is_activated, name: <<"bridge:mqtt:mqtt_bridge_egress">>

2022-08-08T06:49:55.206519+00:00 [error] health check for <<"bridge:mqtt:mqtt_bridge_egress">> failed: connecting

1 个赞

你好,发送配置文件请使用 markdown 的代码块,就不会被识别为链接;

我已经帮你修改了格式,你可以点击 编辑,将配置重新粘贴

这个似乎是一个 Bug, 我们这边会尽快进行排查
https://github.com/emqx/emqx/issues/8494

谢谢您的回答!如果是Bug的话,请您这边修复好以后,通知我一下,谢谢!