Under heavy data volume the consumer goes offline and cannot reconnect to EMQ

Environment

  • EMQ X version: 4.2
  • Operating system and version: CentOS 7.9
  • Other

Problem description

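1600 producers send data at a rate of 10k messages per second. The consumer (a single one) goes offline after a few hours, and reconnecting then fails.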

Configuration files and logs

2021-10-22 01:12:45.018 [warning] <<"ceshi-test-2">>@192.168.10.11:36210 [Session] Dropped msg due to mqueue is full: Message(Id= Îßþ>#ÆÿK( Ïl^, QoS=2, Topic=eap/test6TCQ1BQN/2P5297281Y/property/post, From=<<"thing-test6TCQ1BQN">>, Flags=[], Headers=#{peerhost => {192,168,10,11},
  properties => #{},proto_ver => 5,protocol => mqtt,
  username => <<"test6TCQ1BQN">>})
2021-10-22 01:12:45.019 [warning] <<"ceshi-test-2">>@192.168.10.11:36210 [Session] Dropped msg due to mqueue is full: Message(Id= Îßþ?Šþ- SnÈ, QoS=2, Topic=eap/testN7X1Q5XY/PJ7E48NRJ9/property/post, From=<<"thing-testN7X1Q5XY">>, Flags=[], Headers=#{peerhost => {192,168,10,11},
  properties => #{},proto_ver => 5,protocol => mqtt,
  username => <<"testN7X1Q5XY">>})
2021-10-22 01:12:45.019 [warning] <<"ceshi-test-2">>@192.168.10.11:36210 [Session] Dropped msg due to mqueue is full: Message(Id= Îßþ?‹Œ- M
n, QoS=2, Topic=eap/testRS16C69I/XT8V4M59NF/property/post, From=<<"thing-testRS16C69I">>, Flags=[], Headers=#{peerhost => {192,168,10,11},
  properties => #{},proto_ver => 5,protocol => mqtt,
  username => <<"testRS16C69I">>})
2021-10-22 01:12:45.568 [warning] Received gun_down with closed
2021-10-22 01:12:46.676 [warning] Received gun_down with closed
2021-10-22 01:12:46.719 [warning] Received gun_down with closed
2021-10-22 01:12:46.943 [warning] Received gun_down with closed
2021-10-22 01:12:47.518 [warning] Received gun_down with closed
2021-10-22 01:12:47.540 [warning] Received gun_down with closed
2021-10-22 01:12:47.573 [warning] Received gun_down with closed
2021-10-22 01:12:47.635 [warning] Received gun_down with closed
2021-10-22 01:12:47.781 [warning] Received gun_down with closed
2021-10-22 01:12:49.837 [warning] Received gun_down with closed
2021-10-22 01:12:50.022 [warning] Received gun_down with closed
2021-10-22 01:12:50.489 [warning] Received gun_down with closed
2021-10-22 01:12:50.527 [warning] Received gun_down with closed
2021-10-22 01:12:50.576 [warning] Received gun_down with closed
2021-10-22 01:12:50.759 [warning] Received gun_down with closed
2021-10-22 01:12:50.870 [error] <<"ceshi-test-2">>@192.168.10.11:52478   crasher:
    initial call: emqx_connection:init/4
    pid: <0.21252.52>
    registered_name: []
    exception exit: {timeout,
                        {gen_server,call,
                            [emqx_shared_sub,
                             {subscribe,<<"ceshi">>,
                                 <<"$SYS/brokers/+/clients/+/disconnected">>,
                                 <0.21252.52>}]}}
      in function  emqx_connection:terminate/2 (emqx_connection.erl, line 430)
    ancestors: [<0.1811.0>,<0.1810.0>,esockd_sup,<0.1398.0>]
    message_queue_len: 0
    messages: []
    links: [<0.1811.0>]
    dictionary: [{acl_cache_size,1},
                  {acl_keys_q,
                      {[{subscribe,
                            <<"$SYS/brokers/+/clients/+/disconnected">>}],
                       []}},
                  {send_pkt,1},
                  {guid,{1634836365593211,37968383333124,0}},
                  {incoming_bytes,107},
                  {recv_pkt,2},
                  {{subscribe,<<"$SYS/brokers/+/clients/+/disconnected">>},
                   {allow,1634836365869}},
                  {outgoing_bytes,21},
                  {'$logger_metadata$',
                      #{clientid => <<"ceshi-test-2">>,
                        peername => "192.168.10.11:52478"}},
                  {rand_seed,
                      {#{bits => 58,jump => #Fun<rand.13.8986388>,
                         next => #Fun<rand.10.8986388>,type => exsss,
                         uniform => #Fun<rand.11.8986388>,
                         uniform_n => #Fun<rand.12.8986388>},
                       [37472060954607511|200235108313809318]}}]
    trap_exit: false
    status: running
    heap_size: 1598
    stack_size: 27
    reductions: 3999
  neighbours:

2021-10-22 01:12:50.870 [error]     supervisor: 'esockd_connection_sup - <0.1811.0>'
    errorContext: connection_crashed
    reason: {timeout,
                {gen_server,call,
                    [emqx_shared_sub,
                     {subscribe,<<"ceshi">>,
                         <<"$SYS/brokers/+/clients/+/disconnected">>,
                         <0.21252.52>}]}}
    offender: [{pid,<0.21252.52>},
               {name,connection},
               {mfargs,
                   {emqx_connection,start_link,
                       [[{deflate_options,[]},
                         {max_conn_rate,1000},
                         {active_n,100},
                         {zone,external}]]}}]

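There are two sides to the problem:
1 The consumer uses a shared subscription. Shared subscriptions involve load balancing, so their performance is slightly lower than that of ordinary subscriptions.
2 Your messages are QoS 2 and the consumer cannot keep up, so a backlog has built up and the message queue is full, which causes the disconnect. On reconnect the QoS 2 messages have still not been consumed (they have probably been lost, and the message queue may not be clearable), which causes the reconnect to fail.
Solution:
Do not use QoS 2 messages.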
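
The backlog in point 2 can be illustrated with a toy model. This is only a sketch and not EMQ X code: the function name `simulate_mqueue`, the per-millisecond tick, the 8 messages/ms consumer rate and the queue limit of 1000 are all assumptions chosen for illustration. It shows that once the consumer is even slightly slower than the producers, a bounded queue fills quickly and every further excess message is dropped, which matches the "Dropped msg due to mqueue is full" warnings above.

```python
def simulate_mqueue(in_per_tick, out_per_tick, max_len, ticks):
    """Toy model of a bounded per-session message queue.

    Each tick, up to in_per_tick messages arrive; those that do not fit
    into the queue (limit max_len) are dropped. Then the consumer takes
    up to out_per_tick messages off the queue.
    All parameters are hypothetical; this is not how EMQ X is implemented.
    """
    queued = 0
    dropped = 0
    delivered = 0
    for _ in range(ticks):
        # Enqueue as many incoming messages as there is room for.
        accepted = min(in_per_tick, max_len - queued)
        dropped += in_per_tick - accepted
        queued += accepted
        # The consumer drains what it can handle in this tick.
        sent = min(out_per_tick, queued)
        queued -= sent
        delivered += sent
    return {"delivered": delivered, "dropped": dropped, "queued": queued}


if __name__ == "__main__":
    # 10 msgs/ms in (10k/s), an assumed 8 msgs/ms out, assumed limit 1000,
    # simulated for 10 seconds (10_000 ticks of 1 ms).
    print(simulate_mqueue(10, 8, 1000, 10_000))
    # Same load with a consumer that keeps up: nothing is dropped.
    print(simulate_mqueue(10, 10, 1000, 10_000))
```

In this model a larger queue limit only delays the first drop; the drop rate afterwards is still the difference between the incoming and outgoing rates.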

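OK, thanks. Adding more consumers also seems to solve it. Roughly how much concurrent message throughput can a single consumer handle (10k does not seem to work)? Can this be solved by tuning some parameters (I have already tuned some)?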

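The consumer's throughput needs to be increased, and QoS 2 delivery also depends on the capability of the device. EMQ X has no way to intervene in that.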
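
As a rough way to size the consumer side of a shared subscription, the arithmetic can be sketched as follows. The helper name `consumers_needed`, the measured per-consumer rate and the 70% headroom factor are assumptions for illustration; the real per-consumer rate has to be measured on your own client and hardware.

```python
import math


def consumers_needed(total_rate, per_consumer_rate, headroom=0.7):
    """Estimate how many shared-subscription consumers are required.

    total_rate:        incoming messages per second across all producers
    per_consumer_rate: messages per second one consumer sustains (measured)
    headroom:          fraction of that rate to plan for, leaving slack
                       for bursts (hypothetical default of 0.7)
    """
    if per_consumer_rate <= 0 or not 0 < headroom <= 1:
        raise ValueError("per_consumer_rate must be > 0 and 0 < headroom <= 1")
    return math.ceil(total_rate / (per_consumer_rate * headroom))


if __name__ == "__main__":
    # 10k msgs/s in total, assuming one consumer sustains 8k msgs/s.
    print(consumers_needed(10_000, 8_000))
```

This assumes the broker spreads messages evenly across the group members; an uneven dispatch strategy or slow individual consumers would call for more slack.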