Consumer drops offline under heavy data volume and fails to reconnect to EMQ

Environment

  • EMQ X version: 4.2
  • OS and version: CentOS 7.9
  • Other

Problem description

1600 producers publish data at a combined rate of 10k messages per second. The (single) consumer drops offline after a few hours, and reconnection then fails.

Configuration and logs

2021-10-22 01:12:45.018 [warning] <<"ceshi-test-2">>@192.168.10.11:36210 [Session] Dropped msg due to mqueue is full: Message(Id= Îßþ>#ÆÿK( Ïl^, QoS=2, Topic=eap/test6TCQ1BQN/2P5297281Y/property/post, From=<<"thing-test6TCQ1BQN">>, Flags=[], Headers=#{peerhost => {192,168,10,11},
  properties => #{},proto_ver => 5,protocol => mqtt,
  username => <<"test6TCQ1BQN">>})
2021-10-22 01:12:45.019 [warning] <<"ceshi-test-2">>@192.168.10.11:36210 [Session] Dropped msg due to mqueue is full: Message(Id= Îßþ?Šþ- SnÈ, QoS=2, Topic=eap/testN7X1Q5XY/PJ7E48NRJ9/property/post, From=<<"thing-testN7X1Q5XY">>, Flags=[], Headers=#{peerhost => {192,168,10,11},
  properties => #{},proto_ver => 5,protocol => mqtt,
  username => <<"testN7X1Q5XY">>})
2021-10-22 01:12:45.019 [warning] <<"ceshi-test-2">>@192.168.10.11:36210 [Session] Dropped msg due to mqueue is full: Message(Id= Îßþ?‹Œ- M
n, QoS=2, Topic=eap/testRS16C69I/XT8V4M59NF/property/post, From=<<"thing-testRS16C69I">>, Flags=[], Headers=#{peerhost => {192,168,10,11},
  properties => #{},proto_ver => 5,protocol => mqtt,
  username => <<"testRS16C69I">>})
2021-10-22 01:12:45.568 [warning] Received gun_down with closed
2021-10-22 01:12:46.676 [warning] Received gun_down with closed
2021-10-22 01:12:46.719 [warning] Received gun_down with closed
2021-10-22 01:12:46.943 [warning] Received gun_down with closed
2021-10-22 01:12:47.518 [warning] Received gun_down with closed
2021-10-22 01:12:47.540 [warning] Received gun_down with closed
2021-10-22 01:12:47.573 [warning] Received gun_down with closed
2021-10-22 01:12:47.635 [warning] Received gun_down with closed
2021-10-22 01:12:47.781 [warning] Received gun_down with closed
2021-10-22 01:12:49.837 [warning] Received gun_down with closed
2021-10-22 01:12:50.022 [warning] Received gun_down with closed
2021-10-22 01:12:50.489 [warning] Received gun_down with closed
2021-10-22 01:12:50.527 [warning] Received gun_down with closed
2021-10-22 01:12:50.576 [warning] Received gun_down with closed
2021-10-22 01:12:50.759 [warning] Received gun_down with closed
2021-10-22 01:12:50.870 [error] <<"ceshi-test-2">>@192.168.10.11:52478   crasher:
    initial call: emqx_connection:init/4
    pid: <0.21252.52>
    registered_name: []
    exception exit: {timeout,
                        {gen_server,call,
                            [emqx_shared_sub,
                             {subscribe,<<"ceshi">>,
                                 <<"$SYS/brokers/+/clients/+/disconnected">>,
                                 <0.21252.52>}]}}
      in function  emqx_connection:terminate/2 (emqx_connection.erl, line 430)
    ancestors: [<0.1811.0>,<0.1810.0>,esockd_sup,<0.1398.0>]
    message_queue_len: 0
    messages: []
    links: [<0.1811.0>]
    dictionary: [{acl_cache_size,1},
                  {acl_keys_q,
                      {[{subscribe,
                            <<"$SYS/brokers/+/clients/+/disconnected">>}],
                       []}},
                  {send_pkt,1},
                  {guid,{1634836365593211,37968383333124,0}},
                  {incoming_bytes,107},
                  {recv_pkt,2},
                  {{subscribe,<<"$SYS/brokers/+/clients/+/disconnected">>},
                   {allow,1634836365869}},
                  {outgoing_bytes,21},
                  {'$logger_metadata$',
                      #{clientid => <<"ceshi-test-2">>,
                        peername => "192.168.10.11:52478"}},
                  {rand_seed,
                      {#{bits => 58,jump => #Fun<rand.13.8986388>,
                         next => #Fun<rand.10.8986388>,type => exsss,
                         uniform => #Fun<rand.11.8986388>,
                         uniform_n => #Fun<rand.12.8986388>},
                       [37472060954607511|200235108313809318]}}]
    trap_exit: false
    status: running
    heap_size: 1598
    stack_size: 27
    reductions: 3999
  neighbours:

2021-10-22 01:12:50.870 [error]     supervisor: 'esockd_connection_sup - <0.1811.0>'
    errorContext: connection_crashed
    reason: {timeout,
                {gen_server,call,
                    [emqx_shared_sub,
                     {subscribe,<<"ceshi">>,
                         <<"$SYS/brokers/+/clients/+/disconnected">>,
                         <0.21252.52>}]}}
    offender: [{pid,<0.21252.52>},
               {name,connection},
               {mfargs,
                   {emqx_connection,start_link,
                       [[{deflate_options,[]},
                         {max_conn_rate,1000},
                         {active_n,100},
                         {zone,external}]]}}]

There are two sides to the problem:
1. The consumer uses a shared subscription. Shared subscriptions involve load-balancing logic, so their performance is somewhat lower than a normal subscription.
2. Your messages are QoS 2 and the consumer cannot keep up, so a backlog has built up and the message queue is full, which causes the disconnect. On reconnection the QoS 2 messages still have not been consumed (they have probably been lost already, and the message queue may not be cleared), which makes the reconnection fail.
Solution:
Do not use QoS 2 messages.
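
A minimal sketch of what that change could look like on the consumer side, assuming the paho-mqtt 1.x Python client, MQTT 3.1.1, a broker at 127.0.0.1:1883, and an illustrative shared-subscription topic (none of these names come from the original post):

import paho.mqtt.client as mqtt

GROUP_TOPIC = "$share/ceshi/eap/+/+/property/post"  # illustrative shared-subscription group/topic

def on_connect(client, userdata, flags, rc):
    # Subscribe at QoS 1 instead of QoS 2: no PUBREC/PUBREL/PUBCOMP round trip,
    # so acknowledgements come back faster and the session queue drains sooner.
    client.subscribe(GROUP_TOPIC, qos=1)

def on_message(client, userdata, msg):
    # Hand the payload off to a worker queue quickly; slow processing here
    # stalls the inflight window and lets the broker-side queue fill up again.
    pass

client = mqtt.Client(client_id="ceshi-consumer-1", clean_session=False)
client.on_connect = on_connect
client.on_message = on_message
client.connect("127.0.0.1", 1883, keepalive=60)
client.loop_forever()

Several such consumers started with the same $share/<group>/ prefix but different client IDs will split the load between them.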

OK, thanks. It also seems that adding more consumers can solve it. Roughly how much message throughput can a single consumer handle (10k apparently does not work)? Can this be solved by tuning some parameters (I have already tuned a few)?

The consumption capacity on the consumer side has to be increased, and QoS 2 messages also depend on the capability of the device itself; EMQ X has no way to intervene there.
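
For reference, the broker-side limits that show up in these logs are configured in etc/emqx.conf on EMQ X 4.x; raising them only buys headroom and does not fix a consumer that is persistently slower than the producers. The values below are illustrative, not recommendations:

zone.external.max_mqueue_len = 10000   ## per-session message queue length (default 1000)
zone.external.max_inflight = 128       ## QoS 1/2 messages in flight per client before queueing (default 32)
zone.external.max_awaiting_rel = 1000  ## pending QoS 2 PUBREL handshakes per client (default 100)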

Hello, I have run into a similar problem here; the details are described below.

Environment

  • EMQ X version: 4.4.0
  • OS and other: Ubuntu 18.04
  • Other

Problem description

With QoS 2, the producer keeps publishing data to EMQ (one message every two seconds, roughly 1 KB each). The consumer is shut down for ten-odd minutes and then resubscribes, but it cannot receive any messages.
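
(For reference, the producer side behaves roughly like the sketch below, assuming the paho-mqtt 1.x Python client and a local broker; the actual client code may differ. Client ID and topic are taken from the logs.)

import time
import paho.mqtt.client as mqtt

# Illustrative reproduction of the described load: one ~1 KB QoS 2 message every 2 seconds.
client = mqtt.Client(client_id="sender")
client.connect("127.0.0.1", 1883, keepalive=60)
client.loop_start()  # background network loop so the QoS 2 handshakes can complete

payload = b"x" * 1024  # ~1 KB dummy payload
while True:
    client.publish("4001", payload, qos=2)
    time.sleep(2)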

The log that EMQ keeps printing is:

2023-01-03T20:45:36.395838+08:00 [warning] TT@172.17.0.1:54476 [Session] Dropped msg due to mqueue is full: Message(Id=0005F15AFA29844ACD1104000B026ED9, QoS=2, Topic=4001, From=<<"sender">>, Flags=[], Headers=#{peerhost => {172,17,0,1}, properties => #{},proto_ver => 4,protocol => mqtt,username => undefined})

2023-01-03T20:45:38.404562+08:00 [warning] TT@172.17.0.1:54476 [Session] Dropped msg due to mqueue is full: Message(Id=0005F15AFA48247DCD1104000B026EDA, QoS=2, Topic=4001, From=<<"sender">>, Flags=[], Headers=#{peerhost => {172,17,0,1}, properties => #{},proto_ver => 4,protocol => mqtt,username => undefined})

2023-01-03T20:45:40.412354+08:00 [warning] TT@172.17.0.1:54476 [Session] Dropped msg due to mqueue is full: Message(Id=0005F15AFA66C8DCCD1104000B026EDB, QoS=2, Topic=4001, From=<<"sender">>, Flags=[], Headers=#{peerhost => {172,17,0,1}, properties => #{},proto_ver => 4,protocol => mqtt,username => undefined})

2023-01-03T20:45:42.422348+08:00 [warning] TT@172.17.0.1:54476 [Session] Dropped msg due to mqueue is full: Message(Id=0005F15AFA856F8BCD1104000B026EDC, QoS=2, Topic=4001, From=<<"sender">>, Flags=[], Headers=#{peerhost => {172,17,0,1}, properties => #{},proto_ver => 4,protocol => mqtt,username => undefined})

2023-01-03T20:45:44.431179+08:00 [warning] TT@172.17.0.1:54476 [Session] Dropped msg due to mqueue is full: Message(Id=0005F15AFAA41C4BCD1104000B026EDD, QoS=2, Topic=4001, From=<<"sender">>, Flags=[], Headers=#{peerhost => {172,17,0,1}, properties => #{},proto_ver => 4,protocol => mqtt,username => undefined})

2023-01-03T20:45:46.440551+08:00 [warning] TT@172.17.0.1:54476 [Session] Dropped msg due to mqueue is full: Message(Id=0005F15AFAC2BE56CD1104000B026EDE, QoS=2, Topic=4001, From=<<"sender">>, Flags=[], Headers=#{peerhost => {172,17,0,1}, properties => #{},proto_ver => 4,protocol => mqtt,username => undefined})

2023-01-03T20:45:48.449037+08:00 [warning] TT@172.17.0.1:54476 [Session] Dropped msg due to mqueue is full: Message(Id=0005F15AFAE15F4ACD1104000B026EDF, QoS=2, Topic=4001, From=<<"sender">>, Flags=[], Headers=#{peerhost => {172,17,0,1}, properties => #{},proto_ver => 4,protocol => mqtt,username => undefined})

2023-01-03T20:45:50.456031+08:00 [warning] TT@172.17.0.1:54476 [Session] Dropped msg due to mqueue is full: Message(Id=0005F15AFB00055ACD1104000B026EE0, QoS=2, Topic=4001, From=<<"sender">>, Flags=[], Headers=#{peerhost => {172,17,0,1}, properties => #{},proto_ver => 4,protocol => mqtt,username => undefined})

2023-01-03T20:45:52.463423+08:00 [warning] TT@172.17.0.1:54476 [Session] Dropped msg due to mqueue is full: Message(Id=0005F15AFB1EB563CD1104000B026EE1, QoS=2, Topic=4001, From=<<"sender">>, Flags=[], Headers=#{peerhost => {172,17,0,1}, properties => #{},proto_ver => 4,protocol => mqtt,username => undefined})

2023-01-03T20:45:54.471186+08:00 [warning] TT@172.17.0.1:54476 [Session] Dropped msg due to mqueue is full: Message(Id=0005F15AFB3D686BCD1104000B026EE2, QoS=2, Topic=4001, From=<<"sender">>, Flags=[], Headers=#{peerhost => {172,17,0,1}, properties => #{},proto_ver => 4,protocol => mqtt,username => undefined})

2023-01-03T20:45:56.479031+08:00 [warning] TT@172.17.0.1:54476 [Session] Dropped msg due to mqueue is full: Message(Id=0005F15AFB5C0F98CD1104000B026EE3, QoS=2, Topic=4001, From=<<"sender">>, Flags=[], Headers=#{peerhost => {172,17,0,1}, properties => #{},proto_ver => 4,protocol => mqtt,username => undefined})

2023-01-03T20:45:58.486751+08:00 [warning] TT@172.17.0.1:54476 [Session] Dropped msg due to mqueue is full: Message(Id=0005F15AFB7AB170CD1104000B026EE4, QoS=2, Topic=4001, From=<<"sender">>, Flags=[], Headers=#{peerhost => {172,17,0,1}, properties => #{},proto_ver => 4,protocol => mqtt,username => undefined})

2023-01-03T20:46:00.495523+08:00 [warning] TT@172.17.0.1:54476 [Session] Dropped msg due to mqueue is full: Message(Id=0005F15AFB995C4DCD1104000B026EE5, QoS=2, Topic=4001, From=<<"sender">>, Flags=[], Headers=#{peerhost => {172,17,0,1}, properties => #{},proto_ver => 4,protocol => mqtt,username => undefined})

2023-01-03T20:46:02.502667+08:00 [warning] TT@172.17.0.1:54476 [Session] Dropped msg due to mqueue is full: Message(Id=0005F15AFBB7FFCDCD1104000B026EE6, QoS=2, Topic=4001, From=<<"sender">>, Flags=[], Headers=#{peerhost => {172,17,0,1}, properties => #{},proto_ver => 4,protocol => mqtt,username => undefined})

2023-01-03T20:46:04.510107+08:00 [warning] TT@172.17.0.1:54476 [Session] Dropped msg due to mqueue is full: Message(Id=0005F15AFBD6A322CD1104000B026EE7, QoS=2, Topic=4001, From=<<"sender">>, Flags=[], Headers=#{peerhost => {172,17,0,1}, properties => #{},proto_ver => 4,protocol => mqtt,username => undefined})

The configuration is all defaults and has not been modified; some of the relevant settings are:

zone.external.max_mqueue_len = 1000
zone.external.mqueue_priorities = none
zone.external.mqueue_default_priority = highest
zone.external.mqueue_store_qos0 = true
zone.internal.max_mqueue_len = 10000
zone.internal.mqueue_store_qos0 = true

What is strange here is that the consumer cannot receive the buffered messages after it resubscribes. I hope you can help explain this!
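
For the broker to buffer messages while the consumer is away, the consumer needs a persistent session (clean_session=False with the same client ID) and the subscription has to already exist, at QoS > 0, before it goes offline; a fresh clean session only sees new messages. A minimal sketch of such a consumer, assuming paho-mqtt 1.x and a local broker (client ID and topic taken from the log above):

import paho.mqtt.client as mqtt

# clean_session=False keeps the session, and its message queue, on the broker
# while this client is offline; the client_id must stay the same across restarts.
client = mqtt.Client(client_id="TT", clean_session=False)

def on_connect(client, userdata, flags, rc):
    # flags.get("session present") is 1 when the broker resumed the old session;
    # messages queued while offline are delivered after the reconnect.
    client.subscribe("4001", qos=2)

client.on_connect = on_connect
client.on_message = lambda c, u, msg: print(msg.topic, len(msg.payload))
client.connect("127.0.0.1", 1883, keepalive=60)
client.loop_forever()

Note that with max_mqueue_len = 1000, anything queued beyond 1000 messages per session is dropped, which is exactly what the warnings above report.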

Did you ever solve this?