emqx集群状态不一致。

环境

  • EMQX 版本:5.1.2
  • 操作系统版本:Amazon Linux 2

重现此问题的步骤

  1. 启动 etcd,并为其配置 SSL/TLS
  2. 启动 3 个 core 节点和 3 个 replicant 节点

配置文件:

## NOTE:
## Configs in this file might be overridden by:
## 1. Environment variables which start with 'EMQX_' prefix
## 2. File $EMQX_NODE__DATA_DIR/configs/cluster-override.conf
## 3. File $EMQX_NODE__DATA_DIR/configs/local-override.conf
##
## The *-override.conf files are overwritten at runtime when changes
## are made from EMQX dashboard UI, management HTTP API, or CLI.
## All configuration details can be found in emqx.conf.example

# Node settings
node = {
  name = "emqx-b055@10.51.3.210"
  # NOTE(review): the cookie is pasted verbatim into this report — consider rotating it.
  cookie = "iGxSH4NZasa7tOjHmoCmhs2rSeRAFGOF"
  data_dir = "data"
  # This node is configured as a core node (the report starts 3 core + 3 replicant nodes).
  db_role = core
  db_backend = rlog
  process_limit = 2048000
  max_ports = 1024000
  dist_buffer_size = 8192
  max_ets_tables = 524288
  crash_dump_file = "log/crash.dump"
  dist_net_ticktime = 5m
  # cluster_call settings, written as dotted paths instead of a nested object.
  cluster_call.retry_interval = 1m
  cluster_call.max_history = 100
  cluster_call.cleanup_interval = 5m
}

# RPC settings
# Async mode with a manually pinned server port (port_discovery = manual,
# tcp_server_port = 5369); all socket buffers are set to 1MB.
rpc = {
    mode = async
    async_batch_size = 256
    tcp_server_port = 5369
    tcp_client_num = 96
    port_discovery = manual
    # Connect / send / authentication timeouts are all 3s; receive timeout is 7s.
    connect_timeout = 3s
    send_timeout = 3s
    authentication_timeout = 3s
    call_receive_timeout = 7s
    # TCP keepalive tuning for the RPC sockets.
    socket_keepalive_idle = 15m
    socket_keepalive_interval = 75s
    socket_keepalive_count = 9
    socket_sndbuf = 1MB
    socket_recbuf = 1MB
    socket_buffer = 1MB
}

# Global MQTT message settings
mqtt = {
  max_clientid_len = 1024
  max_topic_levels = 7
  max_qos_allowed = 2
  # Topic aliases are set to 0 here.
  max_topic_alias = 0
  strict_mode = false
  retain_available = true
  # Wildcard and shared subscriptions are both turned off.
  wildcard_subscription = false
  shared_subscription = false
  ignore_loop_deliver = false
}

# Cluster settings
# Node discovery goes through etcd over HTTPS with client certificates.
# NOTE(review): the error log in this report shows ekka_autocluster timing out on
# a gen_server call to ekka_cluster_etcd (`lock`, 5000 ms) — this is the section
# that drives that code path.
cluster = {
    name = "emqx_v5_cluster"
    autoheal = true
    autoclean = 5m
    proto_dist = inet_tcp
    discovery_strategy = "etcd"
    etcd = {
      # Host name is redacted in the report.
      server = "https://etcd.xxxxx:2379"
      # Key prefix; the etcd keys listed in the report start with "emqx-cluster/".
      prefix = "emqx-cluster"
      node_ttl = 1m
      # TLS material used for the etcd connection.
      ssl = {
        keyfile = "/data/etcdssl/etcd-key.pem"
        cacertfile = "/data/etcdssl/ca.pem"
        certfile = "/data/etcdssl/etcd.pem"
        enable = true
      }
    }
}

# Listener settings
# Default plain-TCP MQTT listener, bound to 0.0.0.0:1883.
listeners.tcp.default = {
    bind = "0.0.0.0:1883"
    max_connections = 1024000
    # PROXY protocol is enabled on this listener (it is disabled on listeners.tcp.internal).
    proxy_protocol = true
    proxy_protocol_timeout = 3s
    enable_authn = true
    acceptors = 64
    limiter = {
      # Connection-rate limiter.
      # NOTE(review): burst is a quoted string ("20") while initial/capacity are
      # bare numbers — confirm this is the format the limiter expects.
      connection = {
        rate = "400/s"
        initial = 400
        capacity = 1185
        burst = "20"
      }
      # Per-client inbound message limiter.
      client.message_in = {
        rate = "100/s"
        initial = 100
        capacity = 500
      }
    }
    access_rules = ["allow all"]
    tcp_options = {
      active_n = 100
      backlog = 1024
      send_timeout = 7s
      send_timeout_close = true
      nodelay = true
      reuseaddr = true
    }
}

# Second plain-TCP listener named "internal", bound to 0.0.0.0:38811.
# Unlike listeners.tcp.default it has PROXY protocol disabled and a lower
# max_connections (102400).
listeners.tcp.internal = {
  bind = "0.0.0.0:38811"
  acceptors = 64
  max_connections = 102400
  proxy_protocol = false
  enable_authn = true
  # Connection-rate limiter, written as a dotted path.
  limiter.connection = {
    rate = "237/s"
    burst = "20"
  }
  tcp_options = {
    active_n = 300
    backlog = 1024
    send_timeout = 3s
    send_timeout_close = true
    nodelay = true
    reuseaddr = true
  }
}


# Default MQTT-over-TLS listener, bound to 0.0.0.0:8883.
listeners.ssl.default = {
  bind = "0.0.0.0:8883"
  max_connections = 512000
  # Certificate files, given as dotted paths under ssl_options.
  ssl_options.keyfile = "etc/certs/key.pem"
  ssl_options.certfile = "etc/certs/cert.pem"
  ssl_options.cacertfile = "etc/certs/cacert.pem"
}

# Default MQTT-over-WebSocket listener, bound to 0.0.0.0:8083.
listeners.ws.default = {
  bind = "0.0.0.0:8083"
  max_connections = 1024
  acceptors = 8
  proxy_protocol = false
  websocket = {
    # WebSocket endpoint path.
    mqtt_path = "/mqtt"
    # Header names used for the forwarded client address and port.
    proxy_address_header = x-forwarded-for
    proxy_port_header = x-forwarded-port
  }
  # Connection-rate limiter for this listener.
  limiter.connection = {
    rate = "100/s"
    burst = "20"
    initial = 100
    capacity = 500
  }
  access_rules = ["allow all"]
}

# Default secure WebSocket listener, bound to 0.0.0.0:8084.
# Uses the same certificate paths as listeners.ssl.default.
listeners.wss.default = {
  bind = "0.0.0.0:8084"
  max_connections = 512000
  websocket.mqtt_path = "/mqtt"
  ssl_options = {
    keyfile = "etc/certs/key.pem"
    certfile = "etc/certs/cert.pem"
    cacertfile = "etc/certs/cacert.pem"
  }
}

# Delayed messages
# Enabled, with max_delayed_messages set to 5.
delayed = {
    enable = true
    max_delayed_messages = 5
}

# Retained messages
# Enabled, kept in RAM, with max_retained_messages set to 5.
retainer = {
    enable = true
    msg_expiry_interval = 3h
    msg_clear_interval = 6h
    backend = {
      storage_type = ram
      max_retained_messages = 5
    }
}

# Log settings
# Single file handler named "default", written as one dotted path.
log.file_handlers.default = {
  enable = true
  # Only error-level entries are written.
  level = error
  file = "log/emqx.log"
  sync_mode_qlen = 256
  chars_limit = 16384
  formatter = text
  # Rotation: up to 10 files of 64MB each.
  max_size = 64MB
  rotation.count = 10
  burst_limit = {
    enable = true
    max_count = 20480
    window_time = 3s
  }
  overload_kill = {
    enable = true
    mem_size = 50MB
    qlen = 30000
  }
}

# listeners.quic.default {
#  enabled = true
#  bind = "0.0.0.0:14567"
#  max_connections = 1024000
#  ssl_options {
#   verify = verify_none
#   keyfile = "etc/certs/key.pem"
#   certfile = "etc/certs/cert.pem"
#   cacertfile = "etc/certs/cacert.pem"
#  }
# }

# Dashboard settings
dashboard = {
    default_username = "admin"
    sample_interval = 10s
    token_expired_time = 60m
    cors = false
    i18n_lang = en
    # HTTP listener for the dashboard, bound to port 38080.
    listeners.http = {
        enable = true
        bind = 38080
        num_acceptors = 8
        # Translated from the original note: "the cloud program 38088 needs to use this port".
        # NOTE(review): the note mentions 38088 but bind above is 38080 — confirm which port is meant.
        max_connections = 1024
        backlog = 1024
        send_timeout = 10s
    }
}

# Authorization settings
# Two sources are listed in order: a file source (etc/acl.conf) and an HTTP
# source (POST to the redacted URL below). Requests that match no rule are denied.
authorization = {
  deny_action = ignore
  no_match = deny
  sources = [
    {
      type = file
      enable = true
      path = "etc/acl.conf"
    },
    {
      type = http
      method = post
      enable = true
      # Host is redacted in the report.
      url = "http://xxxxxx/auth/EmqxAcl/acl"
      request_timeout = 10s
      connect_timeout = 10s
      # NOTE(review): 237 here vs. 100 in the authentication block — confirm the value is intended.
      enable_pipelining = 237
      pool_size = 8
      # Request body; ${...} entries are placeholders, "from" is the literal "emqx5".
      body = {
        action = "${action}",
        clientid = "${clientid}",
        from = "emqx5",
        ipaddr = "${peerhost}",
        topic = "${topic}",
        username = "${username}"
      }
      headers = {
         Content-Type = "application/json"
         X-Request-Source = "EMQX"
         accept = "application/json"
         cache-control = "no-cache"
         connection = "keep-alive"
         keep-alive = "timeout=30, max=1000"
      }
    }
  ]
  # Authorization result cache.
  cache = {
    enable = true
    max_size = 64
    ttl = 15m
  }
}


# Authentication settings
# A single password-based authenticator backed by an HTTP service (POST).
authentication = [
  {
    backend = http
    method = post
    mechanism = password_based
    enable = true
    request_timeout = 5s
    connect_timeout = 10s
    enable_pipelining = 100
    pool_size = 8
    # Host is redacted in the report.
    url = "http://xxxx/auth/emqxAuth/auth"
    # Request body; ${...} entries are placeholders, "from" is the literal "emqx5".
    body = {
      clientid =  "${clientid}"
      from =  "emqx5"
      ipaddr = "${peerhost}"
      password = "${password}"
      username = "${username}"
    }
    headers = {
      Content-Type = "application/json"
      X-Request-Source = "EMQX"
      accept = "application/json"
      cache-control = "no-cache"
      connection = "keep-alive"
      keep-alive = "timeout=30, max=1000"
    }
  }
]

# System monitoring
sysmon = {
  # OS-level checks: CPU and memory are both checked every 60s.
  os = {
    cpu_check_interval = 60s
    cpu_high_watermark = 95%
    cpu_low_watermark = 90%
    mem_check_interval = 60s
    sysmem_high_watermark = 80%
    procmem_high_watermark = 5%
  }
  # VM-level checks: long_gc is disabled and busy_port is off, busy_dist_port is on.
  vm = {
    long_gc = disabled
    long_schedule = 240ms
    large_heap = 32MB
    busy_port = false
    busy_dist_port = true
    process_high_watermark = 80%
    process_low_watermark = 70%
  }
}

# Prometheus settings
# Currently disabled (enable = false); the push gateway points at localhost.
prometheus = {
    enable = false
    push_gateway_server = "http://127.0.0.1:9091"
    interval = 15s
}

# Force garbage collection in an MQTT connection process after it has processed
# a certain number of messages or bytes of data.
# Thresholds configured here: 16MB of data or 16000 messages.
force_gc = {
    enable = true
    bytes = 16MB
    count = 16000
}

# When the process message queue length or its memory size in bytes reaches the
# configured limit, the process is forced to close.
#
# Note: "message queue" here refers to the "message mailbox" of the Erlang process,
# not the mqueue of QoS 1 and QoS 2.
# Limits configured here: 1000 queued messages or a 32MB heap.
force_shutdown = {
    enable = true
    max_message_queue_len = 1000
    max_heap_size = 32MB
}

# Event topics
# Both client_connected and client_disconnected event messages are turned off.
sys_topics = {
    sys_event_messages = {
      client_connected = false
      client_disconnected = false
    }
}

预期行为

成功加入集群

实际行为

集群 cluster status 状态不一致

etcd 版本为 3.5.6。etcd 中的 key 如下,其中两个 lock key 没有 value:

emqx-cluster/ekkacl/lock//24ad88af294e78ea

emqx-cluster/ekkacl/lock//24ad88af294e7cca

emqx-cluster/ekkacl/nodes/emqx-122@10.51.4.153
emqx-122@10.51.4.153
emqx-cluster/ekkacl/nodes/emqx-b055@10.51.3.210
emqx-b055@10.51.3.210

emqx节点错误日志

2023-07-24T06:53:02.189461+00:00 [error] Ekka(AutoCluster): Unlock error: lock_lose
2023-07-24T06:53:02.189683+00:00 [error] Generic server ekka_autocluster terminating. Reason: {{timeout,{gen_server,call,[ekka_cluster_etcd,lock,5000]}},[{gen_server,call,3,[{file,"gen_server.erl"},{line,247}]},{ekka_autocluster,'-discover_and_join/0-fun-0-',2,[{file,"ekka_autocluster.erl"},{line,167}]},{ekka_autocluster,handle_info,2,[{file,"ekka_autocluster.erl"},{line,116}]},{gen_server,try_dispatch,4,[{file,"gen_server.erl"},{line,695}]},{gen_server,handle_msg,6,[{file,"gen_server.erl"},{line,771}]},{proc_lib,init_p_do_apply,3,[{file,"proc_lib.erl"},{line,226}]}]}. Last message: loop. State: {s,emqx}.
2023-07-24T06:53:02.190599+00:00 [error] crasher: initial call: ekka_autocluster:init/1, pid: <0.3236.0>, registered_name: ekka_autocluster, exit: {{timeout,{gen_server,call,[ekka_cluster_etcd,lock,5000]}},[{gen_server,call,3,[{file,"gen_server.erl"},{line,247}]},{ekka_autocluster,'-discover_and_join/0-fun-0-',2,[{file,"ekka_autocluster.erl"},{line,167}]},{ekka_autocluster,handle_info,2,[{file,"ekka_autocluster.erl"},{line,116}]},{gen_server,try_dispatch,4,[{file,"gen_server.erl"},{line,695}]},{gen_server,handle_msg,6,[{file,"gen_server.erl"},{line,771}]},{proc_lib,init_p_do_apply,3,[{file,"proc_lib.erl"},{line,226}]}]}, ancestors: [emqx_machine_sup,<0.1959.0>], message_queue_len: 0, messages: [], links: [], dictionary: [], trap_exit: false, status: running, heap_size: 6772, stack_size: 28, reductions: 28608; neighbours: []
2023-07-24T07:01:35.708771+00:00 [error] Generic server memsup terminating. Reason: {port_died,normal}. Last message: {'EXIT',<0.1896.0>,{port_died,normal}}. State: [{data,[{"Timeout",60000}]},{items,{"Memory Usage",[{"Allocated",593592320},{"Total",8114462720}]}},{items,{"Worst Memory User",[{"Pid",<0.2885.0>},{"Memory",13090016}]}}].
2023-07-24T07:01:35.709351+00:00 [error] crasher: initial call: memsup:init/1, pid: <0.1895.0>, registered_name: memsup, exit: {{port_died,normal},[{gen_server,handle_common_reply,8,[{file,"gen_server.erl"},{line,811}]},{proc_lib,init_p_do_apply,3,[{file,"proc_lib.erl"},{line,226}]}]}, ancestors: [os_mon_sup,<0.1893.0>], message_queue_len: 0, messages: [], links: [<0.1894.0>], dictionary: [], trap_exit: true, status: running, heap_size: 2586, stack_size: 28, reductions: 884369; neighbours: []
2023-07-24T07:01:35.709232+00:00 [error] jq port program has died unexpectedly for reason normal (state = #{id => 0, port =>, #Port<0.34>, processed_json_calls =>, 0, restart_period =>, 1000000}) , Trying to restart...
2023-07-24T07:01:35.709349+00:00 [error] jq port program has died unexpectedly for reason normal (state = #{id => 1, port =>, #Port<0.35>, processed_json_calls =>, 0, restart_period =>, 1000000}) , Trying to restart...
2023-07-24T07:01:35.710056+00:00 [error] Supervisor: {local,os_mon_sup}. Context: shutdown_error. Reason: {port_died,normal}. Offender: id=memsup,pid=<0.1895.0>.
2023-07-24T07:01:49.523025+00:00 [error] Ekka(AutoCluster): Unlock error: lock_lose
2023-07-24T07:01:49.523297+00:00 [error] Generic server ekka_autocluster terminating. Reason: {{timeout,{gen_server,call,[ekka_cluster_etcd,lock,5000]}},[{gen_server,call,3,[{file,"gen_server.erl"},{line,247}]},{ekka_autocluster,'-discover_and_join/0-fun-0-',2,[{file,"ekka_autocluster.erl"},{line,167}]},{ekka_autocluster,handle_info,2,[{file,"ekka_autocluster.erl"},{line,116}]},{gen_server,try_dispatch,4,[{file,"gen_server.erl"},{line,695}]},{gen_server,handle_msg,6,[{file,"gen_server.erl"},{line,771}]},{proc_lib,init_p_do_apply,3,[{file,"proc_lib.erl"},{line,226}]}]}. Last message: loop. State: {s,emqx}.
2023-07-24T07:01:49.524192+00:00 [error] crasher: initial call: ekka_autocluster:init/1, pid: <0.3197.0>, registered_name: ekka_autocluster, exit: {{timeout,{gen_server,call,[ekka_cluster_etcd,lock,5000]}},[{gen_server,call,3,[{file,"gen_server.erl"},{line,247}]},{ekka_autocluster,'-discover_and_join/0-fun-0-',2,[{file,"ekka_autocluster.erl"},{line,167}]},{ekka_autocluster,handle_info,2,[{file,"ekka_autocluster.erl"},{line,116}]},{gen_server,try_dispatch,4,[{file,"gen_server.erl"},{line,695}]},{gen_server,handle_msg,6,[{file,"gen_server.erl"},{line,771}]},{proc_lib,init_p_do_apply,3,[{file,"proc_lib.erl"},{line,226}]}]}, ancestors: [emqx_machine_sup,<0.1959.0>], message_queue_len: 0, messages: [], links: [], dictionary: [], trap_exit: false, status: running, heap_size: 6772, stack_size: 28, reductions: 28608; neighbours: []

相关截图:
image
image
image

etcd错误日志

Jul 24 07:01:36 xxxx etcd[7003]: {"level":"warn","ts":"2023-07-24T07:01:36.000Z","caller":"v3rpc/interceptor.go:197","msg":"request stats","start time":"2023-07-24T06:52:53.869Z","time spent":"8m42.130801172s","remote":"10.51.5.206:35624","response type":"/v3lockpb.Lock/Lock","request count":-1,"request size":-1,"response count":-1,"response size":-1,"request content":""}
Jul 24 07:01:36 xxxx etcd[7003]: WARNING: 2023/07/24 07:01:36 [core] grpc: Server.processUnaryRPC failed to write status: connection error: desc = "transport is closing"

@JimMoen 帮忙确认下是否是配置的问题

Hi,etcd 自动集群的问题已经有 GitHub issue 在跟踪,请持续关注。