EMQX static cluster rejects consumers (connection congested)

Environment

  • EMQX version: 5.0.21
  • OS version: CentOS 8

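Steps to reproduce

  1. Static cluster with 3 nodes
  2. 70,000 connections, each sending one 1 KB message every 10 seconds
  3. The broker keeps rejecting consumers, and the consumers keep reconnecting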
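To put the reproduction steps in numbers, here is a rough back-of-the-envelope calculation. It assumes "1 KB" means 1024 bytes of payload and that the load is split perfectly evenly across consumers, which real dispatch will only approximate. The consumer counts (3, then 6) are the ones mentioned later in this thread.

```python
# Back-of-the-envelope load implied by the reproduction steps.
CONNECTIONS = 70_000        # from step 2
PUBLISH_INTERVAL_S = 10     # one message per connection every 10 s
PAYLOAD_BYTES = 1024        # assumption: "1 KB" means 1024 bytes of payload


def aggregate_rate(connections: int, interval_s: float) -> float:
    """Messages per second entering the cluster."""
    return connections / interval_s


def per_consumer_rate(total_rate: float, consumers: int) -> float:
    """Messages per second each consumer must sustain under an even split."""
    return total_rate / consumers


total = aggregate_rate(CONNECTIONS, PUBLISH_INTERVAL_S)
throughput_mib = total * PAYLOAD_BYTES / (1024 * 1024)

print(f"inbound: {total:.0f} msg/s, about {throughput_mib:.2f} MiB/s of payload")
for n in (3, 6):
    print(f"{n} consumers -> {per_consumer_rate(total, n):.0f} msg/s each")
```

So even with six consumers, each one has to sustain well over a thousand messages per second without falling behind.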

2023/5/5 下午12:47:202023-05-05T12:47:20.738129+08:00 [error] Process: <0.28814.182> on node ‘emqx@10.27.13.91’, Context: maximum heap size reached, Max Heap Size: 6291456, Total Heap Size: 6527931, Kill: true, Error Logger: true, Message Queue Len: 54, GC Info: [{old_heap_block_size,2487399},{heap_block_size,3800194},{mbuf_size,240381},{recent_size,862059},{stack_size,31},{old_heap_size,0},{heap_size,1727318},{bin_vheap_size,156072},{bin_vheap_block_size,272548},{bin_old_vheap_size,0},{bin_old_vheap_block_size,46422}]
2023/5/5 下午12:49:402023-05-05T12:49:40.731327+08:00 [warning] msg: alarm_is_activated, mfa: emqx_alarm:do_actions/3, line: 416, message: <<"connection congested: #{buffer => 4096,clientid => <<"tgServerba5936e648a547f591f5b6ca45906b2e">>,conn_state => connected,connected_at => 1683262171437,high_msgq_watermark => 8192,high_watermark => 1048576,memory => 2003552,message_queue_len => 477,peername => <<"10.27.13.251:55848">>,pid => <<"<0.13772.185>">>,proto_name => <<"MQTT">>,proto_ver => 4,recbuf => 369280,recv_cnt => 3988,recv_oct "…>>, name: <<“conn_congestion/tgServerba5936e648a547f591f5b6ca45906b2e/testUser”>>
2023/5/5 下午12:49:402023-05-05T12:49:40.735925+08:00 [warning] msg: alarm_is_deactivated, mfa: emqx_alarm:do_actions/3, line: 422, name: <<“conn_congestion/tgServerba5936e648a547f591f5b6ca45906b2e/testUser”>>
2023/5/5 下午12:52:202023-05-05T12:52:20.688041+08:00 [error] Process: <0.16534.189> on node ‘emqx@10.27.13.91’, Context: maximum heap size reached, Max Heap Size: 6291456, Total Heap Size: 6389443, Kill: true, Error Logger: true, Message Queue Len: 100, GC Info: [{old_heap_block_size,2487399},{heap_block_size,3800194},{mbuf_size,107451},{recent_size,855789},{stack_size,5595},{old_heap_size,0},{heap_size,1721760},{bin_vheap_size,129995},{bin_vheap_block_size,196630},{bin_old_vheap_size,0},{bin_old_vheap_block_size,75110}]
2023/5/5 下午12:53:302023-05-05T12:53:30.680767+08:00 [warning] msg: alarm_is_activated, mfa: emqx_alarm:do_actions/3, line: 416, message: <<“connection congested: #{buffer => 4096,clientid => <<"tgServerba5936e648a547f591f5b6ca45906b2e">>,conn_state => connected,connected_at => 1683262401395,high_msgq_watermark => 8192,high_watermark => 1048576,memory => 11854872,message_queue_len => 5587,peername => <<"10.27.13.251:57262">>,pid => <<"<0.7942.191>">>,proto_name => <<"MQTT">>,proto_ver => 4,recbuf => 369280,recv_cnt => 10793,recv_oc”…>>, name: <<“conn_congestion/tgServerba5936e648a547f591f5b6ca45906b2e/testUser”>>
2023/5/5 下午12:53:302023-05-05T12:53:30.711445+08:00 [warning] msg: alarm_is_deactivated, mfa: emqx_alarm:do_actions/3, line: 422, name: <<“conn_congestion/tgServerba5936e648a547f591f5b6ca45906b2e/testUser”>>
2023/5/5 下午12:56:302023-05-05T12:56:30.775157+08:00 [error] Process: <0.5838.197> on node ‘emqx@10.27.13.91’, Context: maximum heap size reached, Max Heap Size: 6291456, Total Heap Size: 6460537, Kill: true, Error Logger: true, Message Queue Len: 113, GC Info: [{old_heap_block_size,2487399},{heap_block_size,3800194},{mbuf_size,173089},{recent_size,796953},{stack_size,135},{old_heap_size,0},{heap_size,1727216},{bin_vheap_size,153365},{bin_vheap_block_size,257380},{bin_old_vheap_size,0},{bin_old_vheap_block_size,75110}]
2023/5/5 下午12:57:202023-05-05T12:57:20.608973+08:00 [error] Process: <0.19848.199> on node ‘emqx@10.27.13.91’, Context: maximum heap size reached, Max Heap Size: 6291456, Total Heap Size: 6340691, Kill: true, Error Logger: true, Message Queue Len: 1, GC Info: [{old_heap_block_size,2487399},{heap_block_size,3800194},{mbuf_size,53228},{recent_size,1054238},{stack_size,129},{old_heap_size,0},{heap_size,1727226},{bin_vheap_size,180208},{bin_vheap_block_size,336896},{bin_old_vheap_size,0},{bin_old_vheap_block_size,60763}]
2023/5/5 下午12:58:302023-05-05T12:58:30.753874+08:00 [error] Process: <0.16204.201> on node ‘emqx@10.27.13.91’, Context: maximum heap size reached, Max Heap Size: 6291456, Total Heap Size: 9345175, Kill: true, Error Logger: true, Message Queue Len: 2, GC Info: [{old_heap_block_size,3581853},{heap_block_size,5472277},{mbuf_size,291185},{recent_size,1189382},{stack_size,141},{old_heap_size,0},{heap_size,2487259},{bin_vheap_size,230021},{bin_vheap_block_size,466969},{bin_old_vheap_size,0},{bin_old_vheap_block_size,75110}]
2023/5/5 下午12:58:502023-05-05T12:58:50.414310+08:00 [warning] msg: alarm_is_activated, mfa: emqx_alarm:do_actions/3, line: 416, message: <<“connection congested: #{buffer => 4096,clientid => <<"tgServerba5936e648a547f591f5b6ca45906b2e">>,conn_state => connected,connected_at => 1683262721401,high_msgq_watermark => 8192,high_watermark => 1048576,memory => 15705400,message_queue_len => 10241,peername => <<"10.27.13.251:59288">>,pid => <<"<0.29649.201>">>,proto_name => <<"MQTT">>,proto_ver => 4,recbuf => 369280,recv_cnt => 18341,recv_”…>>, name: <<“conn_congestion/tgServerba5936e648a547f591f5b6ca45906b2e/testUser”>>
2023/5/5 下午12:58:502023-05-05T12:58:50.717625+08:00 [warning] msg: alarm_is_deactivated, mfa: emqx_alarm:do_actions/3, line: 422, name: <<“conn_congestion/tgServerba5936e648a547f591f5b6ca45906b2e/testUser”>>
2023/5/5 下午1:00:302023-05-05T13:00:30.712380+08:00 [error] Process: <0.17024.205> on node ‘emqx@10.27.13.91’, Context: maximum heap size reached, Max Heap Size: 6291456, Total Heap Size: 6399876, Kill: true, Error Logger: true, Message Queue Len: 0, GC Info: [{old_heap_block_size,2487399},{heap_block_size,3800194},{mbuf_size,116378},{recent_size,916150},{stack_size,4089},{old_heap_size,0},{heap_size,1723266},{bin_vheap_size,142880},{bin_vheap_block_size,257380},{bin_old_vheap_size,0},{bin_old_vheap_block_size,46422}]
2023/5/5 下午1:02:102023-05-05T13:02:10.550305+08:00 [warning] msg: alarm_is_activated, mfa: emqx_alarm:do_actions/3, line: 416, message: <<“connection congested: #{buffer => 4096,clientid => <<"tgServer4127d820c2cb44adab076e74f9451f23">>,conn_state => connected,connected_at => 1683262921207,high_msgq_watermark => 8192,high_watermark => 1048576,memory => 14652280,message_queue_len => 7825,peername => <<"10.27.13.251:60528">>,pid => <<"<0.8253.208>">>,proto_name => <<"MQTT">>,proto_ver => 4,recbuf => 369280,recv_cnt => 9332,recv_oct”…>>, name: <<“conn_congestion/tgServer4127d820c2cb44adab076e74f9451f23/testUser”>>
2023/5/5 下午1:02:102023-05-05T13:02:10.566147+08:00 [error] Process: <0.8253.208> on node ‘emqx@10.27.13.91’, Context: maximum heap size reached, Max Heap Size: 6291456, Total Heap Size: 6392887, Kill: true, Error Logger: true, Message Queue Len: 88, GC Info: [{old_heap_block_size,2487399},{heap_block_size,3800194},{mbuf_size,105363},{recent_size,808621},{stack_size,71},{old_heap_size,0},{heap_size,1727292},{bin_vheap_size,114246},{bin_vheap_block_size,196630},{bin_old_vheap_size,0},{bin_old_vheap_block_size,75110}]
2023/5/5 下午1:02:142023-05-05T13:02:14.306043+08:00 [error] supervisor: ‘esockd_connection_sup - <0.2356.0>’, errorContext: connection_shutdown, reason: #{header_type => 0,hint => zero_remaining_len}, offender: [{pid,<0.19844.208>},{name,connection},{mfargs,{emqx_connection,start_link,[#{enable_authn => true,limiter => #{bytes_in => #{capacity => 1099511627776,initial => 0,rate => infinity},client => #{bytes_in => #{capacity => 1099511627776,divisible => false,failure_strategy => force,initial => 0,low_watermark => 0,max_retry_time => 10000,rate => infinity},connection => #{capacity => 1099511627776,divisible => false,failure_strategy => force,initial => 0,low_watermark => 0,max_retry_time => 10000,rate => infinity},message_in => #{capacity => 1099511627776,divisible => false,failure_strategy => force,initial => 0,low_watermark => 0,max_retry_time => 10000,rate => infinity},message_routing => #{capacity => 1099511627776,divisible => false,failure_strategy => force,initial => 0,low_watermark => 0,max_retry_time => 10000,rate => infinity}},connection => #{capacity => 1000,initial => 0,rate => 100.0},message_in => #{capacity => 1099511627776,initial => 0,rate => infinity},message_routing => #{capacity => 1099511627776,initial => 0,rate => infinity}},listener => {tcp,default},zone => default}]}}]
2023/5/5 下午1:02:302023-05-05T13:02:30.022254+08:00 [warning] msg: alarm_is_activated, mfa: emqx_alarm:do_actions/3, line: 416, message: <<“connection congested: #{buffer => 4096,clientid => <<"tgServerba5936e648a547f591f5b6ca45906b2e">>,conn_state => connected,connected_at => 1683262941103,high_msgq_watermark => 8192,high_watermark => 1048576,memory => 1122200,message_queue_len => 74,peername => <<"10.27.13.251:60664">>,pid => <<"<0.19934.208>">>,proto_name => <<"MQTT">>,proto_ver => 4,recbuf => 369280,recv_cnt => 5509,recv_oct =”…>>, name: <<“conn_congestion/tgServerba5936e648a547f591f5b6ca45906b2e/testUser”>>
2023/5/5 下午1:02:302023-05-05T13:02:30.242954+08:00 [warning] msg: alarm_is_deactivated, mfa: emqx_alarm:do_actions/3, line: 422, name: <<“conn_congestion/tgServerba5936e648a547f591f5b6ca45906b2e/testUser”>>
2023/5/5 下午1:03:402023-05-05T13:03:40.771032+08:00 [error] Process: <0.24737.210> on node ‘emqx@10.27.13.91’, Context: maximum heap size reached, Max Heap Size: 6291456, Total Heap Size: 6545582, Kill: true, Error Logger: true, Message Queue Len: 226, GC Info: [{old_heap_block_size,2487399},{heap_block_size,3800194},{mbuf_size,258082},{recent_size,833020},{stack_size,97},{old_heap_size,0},{heap_size,1727250},{bin_vheap_size,161706},{bin_vheap_block_size,318147},{bin_old_vheap_size,0},{bin_old_vheap_block_size,75110}]
2023/5/5 下午1:04:002023-05-05T13:04:00.788651+08:00 [error] Process: <0.2988.211> on node ‘emqx@10.27.13.91’, Context: maximum heap size reached, Max Heap Size: 6291456, Total Heap Size: 6518723, Kill: true, Error Logger: true, Message Queue Len: 1, GC Info: [{old_heap_block_size,2487399},{heap_block_size,3800194},{mbuf_size,231154},{recent_size,775611},{stack_size,19},{old_heap_size,0},{heap_size,1727335},{bin_vheap_size,149343},{bin_vheap_block_size,208218},{bin_old_vheap_size,0},{bin_old_vheap_block_size,75110}]
2023/5/5 下午1:06:302023-05-05T13:06:30.639104+08:00 [warning] msg: alarm_is_activated, mfa: emqx_alarm:do_actions/3, line: 416, message: <<"connection congested: #{buffer => 4096,clientid => <<"tgServerdb3248683eec43fab341370ddf833fa3">>,conn_state => connected,connected_at => 1683263181388,high_msgq_watermark => 8192,high_watermark => 1048576,memory => 4844976,message_queue_len => 1001,peername => <<"10.27.13.251:33918">>,pid => <<"<0.30798.214>">>,proto_name => <<"MQTT">>,proto_ver => 4,recbuf => 369280,recv_cnt => 560,recv_oct "…>>, name: <<“conn_congestion/tgServerdb3248683eec43fab341370ddf833fa3/testUser”>>
2023/5/5 下午1:06:302023-05-05T13:06:30.662950+08:00 [warning] msg: alarm_is_deactivated, mfa: emqx_alarm:do_actions/3, line: 422, name: <<“conn_congestion/tgServerdb3248683eec43fab341370ddf833fa3/testUser”>>
2023/5/5 下午1:07:002023-05-05T13:07:00.777437+08:00 [error] Process: <0.22049.215> on node ‘emqx@10.27.13.91’, Context: maximum heap size reached, Max Heap Size: 6291456, Total Heap Size: 6393500, Kill: true, Error Logger: true, Message Queue Len: 0, GC Info: [{old_heap_block_size,2487399},{heap_block_size,3800194},{mbuf_size,110362},{recent_size,878074},{stack_size,4449},{old_heap_size,0},{heap_size,1722906},{bin_vheap_size,132778},{bin_vheap_block_size,257380},{bin_old_vheap_size,0},{bin_old_vheap_block_size,46422}]
2023/5/5 下午1:09:402023-05-05T13:09:40.817698+08:00 [error] Process: <0.21995.219> on node ‘emqx@10.27.13.91’, Context: maximum heap size reached, Max Heap Size: 6291456, Total Heap Size: 6368070, Kill: true, Error Logger: true, Message Queue Len: 15, GC Info: [{old_heap_block_size,2487399},{heap_block_size,3800194},{mbuf_size,80578},{recent_size,854719},{stack_size,91},{old_heap_size,0},{heap_size,1727260},{bin_vheap_size,143501},{bin_vheap_block_size,257380},{bin_old_vheap_size,0},{bin_old_vheap_block_size,75110}]
2023/5/5 下午1:12:402023-05-05T13:12:40.737367+08:00 [error] Process: <0.20098.223> on node ‘emqx@10.27.13.91’, Context: maximum heap size reached, Max Heap Size: 6291456, Total Heap Size: 6560937, Kill: true, Error Logger: true, Message Queue Len: 376, GC Info: [{old_heap_block_size,2487399},{heap_block_size,3800194},{mbuf_size,273416},{recent_size,1285431},{stack_size,77},{old_heap_size,0},{heap_size,1727277},{bin_vheap_size,215057},{bin_vheap_block_size,336896},{bin_old_vheap_size,0},{bin_old_vheap_block_size,121527}]
2023/5/5 下午1:13:102023-05-05T13:13:10.043872+08:00 [warning] msg: alarm_is_activated, mfa: emqx_alarm:do_actions/3, line: 416, message: <<"connection congested: #{buffer => 4096,clientid => <<"tgServerba5936e648a547f591f5b6ca45906b2e">>,conn_state => connected,connected_at => 1683263581708,high_msgq_watermark => 8192,high_watermark => 1048576,memory => 1811232,message_queue_len => 2,peername => <<"10.27.13.25:42320">>,pid => <<"<0.25411.224>">>,proto_name => <<"MQTT">>,proto_ver => 4,recbuf => 369280,recv_cnt => 1895,recv_oct => "…>>, name: <<“conn_congestion/tgServerba5936e648a547f591f5b6ca45906b2e/testUser”>>
2023/5/5 下午1:13:102023-05-05T13:13:10.635695+08:00 [warning] msg: alarm_is_deactivated, mfa: emqx_alarm:do_actions/3, line: 422, name: <<“conn_congestion/tgServerba5936e648a547f591f5b6ca45906b2e/testUser”>>
2023/5/5 下午1:14:002023-05-05T13:14:00.676129+08:00 [error] Process: <0.7281.226> on node ‘emqx@10.27.13.91’, Context: maximum heap size reached, Max Heap Size: 6291456, Total Heap Size: 6407498, Kill: true, Error Logger: true, Message Queue Len: 124, GC Info: [{old_heap_block_size,2487399},{heap_block_size,3800194},{mbuf_size,119936},{recent_size,967548},{stack_size,25},{old_heap_size,0},{heap_size,1727325},{bin_vheap_size,173521},{bin_vheap_block_size,272548},{bin_old_vheap_size,0},{bin_old_vheap_block_size,75110}]
2023/5/5 下午1:14:302023-05-05T13:14:30.692176+08:00 [warning] msg: alarm_is_deactivated, mfa: emqx_alarm:do_actions/3, line: 422, name: <<“conn_congestion/tgServer4127d820c2cb44adab076e74f9451f23/testUser”>>
2023/5/5 下午1:17:102023-05-05T13:17:10.300972+08:00 [warning] msg: alarm_is_activated, mfa: emqx_alarm:do_actions/3, line: 416, message: <<“connection congested: #{buffer => 4096,clientid => <<"tgServerba5936e648a547f591f5b6ca45906b2e">>,conn_state => connected,connected_at => 1683263821135,high_msgq_watermark => 8192,high_watermark => 1048576,memory => 1117536,message_queue_len => 21,peername => <<"10.27.13.25:43976">>,pid => <<"<0.7658.231>">>,proto_name => <<"MQTT">>,proto_ver => 4,recbuf => 369280,recv_cnt => 14869,recv_oct =>”…>>, name: <<“conn_congestion/tgServerba5936e648a547f591f5b6ca45906b2e/testUser”>>
2023/5/5 下午1:17:102023-05-05T13:17:10.642856+08:00 [warning] msg: alarm_is_deactivated, mfa: emqx_alarm:do_actions/3, line: 422, name: <<“conn_congestion/tgServerba5936e648a547f591f5b6ca45906b2e/testUser”>>
2023/5/5 下午1:17:202023-05-05T13:17:20.764914+08:00 [error] Process: <0.23956.231> on node ‘emqx@10.27.13.91’, Context: maximum heap size reached, Max Heap Size: 6291456, Total Heap Size: 6479746, Kill: true, Error Logger: true, Message Queue Len: 257, GC Info: [{old_heap_block_size,2487399},{heap_block_size,3800194},{mbuf_size,196171},{recent_size,944422},{stack_size,4011},{old_heap_size,0},{heap_size,1723343},{bin_vheap_size,160658},{bin_vheap_block_size,336896},{bin_old_vheap_size,0},{bin_old_vheap_block_size,46422}]
2023/5/5 下午1:17:302023-05-05T13:17:30.749204+08:00 [error] Process: <0.11440.232> on node ‘emqx@10.27.13.91’, Context: maximum heap size reached, Max Heap Size: 6291456, Total Heap Size: 7794757, Kill: true, Error Logger: true, Message Queue Len: 128, GC Info: [{old_heap_block_size,2984878},{heap_block_size,4560232},{mbuf_size,249680},{recent_size,984909},{stack_size,28},{old_heap_size,0},{heap_size,2072794},{bin_vheap_size,189255},{bin_vheap_block_size,257380},{bin_old_vheap_size,0},{bin_old_vheap_block_size,75110}]
2023/5/5 下午1:18:402023-05-05T13:18:40.512008+08:00 [warning] msg: alarm_is_activated, mfa: emqx_alarm:do_actions/3, line: 416, message: <<“connection congested: #{buffer => 4096,clientid => <<"tgServerdb3248683eec43fab341370ddf833fa3">>,conn_state => connected,connected_at => 1683263911281,high_msgq_watermark => 8192,high_watermark => 1048576,memory => 1126248,message_queue_len => 120,peername => <<"10.27.13.251:38356">>,pid => <<"<0.6349.234>">>,proto_name => <<"MQTT">>,proto_ver => 4,recbuf => 369280,recv_cnt => 1379,recv_oct =”…>>, name: <<“conn_congestion/tgServerdb3248683eec43fab341370ddf833fa3/testUser”>>
2023/5/5 下午1:18:402023-05-05T13:18:40.648214+08:00 [warning] msg: alarm_is_deactivated, mfa: emqx_alarm:do_actions/3, line: 422, name: <<“conn_congestion/tgServerdb3248683eec43fab341370ddf833fa3/testUser”>>
2023/5/5 下午1:19:302023-05-05T13:19:30.708177+08:00 [error] Process: <0.6743.235> on node ‘emqx@10.27.13.91’, Context: maximum heap size reached, Max Heap Size: 6291456, Total Heap Size: 6335301, Kill: true, Error Logger: true, Message Queue Len: 10, GC Info: [{old_heap_block_size,2487399},{heap_block_size,3800194},{mbuf_size,47817},{recent_size,902518},{stack_size,111},{old_heap_size,0},{heap_size,1727252},{bin_vheap_size,144090},{bin_vheap_block_size,196630},{bin_old_vheap_size,0},{bin_old_vheap_block_size,121527}]
2023/5/5 下午1:20:252023-05-05T13:20:25.348810+08:00 [warning] msg: alarm_is_activated, mfa: emqx_alarm:do_actions/3, line: 416, message: <<"connection congested: #{buffer => 4096,clientid => <<"tgServer4127d820c2cb44adab076e74f9451f23">>,conn_state => connected,connected_at => 1683264021227,high_msgq_watermark => 8192,high_watermark => 1048576,memory => 143392,message_queue_len => 0,peername => <<"10.27.13.25:45336">>,pid => <<"<0.12441.237>">>,proto_name => <<"MQTT">>,proto_ver => 4,recbuf => 369280,recv_cnt => 10310,recv_oct => "…>>, name: <<“conn_congestion/tgServer4127d820c2cb44adab076e74f9451f23/testUser”>>
2023/5/5 下午1:20:302023-05-05T13:20:30.659010+08:00 [warning] msg: alarm_is_deactivated, mfa: emqx_alarm:do_actions/3, line: 422, name: <<“conn_congestion/tgServer4127d820c2cb44adab076e74f9451f23/testUser”>>
2023/5/5 下午1:28:002023-05-05T13:28:00.689468+08:00 [error] Process: <0.2079.239> on node ‘emqx@10.27.13.91’, Context: maximum heap size reached, Max Heap Size: 6291456, Total Heap Size: 6595900, Kill: true, Error Logger: true, Message Queue Len: 348, GC Info: [{old_heap_block_size,2487399},{heap_block_size,3800194},{mbuf_size,308374},{recent_size,1250675},{stack_size,47},{old_heap_size,0},{heap_size,1727294},{bin_vheap_size,203022},{bin_vheap_block_size,377781},{bin_old_vheap_size,0},{bin_old_vheap_block_size,46422}]
2023/5/5 下午1:31:102023-05-05T13:31:10.263515+08:00 [warning] msg: alarm_is_activated, mfa: emqx_alarm:do_actions/3, line: 416, message: <<"connection congested: #{buffer => 4096,clientid => <<"tgServerda6e201b00b84b9b84efdd29798e8616">>,conn_state => connected,connected_at => 1683264661481,high_msgq_watermark => 8192,high_watermark => 1048576,memory => 1949296,message_queue_len => 36,peername => <<"10.27.13.251:42610">>,pid => <<"<0.9214.244>">>,proto_name => <<"MQTT">>,proto_ver => 4,recbuf => 369280,recv_cnt => 608,recv_oct => "…>>, name: <<“conn_congestion/tgServerda6e201b00b84b9b84efdd29798e8616/testUser”>>
2023/5/5 下午1:31:102023-05-05T13:31:10.668839+08:00 [warning] msg: alarm_is_deactivated, mfa: emqx_alarm:do_actions/3, line: 422, name: <<“conn_congestion/tgServerda6e201b00b84b9b84efdd29798e8616/testUser”>>
2023/5/5 下午1:32:202023-05-05T13:32:20.185047+08:00 [warning] msg: alarm_is_activated, mfa: emqx_alarm:do_actions/3, line: 416, message: <<“connection congested: #{buffer => 4096,clientid => <<"tgServer9896b7354e57471d84bc40abb153ca3e">>,conn_state => connected,connected_at => 1683264731591,high_msgq_watermark => 8192,high_watermark => 1048576,memory => 2968704,message_queue_len => 549,peername => <<"10.27.13.251:43050">>,pid => <<"<0.606.246>">>,proto_name => <<"MQTT">>,proto_ver => 4,recbuf => 369280,recv_cnt => 11376,recv_oct =”…>>, name: <<“conn_congestion/tgServer9896b7354e57471d84bc40abb153ca3e/testUser”>>
2023/5/5 下午1:32:202023-05-05T13:32:20.743350+08:00 [warning] msg: alarm_is_deactivated, mfa: emqx_alarm:do_actions/3, line: 422, name: <<“conn_congestion/tgServer9896b7354e57471d84bc40abb153ca3e/testUser”>>
2023/5/5 下午1:33:202023-05-05T13:33:20.638662+08:00 [warning] msg: alarm_is_activated, mfa: emqx_alarm:do_actions/3, line: 416, message: <<“connection congested: #{buffer => 4096,clientid => <<"tgServer9896b7354e57471d84bc40abb153ca3e">>,conn_state => connected,connected_at => 1683264791358,high_msgq_watermark => 8192,high_watermark => 1048576,memory => 13877128,message_queue_len => 5462,peername => <<"10.27.13.25:50410">>,pid => <<"<0.30125.247>">>,proto_name => <<"MQTT">>,proto_ver => 4,recbuf => 369280,recv_cnt => 6000,recv_oct”…>>, name: <<“conn_congestion/tgServer9896b7354e57471d84bc40abb153ca3e/testUser”>>
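The excerpt above alternates between two kinds of entries: "maximum heap size reached" errors (a process was killed) and "connection congested" alarms. A small script can make the pattern easier to see. This is only a sketch: the regular expressions are written against the lines as pasted here, the two sample lines are abbreviated copies of the 12:47:20 and 12:58:50 entries, and the conversion to MiB assumes the heap figures are in machine words on a 64-bit Erlang VM.

```python
import re

WORD_BYTES = 8  # assumption: 64-bit Erlang VM, heap sizes reported in words

# Matches the "maximum heap size reached" error lines.
HEAP_RE = re.compile(
    r"Max Heap Size: (?P<max>\d+), Total Heap Size: (?P<total>\d+)"
    r".*?Message Queue Len: (?P<mqlen>\d+)"
)
# Matches the "connection congested" alarm lines.
CONGESTED_RE = re.compile(
    r'connection congested:.*?clientid => <<"(?P<clientid>[^"]+)">>'
    r".*?memory => (?P<memory>\d+),message_queue_len => (?P<mqlen>\d+)"
)


def summarize(lines):
    """Return (heap kill records, worst message_queue_len seen per clientid)."""
    kills, worst = [], {}
    for line in lines:
        m = HEAP_RE.search(line)
        if m:
            kills.append({
                "max_mib": int(m["max"]) * WORD_BYTES / 2**20,
                "total_mib": int(m["total"]) * WORD_BYTES / 2**20,
                "mqlen": int(m["mqlen"]),
            })
            continue
        m = CONGESTED_RE.search(line)
        if m:
            cid = m["clientid"]
            worst[cid] = max(worst.get(cid, 0), int(m["mqlen"]))
    return kills, worst


# Abbreviated copies of two entries from the excerpt ("..." marks omitted text).
sample = [
    "... Context: maximum heap size reached, Max Heap Size: 6291456, "
    "Total Heap Size: 6527931, Kill: true, Error Logger: true, "
    "Message Queue Len: 54, ...",
    '... connection congested: #{buffer => 4096,clientid => '
    '<<"tgServerba5936e648a547f591f5b6ca45906b2e">>,conn_state => connected,'
    "connected_at => 1683262721401,high_msgq_watermark => 8192,"
    "high_watermark => 1048576,memory => 15705400,message_queue_len => 10241,...",
]

kills, worst = summarize(sample)
print(kills)
print(worst)
```

Under those assumptions, the limit of 6291456 words corresponds to 48 MiB per process. In the 12:58:50 alarm, the reported message_queue_len (10241) is above the reported high_msgq_watermark (8192).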

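Expected behavior

Actual behavior

I'd like to ask what these logs mean. When my consumers cannot keep up, why can't EMQX let messages back up the way Kafka does, instead of kicking the consumers out as it does now?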

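The conn_congestion alarm means your consumers are under too much load and cannot process messages fast enough. For the current version, we recommend using shared subscriptions to add more consumers and raise the overall consumption capacity.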
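For readers unfamiliar with shared subscriptions: consumers in the same group subscribe to a filter of the form `$share/<group>/<topic filter>`, and the broker delivers each matching message to only one member of the group. The sketch below builds such a filter and simulates an ideal round-robin split. The group name `workers` and the topic `devices/+/data` are hypothetical, and the simulation only illustrates an even split; it is not EMQX's dispatch code, and the actual dispatch strategy depends on broker configuration.

```python
from collections import Counter
from itertools import cycle


def shared_filter(group: str, topic_filter: str) -> str:
    """Build an MQTT shared-subscription filter: $share/<group>/<filter>."""
    if not group or any(c in group for c in "/+#"):
        raise ValueError("group must be non-empty and contain no '/', '+' or '#'")
    return f"$share/{group}/{topic_filter}"


def simulate_round_robin(messages: int, consumers: list[str]) -> Counter:
    """Count messages per consumer under an ideal round-robin split."""
    counts = Counter()
    for _, consumer in zip(range(messages), cycle(consumers)):
        counts[consumer] += 1
    return counts


# Hypothetical group and topic names, for illustration only.
print(shared_filter("workers", "devices/+/data"))

# One second of traffic at 7,000 msg/s spread over three consumers.
split = simulate_round_robin(7_000, ["c1", "c2", "c3"])
print(dict(split))
```

Adding group members lowers the per-consumer rate, but it does not help if each individual consumer still stalls while handling its share.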

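I am already using shared subscriptions, with three consumers. I scaled the test up to 6 consumers and it still did not solve the problem. I have also read in the community that shared subscriptions under load can cause consumption throughput to drop, which contradicts what you said.
After that, I increased the length of the consumer's multi-threaded work queue from 500 to 2000, and the problem is now resolved.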
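The consumer code is not shown in this thread, so the following is only a guess at the general shape: a network callback hands messages to a pool of worker threads through a bounded queue. With a blocking `put`, a full queue stalls the callback, the client stops reading from the socket, and the broker side can then see the connection as congested. A longer queue absorbs larger bursts before that happens. All names here (`BoundedDispatcher`, `handle`) are hypothetical.

```python
import queue
import threading

_STOP = object()  # sentinel telling a worker thread to exit


class BoundedDispatcher:
    """Hand messages from a network callback to workers via a bounded queue.

    The handler is assumed not to raise; a real client would need error handling.
    """

    def __init__(self, handler, workers=4, maxsize=2000):
        self._queue = queue.Queue(maxsize=maxsize)
        self._handler = handler
        self._threads = [
            threading.Thread(target=self._run, daemon=True) for _ in range(workers)
        ]
        for t in self._threads:
            t.start()

    def submit(self, message, block=True):
        """Enqueue a message.

        With block=True a full queue stalls the caller (the network callback);
        with block=False it raises queue.Full instead.
        """
        self._queue.put(message, block=block)

    def _run(self):
        while True:
            item = self._queue.get()
            if item is _STOP:
                break
            self._handler(item)

    def close(self):
        """Let workers drain the queue, then stop them."""
        for _ in self._threads:
            self._queue.put(_STOP)
        for t in self._threads:
            t.join()


processed = []
lock = threading.Lock()


def handle(msg):
    # Stand-in for real message processing.
    with lock:
        processed.append(msg)


dispatcher = BoundedDispatcher(handle, workers=4, maxsize=2000)
for i in range(5000):
    dispatcher.submit(i)  # blocks only while the queue is full
dispatcher.close()
print(len(processed))
```

Note that a larger queue only buys headroom for bursts. If the workers are slower than the sustained inbound rate, any finite queue will eventually fill.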

All of my test messages are currently QoS 2.
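QoS 2 matters here because each delivery is a four-packet exchange (PUBLISH, PUBREC, PUBREL, PUBCOMP), compared with two packets for QoS 1 and one for QoS 0. A rough count of packets on the delivery side, assuming the consumers also subscribe at QoS 2 and using the 7,000 msg/s implied by the reproduction steps:

```python
# Control packets exchanged between sender and receiver for one message,
# per MQTT QoS level.
PACKETS_PER_MESSAGE = {
    0: ("PUBLISH",),
    1: ("PUBLISH", "PUBACK"),
    2: ("PUBLISH", "PUBREC", "PUBREL", "PUBCOMP"),
}


def packets_per_second(msg_rate: float, qos: int) -> float:
    """Total packets per second (both directions) needed to deliver msg_rate."""
    return msg_rate * len(PACKETS_PER_MESSAGE[qos])


RATE = 7_000  # msg/s: 70,000 connections publishing once every 10 s
for qos in (0, 1, 2):
    rate = packets_per_second(RATE, qos)
    print(f"QoS {qos}: {rate:.0f} packets/s on the delivery side")
```

If exactly-once delivery is not required for this test, a lower QoS would reduce the per-message work on both the broker and the consumers.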

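If the message volume is very large, we recommend the Kafka bridge in the Enterprise edition. It is meant for exactly this mismatch between production and consumption rates.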