环境
- EMQX 版本:5.1.2
- 操作系统版本:Amazon Linux 2
重现此问题的步骤
- 启动 etcd,并为其配置 SSL/TLS
- 启动 3 个 core 节点 + 3 个 replicant 节点
配置文件:
## NOTE:
## Configs in this file might be overridden by:
## 1. Environment variables which start with 'EMQX_' prefix
## 2. File $EMQX_NODE__DATA_DIR/configs/cluster-override.conf
## 3. File $EMQX_NODE__DATA_DIR/configs/local-override.conf
##
## The *-override.conf files are overwritten at runtime when changes
## are made from EMQX dashboard UI, management HTTP API, or CLI.
## All configuration details can be found in emqx.conf.example
# 节点配置
node = {
name = "emqx-b055@10.51.3.210"
cookie = "iGxSH4NZasa7tOjHmoCmhs2rSeRAFGOF"
data_dir = "data"
db_role = core
db_backend = rlog
process_limit = 2048000
max_ports = 1024000
dist_buffer_size = 8192
max_ets_tables = 524288
crash_dump_file = "log/crash.dump"
dist_net_ticktime = 5m
cluster_call = {
retry_interval = 1m
max_history = 100
cleanup_interval = 5m
}
}
# rpc配置
rpc = {
mode = async
async_batch_size = 256
tcp_server_port = 5369
tcp_client_num = 96
port_discovery = manual
connect_timeout = 3s
send_timeout = 3s
authentication_timeout = 3s
call_receive_timeout = 7s
socket_keepalive_idle = 15m
socket_keepalive_interval = 75s
socket_keepalive_count = 9
socket_sndbuf = 1MB
socket_recbuf = 1MB
socket_buffer = 1MB
}
# 全局mqtt消息配置
mqtt = {
max_clientid_len = 1024
max_topic_levels = 7
max_qos_allowed = 2
max_topic_alias = 0
strict_mode = false
retain_available = true
wildcard_subscription = false
shared_subscription = false
ignore_loop_deliver = false
}
# 集群配置
cluster = {
name = "emqx_v5_cluster"
autoheal = true
autoclean = 5m
proto_dist = inet_tcp
discovery_strategy = "etcd"
etcd = {
server = "https://etcd.xxxxx:2379"
prefix = "emqx-cluster"
node_ttl = 1m
ssl = {
keyfile = "/data/etcdssl/etcd-key.pem"
cacertfile = "/data/etcdssl/ca.pem"
certfile = "/data/etcdssl/etcd.pem"
enable = true
}
}
}
# 监听配置
listeners.tcp.default = {
bind = "0.0.0.0:1883"
max_connections = 1024000
proxy_protocol = true
proxy_protocol_timeout = 3s
enable_authn = true
acceptors = 64
limiter = {
connection = {
rate = "400/s"
initial = 400
capacity = 1185
burst = "20"
}
client.message_in = {
rate = "100/s"
initial = 100
capacity = 500
}
}
access_rules = ["allow all"]
tcp_options = {
active_n = 100
backlog = 1024
send_timeout = 7s
send_timeout_close = true
nodelay = true
reuseaddr = true
}
}
listeners.tcp.internal = {
bind = "0.0.0.0:38811"
acceptors = 64
max_connections = 102400
proxy_protocol = false
enable_authn = true
limiter.connection = {
rate = "237/s"
burst = "20"
}
tcp_options = {
active_n = 300
backlog = 1024
send_timeout = 3s
send_timeout_close = true
nodelay = true
reuseaddr = true
}
}
listeners.ssl.default = {
bind = "0.0.0.0:8883"
max_connections = 512000
ssl_options {
keyfile = "etc/certs/key.pem"
certfile = "etc/certs/cert.pem"
cacertfile = "etc/certs/cacert.pem"
}
}
listeners.ws.default = {
bind = "0.0.0.0:8083"
max_connections = 1024
acceptors = 8
proxy_protocol = false
websocket = {
mqtt_path = "/mqtt"
proxy_address_header = x-forwarded-for
proxy_port_header = x-forwarded-port
}
limiter.connection = {
rate = "100/s"
burst = "20"
initial = 100
capacity = 500
}
access_rules = ["allow all"]
}
listeners.wss.default = {
bind = "0.0.0.0:8084"
max_connections = 512000
websocket.mqtt_path = "/mqtt"
ssl_options = {
keyfile = "etc/certs/key.pem"
certfile = "etc/certs/cert.pem"
cacertfile = "etc/certs/cacert.pem"
}
}
# 延迟消息
delayed = {
enable = true
max_delayed_messages = 5
}
# 保留消息
retainer = {
enable = true
msg_expiry_interval = 3h
msg_clear_interval = 6h
backend = {
storage_type = ram
max_retained_messages = 5
}
}
# 日志配置
log = {
file_handlers.default = {
enable = true
level = error
file = "log/emqx.log"
sync_mode_qlen = 256
chars_limit = 16384
formatter = text
max_size = 64MB
rotation.count = 10
burst_limit = {
enable = true
max_count = 20480
window_time = 3s
}
overload_kill = {
enable = true
mem_size = 50MB
qlen = 30000
}
}
}
# listeners.quic.default {
# enabled = true
# bind = "0.0.0.0:14567"
# max_connections = 1024000
# ssl_options {
# verify = verify_none
# keyfile = "etc/certs/key.pem"
# certfile = "etc/certs/cert.pem"
# cacertfile = "etc/certs/cacert.pem"
# }
# }
# 控制台配置
dashboard = {
default_username = "admin"
sample_interval = 10s
token_expired_time = 60m
cors = false
i18n_lang = en
listeners.http = {
enable = true
bind = 38080
num_acceptors = 8
# cloud程序38088需要使用这个端口
max_connections = 1024
backlog = 1024
send_timeout = 10s
}
}
# 授权配置
authorization = {
deny_action = ignore
no_match = deny
sources = [
{
type = file
enable = true
path = "etc/acl.conf"
},
{
type = http
method = post
enable = true
url = "http://xxxxxx/auth/EmqxAcl/acl"
request_timeout = 10s
connect_timeout = 10s
enable_pipelining = 237
pool_size = 8
body = {
action = "${action}",
clientid = "${clientid}",
from = "emqx5",
ipaddr = "${peerhost}",
topic = "${topic}",
username = "${username}"
}
headers = {
Content-Type = "application/json"
X-Request-Source = "EMQX"
accept = "application/json"
cache-control = "no-cache"
connection = "keep-alive"
keep-alive = "timeout=30, max=1000"
}
}
]
cache = {
enable = true
max_size = 64
ttl = 15m
}
}
# 认证配置
authentication = [
{
backend = http
method = post
mechanism = password_based
enable = true
request_timeout = 5s
connect_timeout = 10s
enable_pipelining = 100
pool_size = 8
url = "http://xxxx/auth/emqxAuth/auth"
body = {
clientid = "${clientid}"
from = "emqx5"
ipaddr = "${peerhost}"
password = "${password}"
username = "${username}"
}
headers = {
Content-Type = "application/json"
X-Request-Source = "EMQX"
accept = "application/json"
cache-control = "no-cache"
connection = "keep-alive"
keep-alive = "timeout=30, max=1000"
}
}
]
# 系统监控
sysmon = {
os = {
cpu_check_interval = 60s
cpu_high_watermark = 95%
cpu_low_watermark = 90%
mem_check_interval = 60s
sysmem_high_watermark = 80%
procmem_high_watermark = 5%
}
vm = {
long_gc = disabled
long_schedule = 240ms
large_heap = 32MB
busy_port = false
busy_dist_port = true
process_high_watermark = 80%
process_low_watermark = 70%
}
}
# prometheus 配置
prometheus = {
enable = false
push_gateway_server = "http://127.0.0.1:9091"
interval = 15s
}
# Force garbage collection in MQTT connection process after they process certain number of messages or bytes of data.
force_gc = {
enable = true
bytes = 16MB
count = 16000
}
# When the process message queue length, or the memory bytes reaches a certain value, the process is forced to close.
#
# Note: "message queue" here refers to the "message mailbox" of the Erlang process, not the mqueue of QoS 1 and QoS 2.
force_shutdown = {
enable = true
max_message_queue_len = 1000
max_heap_size = 32MB
}
# 事件主题
sys_topics = {
sys_event_messages = {
client_connected = false
client_disconnected = false
}
}
预期行为
成功加入集群
实际行为
各节点的集群状态(cluster status)不一致
etcd 版本为 3.5.6。etcd 中的 key 如下,其中两个 lock key 没有 value:
emqx-cluster/ekkacl/lock//24ad88af294e78ea
emqx-cluster/ekkacl/lock//24ad88af294e7cca
emqx-cluster/ekkacl/nodes/emqx-122@10.51.4.153
emqx-122@10.51.4.153
emqx-cluster/ekkacl/nodes/emqx-b055@10.51.3.210
emqx-b055@10.51.3.210
EMQX 节点错误日志:
2023-07-24T06:53:02.189461+00:00 [error] Ekka(AutoCluster): Unlock error: lock_lose
2023-07-24T06:53:02.189683+00:00 [error] Generic server ekka_autocluster terminating. Reason: {{timeout,{gen_server,call,[ekka_cluster_etcd,lock,5000]}},[{gen_server,call,3,[{file,"gen_server.erl"},{line,247}]},{ekka_autocluster,'-discover_and_join/0-fun-0-',2,[{file,"ekka_autocluster.erl"},{line,167}]},{ekka_autocluster,handle_info,2,[{file,"ekka_autocluster.erl"},{line,116}]},{gen_server,try_dispatch,4,[{file,"gen_server.erl"},{line,695}]},{gen_server,handle_msg,6,[{file,"gen_server.erl"},{line,771}]},{proc_lib,init_p_do_apply,3,[{file,"proc_lib.erl"},{line,226}]}]}. Last message: loop. State: {s,emqx}.
2023-07-24T06:53:02.190599+00:00 [error] crasher: initial call: ekka_autocluster:init/1, pid: <0.3236.0>, registered_name: ekka_autocluster, exit: {{timeout,{gen_server,call,[ekka_cluster_etcd,lock,5000]}},[{gen_server,call,3,[{file,"gen_server.erl"},{line,247}]},{ekka_autocluster,'-discover_and_join/0-fun-0-',2,[{file,"ekka_autocluster.erl"},{line,167}]},{ekka_autocluster,handle_info,2,[{file,"ekka_autocluster.erl"},{line,116}]},{gen_server,try_dispatch,4,[{file,"gen_server.erl"},{line,695}]},{gen_server,handle_msg,6,[{file,"gen_server.erl"},{line,771}]},{proc_lib,init_p_do_apply,3,[{file,"proc_lib.erl"},{line,226}]}]}, ancestors: [emqx_machine_sup,<0.1959.0>], message_queue_len: 0, messages: [], links: [], dictionary: [], trap_exit: false, status: running, heap_size: 6772, stack_size: 28, reductions: 28608; neighbours: []
2023-07-24T07:01:35.708771+00:00 [error] Generic server memsup terminating. Reason: {port_died,normal}. Last message: {'EXIT',<0.1896.0>,{port_died,normal}}. State: [{data,[{"Timeout",60000}]},{items,{"Memory Usage",[{"Allocated",593592320},{"Total",8114462720}]}},{items,{"Worst Memory User",[{"Pid",<0.2885.0>},{"Memory",13090016}]}}].
2023-07-24T07:01:35.709351+00:00 [error] crasher: initial call: memsup:init/1, pid: <0.1895.0>, registered_name: memsup, exit: {{port_died,normal},[{gen_server,handle_common_reply,8,[{file,"gen_server.erl"},{line,811}]},{proc_lib,init_p_do_apply,3,[{file,"proc_lib.erl"},{line,226}]}]}, ancestors: [os_mon_sup,<0.1893.0>], message_queue_len: 0, messages: [], links: [<0.1894.0>], dictionary: [], trap_exit: true, status: running, heap_size: 2586, stack_size: 28, reductions: 884369; neighbours: []
2023-07-24T07:01:35.709232+00:00 [error] jq port program has died unexpectedly for reason normal (state = #{id => 0, port =>, #Port<0.34>, processed_json_calls =>, 0, restart_period =>, 1000000}) , Trying to restart...
2023-07-24T07:01:35.709349+00:00 [error] jq port program has died unexpectedly for reason normal (state = #{id => 1, port =>, #Port<0.35>, processed_json_calls =>, 0, restart_period =>, 1000000}) , Trying to restart...
2023-07-24T07:01:35.710056+00:00 [error] Supervisor: {local,os_mon_sup}. Context: shutdown_error. Reason: {port_died,normal}. Offender: id=memsup,pid=<0.1895.0>.
2023-07-24T07:01:49.523025+00:00 [error] Ekka(AutoCluster): Unlock error: lock_lose
2023-07-24T07:01:49.523297+00:00 [error] Generic server ekka_autocluster terminating. Reason: {{timeout,{gen_server,call,[ekka_cluster_etcd,lock,5000]}},[{gen_server,call,3,[{file,"gen_server.erl"},{line,247}]},{ekka_autocluster,'-discover_and_join/0-fun-0-',2,[{file,"ekka_autocluster.erl"},{line,167}]},{ekka_autocluster,handle_info,2,[{file,"ekka_autocluster.erl"},{line,116}]},{gen_server,try_dispatch,4,[{file,"gen_server.erl"},{line,695}]},{gen_server,handle_msg,6,[{file,"gen_server.erl"},{line,771}]},{proc_lib,init_p_do_apply,3,[{file,"proc_lib.erl"},{line,226}]}]}. Last message: loop. State: {s,emqx}.
2023-07-24T07:01:49.524192+00:00 [error] crasher: initial call: ekka_autocluster:init/1, pid: <0.3197.0>, registered_name: ekka_autocluster, exit: {{timeout,{gen_server,call,[ekka_cluster_etcd,lock,5000]}},[{gen_server,call,3,[{file,"gen_server.erl"},{line,247}]},{ekka_autocluster,'-discover_and_join/0-fun-0-',2,[{file,"ekka_autocluster.erl"},{line,167}]},{ekka_autocluster,handle_info,2,[{file,"ekka_autocluster.erl"},{line,116}]},{gen_server,try_dispatch,4,[{file,"gen_server.erl"},{line,695}]},{gen_server,handle_msg,6,[{file,"gen_server.erl"},{line,771}]},{proc_lib,init_p_do_apply,3,[{file,"proc_lib.erl"},{line,226}]}]}, ancestors: [emqx_machine_sup,<0.1959.0>], message_queue_len: 0, messages: [], links: [], dictionary: [], trap_exit: false, status: running, heap_size: 6772, stack_size: 28, reductions: 28608; neighbours: []
相关截图: