Connections and publishing both slow down after clustering

Environment

  • EMQX version: 5.5.1
  • OS version: Ubuntu 22.04
    The exhook plugin is used to process the data and then write it to a database.

Steps to reproduce

  1. The two servers are each deployed with docker-compose. The deployment details are as follows.
    Server 1:
    emqx:
      image: emqx:5.5.1
      container_name: emqx
      network_mode: "host"
      environment:
        - EMQX_NODE_NAME=nodeA@172.20.0.14
        - "EMQX_LISTENER__TCP__EXTERNAL=172.20.0.14:1883"
        - "EMQX_CLUSTER__DISCOVERY_STRATEGY=static"
        - "EMQX_CLUSTER__STATIC__SEEDS=[nodeA@172.20.0.14,nodeB@172.20.0.201]"
        - "EMQX_LISTENERS__TCP__DEFAULT__BIND=0.0.0.0:1884"
        - "EMQX_DASHBOARD__DEFAULT_PASSWORD=Auto12345@"
        - "EMQX_SETUP__DEFAULT_PASSWORD=Auto12345@"
        - "EMQX_AUTH__DEFAULT_USER__USERNAME=admin"
        - "EMQX_AUTH__DEFAULT_USER__PASSWORD=Auto12345@"
        - "API_KEY=none"
      healthcheck:
        test: ["CMD", "/opt/emqx/bin/emqx_ctl", "status"]
        interval: 5s
        timeout: 25s
        retries: 5
      volumes:
        - /data/emqx/data:/opt/emqx/data
        - /data/emqx/log:/opt/emqx/log

    Server 2:
    image: emqx:5.5.1
    container_name: emqx
    network_mode: "host"
    environment:
      - "EMQX_NODE_NAME=nodeB@172.20.0.201"
      # - "EMQX_LISTENER__TCP__EXTERNAL=172.20.0.14:1883"
      - "EMQX_CLUSTER__DISCOVERY_STRATEGY=static"
      - "EMQX_CLUSTER__STATIC__SEEDS=[nodeA@172.20.0.14,nodeB@172.20.0.201]"
      # - "EMQX_SETUP__DEFAULT_PASSWORD=Auto12345@"
      # - "EMQX_AUTH__DEFAULT_USER__USERNAME=admin"
      # - "EMQX_AUTH__DEFAULT_USER__PASSWORD=Auto12345@"
      # - "API_KEY=none"
    healthcheck:
      test: ["CMD", "/opt/emqx/bin/emqx_ctl", "status"]
      interval: 5s
      timeout: 25s
      retries: 5
    volumes:
      - /data/emqx/data:/opt/emqx/data
      - /data/emqx/etc/emqx.conf:/opt/emqx/etc/emqx.conf

nginx is used for load balancing:

stream {
    upstream mqtt_servers {
        server 172.20.0.201:1883;
        server 172.20.0.14:1884;
    }

    server {
        listen 1883;
        proxy_pass mqtt_servers;
    }
}

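Expected behavior

Previously, with a single node and about 1,000 connections, everything worked normally. Now that the connection count has reached about 2,000, the service responds slowly, and switching to a two-node cluster does not help either.

Actual behavior

The backend produces a large number of warnings: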

2024-03-23T02:08:34.215174+00:00 [warning] msg: session_stepdown_request_timeout, mfa: emqx_cm:request_stepdown/3(442), action: discard, stale_pid: <0.2005669.0>, stale_channel: [{status,waiting},{message_queue_len,8},{current_stacktrace,[{grpc_client,call,3,[{file,“grpc_client.erl”},{line,757}]},{grpc_client,recv,2,[{file,“grpc_client.erl”},{line,238}]},{grpc_client,unary,4,[{file,“grpc_client.erl”},{line,169}]},{emqx_exhook_server,do_call,5,[{file,“emqx_exhook_server.erl”},{line,387}]},{emqx_exhook,cast,3,[{file,“emqx_exhook.erl”},{line,42}]},{emqx_hooks,safe_execute,2,[{file,“emqx_hooks.erl”},{line,205}]},{emqx_hooks,do_run,2,[{file,“emqx_hooks.erl”},{line,172}]},{emqx_broker,route,2,[{file,“emqx_broker.erl”},{line,289}]},{emqx_broker,publish,1,[{file,“emqx_broker.erl”},{line,246}]},{emqx_channel,do_publish,3,[{file,“emqx_channel.erl”},{line,685}]},{emqx_connection,with_channel,3,[{file,“emqx_connection.erl”},{line,843}]},{emqx_connection,process_msg,2,[{file,“emqx_connection.erl”},{line,493}]},{emqx_connection,process_msg,2,[{file,“emqx_connection.erl”},{line,499}]},{emqx_connection,handle_recv,3,[{file,“emqx_connection.erl”},{line,455}]},{proc_lib,wake_up,3,[{file,“proc_lib.erl”},{line,251}]}]}]
2024-03-23T02:08:34.576164+00:00 [warning] msg: session_stepdown_request_timeout, mfa: emqx_cm:request_stepdown/3(442), action: discard, stale_pid: <0.1985634.0>, stale_channel: [{status,waiting},{message_queue_len,5},{current_stacktrace,[{grpc_client,call,3,[{file,“grpc_client.erl”},{line,757}]},{grpc_client,recv,2,[{file,“grpc_client.erl”},{line,238}]},{grpc_client,unary,4,[{file,“grpc_client.erl”},{line,169}]},{emqx_exhook_server,do_call,5,[{file,“emqx_exhook_server.erl”},{line,387}]},{emqx_exhook,cast,3,[{file,“emqx_exhook.erl”},{line,42}]},{emqx_hooks,safe_execute,2,[{file,“emqx_hooks.erl”},{line,205}]},{emqx_hooks,do_run_fold,3,[{file,“emqx_hooks.erl”},{line,185}]},{emqx_channel,do_deliver,2,[{file,“emqx_channel.erl”},{line,1086}]},{emqx_channel,‘-do_deliver/2-fun-0-’,2,[{file,“emqx_channel.erl”},{line,1101}]},{lists,foldl,3,[{file,“lists.erl”},{line,1594}]},{emqx_channel,do_deliver,2,[{file,“emqx_channel.erl”},{line,1099}]},{emqx_channel,handle_out,3,[{file,“emqx_channel.erl”},{line,1011}]},{emqx_connection,with_channel,3,[{file,“emqx_connection.erl”},{line,843}]},{emqx_connection,process_msg,2,[{file,“emqx_connection.erl”},{line,493}]},{emqx_connection,handle_recv,3,[{file,“emqx_connection.erl”},{line,455}]},{proc_lib,wake_up,3,[{file,“proc_lib.erl”},{line,251}]}]}]
2024-03-23T02:08:34.970137+00:00 [warning] msg: session_stepdown_request_timeout, mfa: emqx_cm:request_stepdown/3(442), action: discard, stale_pid: <0.1998408.0>, stale_channel: [{status,waiting},{message_queue_len,3},{current_stacktrace,[{grpc_client,call,3,[{file,“grpc_client.erl”},{line,757}]},{grpc_client,recv,2,[{file,“grpc_client.erl”},{line,238}]},{grpc_client,unary,4,[{file,“grpc_client.erl”},{line,169}]},{emqx_exhook_server,do_call,5,[{file,“emqx_exhook_server.erl”},{line,387}]},{emqx_exhook,cast,3,[{file,“emqx_exhook.erl”},{line,42}]},{emqx_hooks,safe_execute,2,[{file,“emqx_hooks.erl”},{line,205}]},{emqx_hooks,do_run,2,[{file,“emqx_hooks.erl”},{line,172}]},{emqx_session,terminate,3,[{file,“emqx_session.erl”},{line,511}]},{emqx_connection,terminate,2,[{file,“emqx_connection.erl”},{line,669}]},{proc_lib,wake_up,3,[{file,“proc_lib.erl”},{line,251}]}]}]
2024-03-23T02:08:36.035913+00:00 [warning] msg: session_stepdown_request_timeout, mfa: emqx_cm:request_stepdown/3(442), action: discard, stale_pid: <61893.4531444.0>, stale_channel: [{status,waiting},{message_queue_len,5},{current_stacktrace,[{grpc_client,call,3,[{file,“grpc_client.erl”},{line,757}]},{grpc_client,recv,2,[{file,“grpc_client.erl”},{line,238}]},{grpc_client,unary,4,[{file,“grpc_client.erl”},{line,169}]},{emqx_exhook_server,do_call,5,[{file,“emqx_exhook_server.erl”},{line,387}]},{emqx_exhook,cast,3,[{file,“emqx_exhook.erl”},{line,42}]},{emqx_hooks,safe_execute,2,[{file,“emqx_hooks.erl”},{line,205}]},{emqx_hooks,do_run,2,[{file,“emqx_hooks.erl”},{line,172}]},{emqx_session,terminate,3,[{file,“emqx_session.erl”},{line,511}]},{emqx_connection,terminate,2,[{file,“emqx_connection.erl”},{line,669}]},{proc_lib,wake_up,3,[{file,“proc_lib.erl”},{line,251}]}]}]
2024-03-23T02:08:36.608207+00:00 [warning] msg: session_stepdown_request_timeout, mfa: emqx_cm:request_stepdown/3(442), action: discard, stale_pid: <0.2021717.0>, stale_channel: [{status,waiting},{message_queue_len,3},{current_stacktrace,[{grpc_client,call,3,[{file,“grpc_client.erl”},{line,757}]},{grpc_client,recv,2,[{file,“grpc_client.erl”},{line,238}]},{grpc_client,unary,4,[{file,“grpc_client.erl”},{line,169}]},{emqx_exhook_server,do_call,5,[{file,“emqx_exhook_server.erl”},{line,387}]},{emqx_exhook,cast,3,[{file,“emqx_exhook.erl”},{line,42}]},{emqx_hooks,safe_execute,2,[{file,“emqx_hooks.erl”},{line,205}]},{emqx_hooks,do_run,2,[{file,“emqx_hooks.erl”},{line,172}]},{emqx_session,terminate,3,[{file,“emqx_session.erl”},{line,511}]},{emqx_connection,terminate,2,[{file,“emqx_connection.erl”},{line,669}]},{proc_lib,wake_up,3,[{file,“proc_lib.erl”},{line,251}]}]}]
2024-03-23T02:08:37.993195+00:00 [warning] msg: session_stepdown_request_timeout, mfa: emqx_cm:request_stepdown/3(442), action: discard, stale_pid: <0.1967653.0>, stale_channel: [{status,waiting},{message_queue_len,6},{current_stacktrace,[{grpc_client,call,3,[{file,“grpc_client.erl”},{line,757}]},{grpc_client,recv,2,[{file,“grpc_client.erl”},{line,238}]},{grpc_client,unary,4,[{file,“grpc_client.erl”},{line,169}]},{emqx_exhook_server,do_call,5,[{file,“emqx_exhook_server.erl”},{line,387}]},{emqx_exhook,cast,3,[{file,“emqx_exhook.erl”},{line,42}]},{emqx_hooks,safe_execute,2,[{file,“emqx_hooks.erl”},{line,205}]},{emqx_hooks,do_run,2,[{file,“emqx_hooks.erl”},{line,172}]},{emqx_session,terminate,3,[{file,“emqx_session.erl”},{line,511}]},{emqx_connection,terminate,2,[{file,“emqx_connection.erl”},{line,669}]},{proc_lib,wake_up,3,[{file,“proc_lib.erl”},{line,251}]}]}]
2024-03-23T02:08:38.546913+00:00 [warning] msg: session_stepdown_request_timeout, mfa: emqx_cm:request_stepdown/3(442), action: discard, stale_pid: <61893.4516795.0>, stale_channel: [{status,waiting},{message_queue_len,3},{current_stacktrace,[{grpc_client,call,3,[{file,“grpc_client.erl”},{line,757}]},{grpc_client,recv,2,[{file,“grpc_client.erl”},{line,238}]},{grpc_client,unary,4,[{file,“grpc_client.erl”},{line,169}]},{emqx_exhook_server,do_call,5,[{file,“emqx_exhook_server.erl”},{line,387}]},{emqx_exhook,cast,3,[{file,“emqx_exhook.erl”},{line,42}]},{emqx_hooks,safe_execute,2,[{file,“emqx_hooks.erl”},{line,205}]},{emqx_hooks,do_run,2,[{file,“emqx_hooks.erl”},{line,172}]},{emqx_session,terminate,3,[{file,“emqx_session.erl”},{line,511}]},{emqx_connection,terminate,2,[{file,“emqx_connection.erl”},{line,669}]},{proc_lib,wake_up,3,[{file,“proc_lib.erl”},{line,251}]}]}]
2024-03-23T02:08:39.952751+00:00 [error] supervisor: {esockd_connection_sup,<0.2001203.0>}, errorContext: connection_shutdown, reason: #{hint => malformed_packet,header_type => 7}, offender: [{pid,<0.2001203.0>},{name,connection},{mfargs,{emqx_connection,start_link,[#{listener => {tcp,default},limiter => undefined,zone => default,enable_authn => true}]}}]

Please help me resolve this. Thank you!

 msg: session_stepdown_request_timeout,...
{current_stacktrace,[{grpc_client,call,3,[{file,“grpc_client.erl”},{line,757}]},{grpc_client,recv,2,[{file,“grpc_client.erl”},{line,238}]},...
{emqx_exhook_server,do_call,5,[{file,“emqx_exhook_server.erl”}

This is related to your external plugin responding too slowly. You can turn exhooks off and try again.
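One way to check how much of the log points at ExHook is to tally log events by whether their stack trace contains the emqx_exhook_server,do_call frame quoted above. The following is a minimal Python sketch, assuming the plain-text log format shown in this thread (entries of the form "msg: <name>"). The function name tally_exhook_blocked is made up for illustration and is not part of EMQX.

```python
import re
from collections import Counter

# Stack frame that shows a process waiting inside an ExHook gRPC call,
# as it appears in the log lines quoted in this thread.
EXHOOK_FRAME = "emqx_exhook_server,do_call"

# Matches the event name in plain-text log lines, e.g. "msg: failed_to_execute".
MSG_RE = re.compile(r"msg: (\w+)")


def tally_exhook_blocked(lines):
    """Count log events by (msg, blocked_in_exhook).

    Lines without a "msg: <name>" field (for example supervisor reports)
    are skipped.
    """
    counts = Counter()
    for line in lines:
        match = MSG_RE.search(line)
        if match is None:
            continue
        blocked = EXHOOK_FRAME in line
        counts[(match.group(1), blocked)] += 1
    return counts


if __name__ == "__main__":
    import sys

    # Usage: python tally.py /path/to/emqx.log  (the path is an assumption)
    with open(sys.argv[1], encoding="utf-8", errors="replace") as fh:
        for (msg, blocked), n in sorted(tally_exhook_blocked(fh).items()):
            print(f"{msg:40s} blocked_in_exhook={blocked!s:5s} count={n}")
```

If nearly every session_stepdown_request_timeout entry is counted with blocked_in_exhook=True, that supports the diagnosis that the channel processes are stuck waiting for the plugin.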

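If that turns out to be the problem, there are a few things you can try:

  • Increase the gRPC pool size, which can be changed in the exhooks configuration.
  • Improve the external plugin so that it is more efficient.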
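As an illustration of the second option, a common way to make a hook handler return quickly is to hand the record to an in-memory queue and do the database write on a background thread, so that the reply to EMQX does not wait for the database. The sketch below shows only that pattern in Python. The class AsyncSink and the write_batch callback are hypothetical names, no gRPC or database library is involved, and whether this fits depends on how the plugin is written and on whether losing queued records in a crash is acceptable.

```python
import queue
import threading


class AsyncSink:
    """Hypothetical helper: accept records quickly, persist them in the background."""

    def __init__(self, write_batch, max_pending=10000, batch_size=100):
        self._write_batch = write_batch          # callable taking a list of records
        self._queue = queue.Queue(maxsize=max_pending)
        self._batch_size = batch_size
        self._worker = threading.Thread(target=self._run, daemon=True)
        self._worker.start()

    def submit(self, record):
        """Called from the hook handler. Never blocks; False means the queue is full."""
        try:
            self._queue.put_nowait(record)
            return True
        except queue.Full:
            return False

    def close(self):
        """Flush everything queued so far and stop the worker."""
        self._queue.put(None)                    # sentinel marks end of input
        self._worker.join()

    def _run(self):
        while True:
            item = self._queue.get()
            if item is None:
                return
            batch = [item]
            stop = False
            # Drain whatever is already queued, up to one batch.
            while len(batch) < self._batch_size:
                try:
                    nxt = self._queue.get_nowait()
                except queue.Empty:
                    break
                if nxt is None:
                    stop = True
                    break
                batch.append(nxt)
            self._write_batch(batch)
            if stop:
                return


if __name__ == "__main__":
    stored = []
    sink = AsyncSink(stored.extend, batch_size=2)
    for i in range(5):
        sink.submit({"seq": i})
    sink.close()
    print(len(stored), "records written")
```

The handler would call submit() and reply immediately. A bounded queue is used on purpose: when the database falls behind, submit() returns False and the plugin can drop or count the record instead of stalling the broker.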

How exactly should the gRPC pool size be set? What value is appropriate?
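A rough starting point, offered as a heuristic rather than an official recommendation: by Little's law, the number of hook calls in flight is about the call rate multiplied by the average time the plugin takes to answer one call. Assuming, conservatively, that each pool worker serves one call at a time, that product plus some headroom is a lower bound for the pool size. In EMQX 5.x the setting should be the pool_size field of the ExHook server entry, but please check the configuration reference for your version. The helper name and the example numbers below are made up for illustration.

```python
import math


def estimate_pool_size(calls_per_second, mean_latency_ms, headroom=1.5):
    """Little's law estimate of concurrent hook calls, scaled by a safety factor.

    calls_per_second: total hook invocations per second sent to the plugin
    mean_latency_ms:  average time the plugin takes to answer one call
    headroom:         multiplier to absorb bursts (an arbitrary default)
    """
    if calls_per_second < 0 or mean_latency_ms < 0 or headroom <= 0:
        raise ValueError("rate and latency must be >= 0 and headroom must be > 0")
    in_flight = calls_per_second * mean_latency_ms / 1000.0
    return max(1, math.ceil(in_flight * headroom))


if __name__ == "__main__":
    # Example: 2,000 hook calls per second, 10 ms per call, 1.5x headroom.
    print(estimate_pool_size(2000, 10))  # prints 30
```

If the estimate comes out far larger than is practical, the plugin's per-call latency is the thing to reduce; a larger pool only hides it up to a point.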

Establishing 3,000 connections with emqtt-bench makes the backend report errors:
{“time”:1711226985844378,“level”:“error”,“msg”:“failed_to_execute”,“mfa”:“emqx_hooks:safe_execute/2(215)”,“failed_call”:“{emqx_exhook_handler,on_message_publish,[{message,<<0,6,20,90,20,137,245,197,40,189,0,27,14,241,0,67>>,0,<<"2023021516300031">>,#{dup => false,retain => false},#{protocol => mqtt,username => <<"auto@">>,proto_ver => 4,peerhost => {39,144,14,208},properties => #{}},<<"report">>,<<32,35,2,21,22,48,0,49,255>>,1711226984461,#{}}]}”,“clientid”:“2023021516300031”,“exception”:“error”,“stacktrace”:[“{emqx_exhook_server,do_call,5,[{file,"emqx_exhook_server.erl"},{line,387}]}”,“{emqx_exhook,call_fold,4,[{file,"emqx_exhook.erl"},{line,64}]}”,“{emqx_exhook_handler,on_message_publish,1,[{file,"emqx_exhook_handler.erl"},{line,235}]}”,“{emqx_hooks,safe_execute,2,[{file,"emqx_hooks.erl"},{line,205}]}”,“{emqx_hooks,do_run_fold,3,[{file,"emqx_hooks.erl"},{line,185}]}”,“{emqx_broker,publish,1,[{file,"emqx_broker.erl"},{line,237}]}”,“{emqx_channel,do_publish,3,[{file,"emqx_channel.erl"},{line,685}]}”,“{emqx_connection,with_channel,3,[{file,"emqx_connection.erl"},{line,843}]}”,“{emqx_connection,process_msg,2,[{file,"emqx_connection.erl"},{line,493}]}”,“{emqx_connection,process_msg,2,[{file,"emqx_connection.erl"},{line,499}]}”,“{emqx_connection,handle_recv,3,[{file,"emqx_connection.erl"},{line,455}]}”,“{proc_lib,wake_up,3,[{file,"proc_lib.erl"},{line,251}]}”],“peername”:“39.144.14.208:43763”,“reason”:“{case_clause,{‘EXIT’,shutdown}}”,“pid”:“<0.1773297.0>”}

{“time”:1711227113698060,“level”:“error”,“msg”:“supervisor: {esockd_connection_sup,<0.1772581.0>}, errorContext: connection_shutdown, reason: #{hint => malformed_packet,header_type => 7}, offender: [{pid,<0.1772581.0>},{name,connection},{mfargs,{emqx_connection,start_link,[#{listener => {tcp,default},limiter => undefined,zone => default,enable_authn => true}]}}]”,“domain”:[“supervisor_report”],“pid”:“<0.1770603.0>”,“error_logger”:{“type”:“supervisor_report”,“tag”:“error_report”}}
{“time”:1711227152280143,“level”:“error”,“msg”:“supervisor: {esockd_connection_sup,<0.1772694.0>}, errorContext: connection_shutdown, reason: #{hint => malformed_packet,header_type => 7}, offender: [{pid,<0.1772694.0>},{name,connection},{mfargs,{emqx_connection,start_link,[#{listener => {tcp,default},limiter => undefined,zone => default,enable_authn => true}]}}]”,“domain”:[“supervisor_report”],“pid”:“<0.1770603.0>”,“error_logger”:{“type”:“supervisor_report”,“tag”:“error_report”}}
{“time”:1711227196362430,“level”:“error”,“msg”:“supervisor: {esockd_connection_sup,<0.1786728.0>}, errorContext: connection_shutdown, reason: #{hint => malformed_packet,header_type => 7}, offender: [{pid,<0.1786728.0>},{name,connection},{mfargs,{emqx_connection,start_link,[#{listener => {tcp,default},limiter => undefined,zone => default,enable_authn => true}]}}]”,“domain”:[“supervisor_report”],“pid”:“<0.1770603.0>”,“error_logger”:{“type”:“supervisor_report”,“tag”:“error_report”}}
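What is the problem here? Once the number of connections grows, the exhook service seems to block, and then clients cannot connect. But for plain connections I have not written any logic, so that should not put much load on the exhook service. After I turned exhook off in the EMQX dashboard and turned it back on, everything was fine again.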

The current situation is this. With the exhook switch in EMQX turned on, the service is normal and publishing works while the connection count is around 1,200. When the connection count reaches 1,500 the service blocks, and the count then stays at around 1,600 (my guess is that exhook has some mechanism that keeps creating connections and then blocks the service?), and handling of published messages becomes slow. Restarting my plugin service does not help, so I do not think my plugin is the problem, since it can easily handle the current request volume. When I then turned the exhook switch off and on again in the dashboard, the connection count immediately dropped to about 1,200 and the service returned to normal. So my guess is that, under some condition, the link between EMQX and the plugin keeps creating connections and blocks the service? Below are the logs from when it was blocked; please help take a look.

2024-03-24T05:46:51.985017+00:00 [error] msg: failed_to_execute, mfa: emqx_hooks:safe_execute/2(215), peername: 39.144.13.86:44184, clientid: 2023072914390089, reason: {case_clause,{‘EXIT’,shutdown}}, stacktrace: [{emqx_exhook_server,do_call,5,[{file,“emqx_exhook_server.erl”},{line,387}]},{emqx_exhook,call_fold,4,[{file,“emqx_exhook.erl”},{line,64}]},{emqx_exhook_handler,on_message_publish,1,[{file,“emqx_exhook_handler.erl”},{line,235}]},{emqx_hooks,safe_execute,2,[{file,“emqx_hooks.erl”},{line,205}]},{emqx_hooks,do_run_fold,3,[{file,“emqx_hooks.erl”},{line,185}]},{emqx_broker,publish,1,[{file,“emqx_broker.erl”},{line,237}]},{emqx_channel,do_publish,3,[{file,“emqx_channel.erl”},{line,685}]},{emqx_connection,with_channel,3,[{file,“emqx_connection.erl”},{line,843}]},{emqx_connection,process_msg,2,[{file,“emqx_connection.erl”},{line,493}]},{emqx_connection,process_msg,2,[{file,“emqx_connection.erl”},{line,499}]},{emqx_connection,handle_recv,3,[{file,“emqx_connection.erl”},{line,455}]},{proc_lib,wake_up,3,[{file,“proc_lib.erl”},{line,251}]}], exception: error, failed_call: {emqx_exhook_handler,on_message_publish,[{message,<<0,6,20,97,149,96,97,66,40,189,0,139,255,167,3,64>>,0,<<“2023072914390089”>>,#{dup => false,retain => false},#{protocol => mqtt,username => <<“auto@”>>,proto_ver => 4,peerhost => {39,144,13,86},properties => #{}},<<“report”>>,<<32,35,7,41,20,57,0,137,1,51,0,36,255,32,13,45,1,48,2,4,2,48,3,14,4,48,0,115,1,96,0,0,2,96,0,0,4,96,0,0,1,33,0,0,2,33,0,0,219,94>>,1711259210768,#{}}]}
2024-03-24T05:46:51.985236+00:00 [error] msg: failed_to_execute, mfa: emqx_hooks:safe_execute/2(215), peername: 172.20.0.201:46299, reason: {case_clause,{‘EXIT’,shutdown}}, stacktrace: [{emqx_exhook_server,do_call,5,[{file,“emqx_exhook_server.erl”},{line,387}]},{emqx_exhook,cast,3,[{file,“emqx_exhook.erl”},{line,42}]},{emqx_hooks,safe_execute,2,[{file,“emqx_hooks.erl”},{line,205}]},{emqx_hooks,do_run_fold,3,[{file,“emqx_hooks.erl”},{line,185}]},{emqx_channel,run_conn_hooks,2,[{file,“emqx_channel.erl”},{line,2282}]},{emqx_utils,pipeline,3,[{file,“emqx_utils.erl”},{line,184}]},{emqx_channel,handle_in,2,[{file,“emqx_channel.erl”},{line,316}]},{emqx_connection,with_channel,3,[{file,“emqx_connection.erl”},{line,843}]},{emqx_connection,process_msg,2,[{file,“emqx_connection.erl”},{line,493}]},{emqx_connection,process_msg,2,[{file,“emqx_connection.erl”},{line,499}]},{emqx_connection,handle_recv,3,[{file,“emqx_connection.erl”},{line,455}]},{proc_lib,wake_up,3,[{file,“proc_lib.erl”},{line,251}]}], exception: error, failed_call: {emqx_exhook_handler,on_client_connect,[#{peername => {{172,20,0,201},46299},sockname => {{172,25,0,2},1883},keepalive => 10,peercert => nossl,username => <<“auto@”>>,clientid => <<>>,socktype => tcp,proto_name => <<“MQTT”>>,proto_ver => 4,conn_mod => emqx_connection,clean_start => true,expiry_interval => 0,conn_props => #{},receive_maximum => 32},#{}]}
2024-03-24T05:46:51.985376+00:00 [error] msg: failed_to_execute, mfa: emqx_hooks:safe_execute/2(215), peername: 172.20.0.201:33221, clientid: MzE1NjcxNjA2NDQ2Nzg4ODQ0MTA3NDA1NTA3ODExMDgyMjE, reason: {case_clause,{‘EXIT’,shutdown}}, stacktrace: [{emqx_exhook_server,do_call,5,[{file,“emqx_exhook_server.erl”},{line,387}]},{emqx_exhook,call_fold,4,[{file,“emqx_exhook.erl”},{line,64}]},{emqx_exhook_handler,on_message_publish,1,[{file,“emqx_exhook_handler.erl”},{line,235}]},{emqx_hooks,safe_execute,2,[{file,“emqx_hooks.erl”},{line,205}]},{emqx_hooks,do_run_fold,3,[{file,“emqx_hooks.erl”},{line,185}]},{emqx_broker,publish,1,[{file,“emqx_broker.erl”},{line,237}]},{emqx_channel,do_publish,3,[{file,“emqx_channel.erl”},{line,685}]},{emqx_connection,with_channel,3,[{file,“emqx_connection.erl”},{line,843}]},{emqx_connection,process_msg,2,[{file,“emqx_connection.erl”},{line,493}]},{emqx_connection,process_msg,2,[{file,“emqx_connection.erl”},{line,499}]},{emqx_connection,handle_recv,3,[{file,“emqx_connection.erl”},{line,455}]},{proc_lib,wake_up,3,[{file,“proc_lib.erl”},{line,251}]}], exception: error, failed_call: {emqx_exhook_handler,on_message_publish,[{message,<<0,6,20,97,149,95,240,68,40,189,0,168,53,56,0,3>>,0,<<“MzE1NjcxNjA2NDQ2Nzg4ODQ0MTA3NDA1NTA3ODExMDgyMjE”>>,#{dup => false,retain => false},#{protocol => mqtt,username => <<“auto@”>>,proto_ver => 4,peerhost => {172,20,0,201},properties => #{}},<<“control/2022102917050080”>>,<<1,243,0,7,1,48,1,239,2,48,2,118,4,48,0,247,12,48,0,0,13,48,0,173,14,48,0,0,0,0,0,0,118,90>>,1711259210739,#{}}]}