关于多个Exhook,执行结果的疑问

EMQX: v5.0.8

我有两个Exhook Server

image

分别注册了五个钩子

其中第一个Exhook Server工作正常,返回ok, 没有stop,第二个ExHook Server访问超时。
在这种情况下,对于客户端连接,订阅,发布消息取决于哪个Exhook的结果

第二个ExHook出现异常时,EMQX的错误日志:

[error] clientid: xxx, function: on_message_publish, line: 412, mfa: emqx_exhook_server:do_call/5, module: emqx_exhook_v_2_hook_provider_client, msg: exhook_call_error, options: #{channel => <<"emqx-bridge-staging">>,failed_action => ignore,key_dispatch => <<"rule_acl_test_czw">>,timeout => 15000}, peername: xx.xx.xxx.xxx:17459, reason: {deadline_exceeded,<<"Waiting for response timeout">>}, req: #{message => #{from => <<"rule_acl_test_czw">>,headers => #{},id => <<"0006053E3104353C53AAF711795E042F">>,node => <<"emqx@10.6.0.11">>,payload => <<"{\"username\":\"xxx\",\"topic\":\"/xxx/xxx/xxx/xxx\",\"timestamp\":1694614528734,\"result\":\"allow\",\"peerhost\":\"xxx.xx.xxx.xxx\",\"node\":\"emqx@10.6.0.11\",\"metadata\":{\"rule_id\":\"rule_acl_test_czw\"},\"event\":\"client.check_authz_complete\",\"clientid\":\"xxx\",\"authz_source\":\"exhook\",\"action\":\"publish\"}">>,qos => 1,timestamp => 1694614528734,topic => <<"/xxx/xxx">>},meta => #{cluster_name => "emqxcl",node => <<"emqx@10.6.0.11">>,sysdescr => "EMQX",version => "5.0.8"}}

[warning] action: discard, line: 480, mfa: emqx_cm:request_stepdown/3, msg: session_stepdown_request_timeout, pid: <0.25979.3399>, stale_channel: [{status,waiting},{message_queue_len,3},{current_stacktrace,[{grpc_client,call,3,[{file,"grpc_client.erl"},{line,730}]},{grpc_client,recv,2,[{file,"grpc_client.erl"},{line,237}]},{grpc_client,unary,4,[{file,"grpc_client.erl"},{line,168}]},{emqx_exhook_server,do_call,5,[{file,"emqx_exhook_server.erl"},{line,387}]},{emqx_exhook,cast,3,[{file,"emqx_exhook.erl"},{line,42}]},{emqx_hooks,safe_execute,2,[{file,"emqx_hooks.erl"},{line,200}]},{emqx_hooks,do_run,2,[{file,"emqx_hooks.erl"},{line,167}]},{emqx_channel,ensure_connected,1,[{file,"emqx_channel.erl"},{line,2137}]},{emqx_channel,process_connect,2,[{file,"emqx_channel.erl"},{line,616}]},{emqx_connection,with_channel,3,[{file,"emqx_connection.erl"},{line,790}]},{emqx_connection,process_msg,2,[{file,"emqx_connection.erl"},{line,466}]},{emqx_connection,process_msg,2,[{file,"emqx_connection.erl"},{line,472}]},{emqx_connection,handle_recv,3,[{file,"emqx_connection.erl"},{line,428}]},{proc_lib,wake_up,3,[{file,"proc_lib.erl"},{line,236}]}]}]

[warning] action: {takeover,'begin'}, clientid: xxx, line: 480, mfa: emqx_cm:request_stepdown/3, msg: session_stepdown_request_timeout, peername: 192.168.0.189:51600, pid: <0.25469.3399>, stale_channel: [{status,waiting},{message_queue_len,2},{current_stacktrace,[{grpc_client,call,3,[{file,"grpc_client.erl"},{line,730}]},{grpc_client,recv,2,[{file,"grpc_client.erl"},{line,237}]},{grpc_client,unary,4,[{file,"grpc_client.erl"},{line,168}]},{emqx_exhook_server,do_call,5,[{file,"emqx_exhook_server.erl"},{line,387}]},{emqx_exhook,cast,3,[{file,"emqx_exhook.erl"},{line,42}]},{emqx_hooks,safe_execute,2,[{file,"emqx_hooks.erl"},{line,200}]},{emqx_hooks,do_run,2,[{file,"emqx_hooks.erl"},{line,167}]},{emqx_channel,ensure_disconnected,2,[{file,"emqx_channel.erl"},{line,2137}]},{emqx_channel,handle_info,2,[{file,"emqx_channel.erl"},{line,1261}]},{emqx_connection,with_channel,3,[{file,"emqx_connection.erl"},{line,790}]},{emqx_connection,process_msg,2,[{file,"emqx_connection.erl"},{line,466}]},{emqx_connection,handle_recv,3,[{file,"emqx_connection.erl"},{line,428}]},{proc_lib,wake_up,3,[{file,"proc_lib.erl"},{line,236}]}]}]

异常的这个结果会被忽略掉,直接使用正常的那个结果做为最终结果。

但是我的测试结果却是因为第二个ExHook访问超时,导致整条链路都阻塞了

两个ExHook请求超时都是5s, 客户端重连间隔为15s,

测试过程:

  1. 第一个Exhook正常
  2. 第二个Exhook打断点,让它访问超时

结果:
EMQX 并没有返回CONNACK,客户端不停地在重连

最新的v5.2.0也有这样的问题

看起来是不应该阻塞整个链接才对。

方便开下 emqx 的debug日志,然后把这段日志发出来我们分析下么?

已发私信,麻烦查看一下,谢谢:pray:

ping

昨天又出现这个问题了,能帮忙看下原因吗?谢谢:pray:

2023-09-15T14:36:29.301736+08:00 [error] msg: exhook_call_error, mfa: emqx_exhook_server:do_call/5, line: 412, peername: 192.168.1.111:56200, clientid: mqttx_593afecb, function: on_message_dropped, module: emqx_exhook_v_2_hook_provider_client, options: #{channel => <<"local">>,failed_action => deny,key_dispatch => <<"rule_connected_disconnected">>,timeout => 5000}, reason: {deadline_exceeded,<<"Waiting for response timeout">>}, req: #{message => #{from => <<"rule_connected_disconnected">>,headers => #{},id => <<"00060560029FDDF903A10400359B0002">>,node => <<"emqx@192.168.1.241">>,payload => <<"{\"username\":\"mqttx_593afecb\",\"timestamp\":1694759779294,\"peername\":\"192.168.1.111:56200\",\"event_type\":\"undefined\",\"event\":\"client.connected\",\"connected_at\":1694759779294}">>,qos => 1,timestamp => 1694759779294,topic => <<"events/device">>},meta => #{cluster_name => "emqxcl",node => <<"emqx@192.168.1.241">>,sysdescr => "EMQX",version => "5.2.0"},reason => <<"no_subscribers">>}
2

抱歉,错过了你的消息。看日志里是有很多请求超时的错误,意思是 EMQX 的 ExHook 向你的 gRPC 服务发送请求后,等待一段时间(默认我记得是 5s)后没有回复,则认为超时了。

  1. 这5个注册的钩子都是需要的么?
  2. 预计的到你的 gRPC 服务的 TPS 大概在多少量级?检查是否有突发流量和你的对端的 grpc 服务是否也有错误日志或者阻塞的情况出现
  1. 五个钩子都需要的,超时我因为我本地调试打断点,模拟线上环境第二个Exhook阻塞的情况
  2. 本地调试谈不上多少量级,没有突发流量。

问题就是 有两个Exhook,第一个优先级最高,如果第二个Exhook超时阻塞了,整条链路也会阻塞。

现象就是:发送CONNECT报文,第一个Exhook全部执行完,有日志记录,第二个Exhook访问超时,会导致EMQX无法正常响应CONNACK,客户端无法连接服务,进行重连操作

我的疑惑是,不是访问超时,我不明白的是,为什么第一个Exhook的执行结果,会受到后续Exhook的影响

明明第一个Exhook五个钩子全部执行成功了,却因为第二个Exhook超时,导致EMQX无法正常响应

看了下代码应该是:
connect 报文过来,你有多个exhook是顺序执行的。
第一个执行成功后,顺序到第二个,超时会忽略他的结果,他会接着到第三个,等所有的的hook执行完成后再决定返回connack,
关键就是所有的exhook是顺序执行,所以你的客户端会没收到connack,然后他又重试了,没来得级等到最终的connack。

实际情况是确实没有返回CONNACK

我的场景:
ExHook超时15s,客户端因为超时没有收到CONNACK,发起多个CONNECT,那么在这段时间内,如果EMQX有响应,那么至少在wireshark会有一次CONNACK报文吧(见上图)

最好把tcp的包显示出来,方便判断这个connect后面如果是到底是谁先断开的连接。

客户端先断开


也就是说客户端确实等了15秒,还是没有收到connack,然后就自己断开重连了。
但是根据上面的信息。根据提供的emqx 日志 一个exhookl默认是5秒,就会超时。
是不是有多个emqx都超时了?
麻烦对应一下:emqx里面的这条日志的时间。

[error] msg: exhook_call_error,

和tcp包的时间对比看看。想明确一下到底是多少时长才在emqx内超时的。

或者你把客户端的超时设置成100s,这样看看能不能收到connack,及收到的时间是多长。

EMQX LOG

2023-10-23T15:28:55.605369+08:00 [error] msg: exhook_call_error, mfa: emqx_exhook_server:do_call/5(412), peername: 192.168.1.111:58638, clientid: mqttx_593afecb, function: on_message_publish, module: emqx_exhook_v_2_hook_provider_client, options: #{channel => <<"local">>,failed_action => deny,key_dispatch => <<"rule_connected_disconnected">>,timeout => 15000}, reason: {deadline_exceeded,<<"Waiting for response timeout">>}, req: #{message => #{from => <<"rule_connected_disconnected">>,headers => #{},id => <<"0006085D2BD4A9B603A17E0075090002">>,node => <<"emqx@192.168.1.241">>,payload => <<"{\"username\":\"mqttx_593afecb\",\"timestamp\":1698046120601,\"peername\":\"192.168.1.111:58638\",\"event_type\":\"undefined\",\"event\":\"client.connected\",\"connected_at\":1698046120601}">>,qos => 1,timestamp => 1698046120602,topic => <<"events/device">>},meta => #{cluster_name => "emqxcl",node => <<"emqx@192.168.1.241">>,sysdescr => "EMQX",version => "5.3.0"}}
2023-10-23T15:29:10.608425+08:00 [error] msg: exhook_call_error, mfa: emqx_exhook_server:do_call/5(412), peername: 192.168.1.111:58638, clientid: mqttx_593afecb, function: on_message_dropped, module: emqx_exhook_v_2_hook_provider_client, options: #{channel => <<"local">>,failed_action => deny,key_dispatch => <<"rule_connected_disconnected">>,timeout => 15000}, reason: {deadline_exceeded,<<"Waiting for response timeout">>}, req: #{message => #{from => <<"rule_connected_disconnected">>,headers => #{},id => <<"0006085D2BD4A9B603A17E0075090002">>,node => <<"emqx@192.168.1.241">>,payload => <<"{\"username\":\"mqttx_593afecb\",\"timestamp\":1698046120601,\"peername\":\"192.168.1.111:58638\",\"event_type\":\"undefined\",\"event\":\"client.connected\",\"connected_at\":1698046120601}">>,qos => 1,timestamp => 1698046120602,topic => <<"events/device">>},meta => #{cluster_name => "emqxcl",node => <<"emqx@192.168.1.241">>,sysdescr => "EMQX",version => "5.3.0"},reason => <<"no_subscribers">>}
2023-10-23T15:29:25.612439+08:00 [error] msg: exhook_call_error, mfa: emqx_exhook_server:do_call/5(412), peername: 192.168.1.111:58638, clientid: mqttx_593afecb, function: on_client_connected, module: emqx_exhook_v_2_hook_provider_client, options: #{channel => <<"local">>,failed_action => deny,key_dispatch => <<"mqttx_593afecb">>,timeout => 15000}, reason: {deadline_exceeded,<<"Waiting for response timeout">>}, req: #{clientinfo => #{anonymous => true,clientid => <<"mqttx_593afecb">>,cn => <<>>,dn => <<>>,is_superuser => false,mountpoint => <<>>,node => <<"emqx@192.168.1.241">>,password => <<>>,peerhost => <<"192.168.1.111">>,protocol => <<"mqtt">>,sockport => 1883,username => <<"mqttx_593afecb">>},meta => #{cluster_name => "emqxcl",node => <<"emqx@192.168.1.241">>,sysdescr => "EMQX",version => "5.3.0"}}
2023-10-23T15:29:25.612950+08:00 [error] crasher: initial call: emqx_connection:init/4, pid: <0.29961.126>, registered_name: [], error: {{case_clause,{error,einval}},[{emqx_connection,process_msg,2,[{file,"emqx_connection.erl"},{line,493}]},{emqx_connection,process_msg,2,[{file,"emqx_connection.erl"},{line,499}]},{emqx_connection,handle_recv,3,[{file,"emqx_connection.erl"},{line,455}]},{proc_lib,wake_up,3,[{file,"proc_lib.erl"},{line,250}]}]}, ancestors: [<0.2895.0>,<0.2894.0>,esockd_sup,<0.2693.0>], message_queue_len: 2, messages: [{tcp_closed,#Port<0.568218>},{timeout,#Ref<0.547097680.2997878788.118371>,emit_stats}], links: [<0.2895.0>], dictionary: [{incoming_bytes,50},{'$logger_metadata$',#{clientid => <<"mqttx_593afecb">>,peername => "192.168.1.111:58638"}},{recv_pkt,1},{guid,{1698046120602038,3992138577161,2}}], trap_exit: false, status: running, heap_size: 6772, stack_size: 28, reductions: 34470; neighbours:

28分35秒开始CONNECT,EMQX的LOG是28分55秒

TCP: (客户端已设置100s超时重试)

最后一条有emqx connection crash的日志

再补充一下:
我有三个EMQX节点,客户端指定了其中的一个节点连接

超时的问题,我大概知道了,但还是有个问题不明白

线上环境的Exhook部署在k8s上,Service为LB类型,通过nodePort形式暴露(问题之一)。当驱逐节点时,驱逐节点的端口从LB上摘除。gRPC因为长连接的原因,将流量转发到已被驱逐的节点端口上,此时的端口无法访问,所以响应超时。

但是,EMQX的Exhook不是有自动重连机制,15s超时,60s重连间隔,按道理也不会影响

看起来,是有点问题。
麻烦提供一下:

./bin/emqx eval "emqx_exhook:get_basic_usage_info()."
./bin/emqx eval "emqx_exhook_mgr:list()."

这2个命令的结果。我需要在本地重现一下::pray:

[root@TENCENT64 emqx]# emqx eval "emqx_exhook:get_basic_usage_info()."
#{num_servers => 1,
  servers =>
      [#{driver => grpc,
         hooks =>
             ['message.publish','message.dropped','client.disconnected',
              'client.connected','client.authorize']}]}
[root@TENCENT64 emqx]# emqx eval "emqx_exhook_mgr:list()."

[#{auto_reconnect => 30000,enable => true,failed_action => ignore,
   hooks =>
       [#{name => 'message.publish',params => #{topics => []}},
        #{name => 'message.dropped',params => #{topics => []}},
        #{name => 'client.disconnected',params => #{}},
        #{name => 'client.connected',params => #{}},
        #{name => 'client.authorize',params => #{}}],
   name => <<"emqx-bridge-dev">>,order => 1,pool_size => 32,
   request_timeout => 15000,
   socket_options => #{keepalive => true,nodelay => true},
   ssl =>
       #{ciphers => [],depth => 10,enable => false,hibernate_after => 5000,
         log_level => notice,reuse_sessions => true,
         secure_renegotiate => true,verify => verify_none,
         versions => ['tlsv1.3','tlsv1.2']},
   status => connected,timer => #Ref<13864.547097680.2977169409.192631>,
   url => <<"https://examle.com:443/">>},
 #{auto_reconnect => 60000,enable => false,failed_action => deny,hooks => [],
   name => <<"local">>,order => 2,pool_size => 16,request_timeout => 15000,
   socket_options => #{keepalive => true,nodelay => true},
   ssl =>
       #{ciphers => [],depth => 10,enable => false,hibernate_after => 5000,
         log_level => notice,reuse_sessions => true,
         secure_renegotiate => true,verify => verify_peer,
         versions => ['tlsv1.3','tlsv1.2']},
   status => disabled,timer => undefined,
   url => <<"http://192.168.1.111:8085">>}]

第一个Exhook的连接地址,涉及相关域名已隐藏

好的,感谢。我明天重现一下:

  1. 为什么超时这么长时间没有回connack
  2. 为什么自动重连没有起做作用。
1 个赞