emqx消息积压

EMQX 版本

EMQX 5.6.0

EMQX 安装部署方式

通过 EMQX Operator 部署

EMQX 集群情况

3个core节点,10个replicant节点,10个节点配置为10C32G
部署配置:

apiVersion: apps.emqx.io/v2beta1
kind: EMQX
metadata:
   name: emqx-tc
   namespace: emqx
spec:
   image: emqx/emqx:5.6.0
   config: 
    data: |
      cluster {
        autoheal = on
        autoclean = 5m
      }
      listeners.tcp.default.acceptors = 256
      listeners.tcp.default.max_connections = 1024000
      listeners.tcp.default.max_conn_rate = "10000/s"
      listeners.tcp.default.messages_rate = "10000/s"
      listeners.tcp.default.bytes_rate = "20MB/s"
      listeners.tcp.default.tcp_options.backlog = 102400
      listeners.tcp.default.tcp_options.keepalive = "240,30,5"
      listeners.ssl.default.acceptors = 64
      listeners.ssl.default.max_connections = 1024000
      listeners.ssl.default.max_conn_rate = "10000/s"
      listeners.ssl.default.messages_rate = "10000/s"
      listeners.ssl.default.bytes_rate = "20MB/s"
      listeners.ssl.default.tcp_options.backlog = 102400
      listeners.ssl.default.tcp_options.keepalive = "240,30,5"
      listeners.ws.default.acceptors = 64
      listeners.ws.default.max_connections = 1024000
      listeners.ws.default.max_conn_rate = "10000/s"
      listeners.ws.default.messages_rate = "10000/s"
      listeners.ws.default.bytes_rate = "20MB/s"
      listeners.ws.default.tcp_options.backlog = 102400
      listeners.ws.default.tcp_options.keepalive = "240,30,5"
      mqtt.max_packet_size = "1MB"
      node.cookie = "xxxxxxxxxx"
      node.max_ports = 1024000
      node.dist_buffer_size = 16384
      slow_subs.enable = false
      slow_subs.threshold = "200ms"
      slow_subs.expire_interval = "300s"
      log.console.level = info
      log.file.level = info
   listenersServiceTemplate:
     spec:
       type: ClusterIP
   dashboardServiceTemplate:
     spec:
       type: ClusterIP
   coreTemplate:
     metadata:
        name: emqx-tc-core
        namespace: emqx-core
     spec:
       replicas: 3
       env:
         - name: ERL_MAX_PROCESSES
           value: "2097152"
         - name: ERL_MAX_PORTS
           value: "1048576"
       resources:
        requests:
          cpu: 8000m
          memory: 16Gi
   replicantTemplate:
     metadata:
       name: emqx-tc-rep
       namespace: emqx-rep
     spec:
       replicas: 10
       env:
         - name: ERL_MAX_PROCESSES
           value: "2097152"
         - name: ERL_MAX_PORTS
           value: "1048576"
       resources:
        requests:
          cpu: 10000m
          memory: 32Gi

在 EMQX 中启用的功能

启用了prometheus的pushgateway,增加了插件,在publish事件时在消息的user property中插入一个时间戳;
启用了SSL证书校验端口。

测试场景

使用20台压力机,建立了100w个连接,每秒10w qps向emqx pub数据,外部服务sub数据后,再pub数据到emqx,在emqx消息的流入和流出总的是20w/s

具体问题

当消息达到20w/s后,消息会被挤压,服务统计的很多消息在emqx延迟达到了2s多


  1. 做压测建议就不要开启使用速率限制了
  2. 日志建议只开启 file, 且 log level 在 warning 以上
  3. 麻烦提供下 pub/sub 的模型和 qos 的具体信息

有更详细的监控数据吗?另外2秒的延时是怎么统计出来的

pub/sub使用的是paho.mqttv5 版本:1.2.5
qos都是1
客户端启动的参数:
cleanStart=true、maximumPacketSize=1000000L、automaticReconnecttrue
pub数据都在200b,qos是1,userproperty设置了四个值
sub数据qos是1

可以看下pod的监控,延迟时间是在sub数据的服务统计的,通过在emqx的插件在emqx接收到消息后写入时间戳,sub服务拿到数据后计算差值

另外问下,如果emqx的replicant其中一个有链接的pod被下掉了,那么该pod的连接会被漂移到其它pod节点吗?测试看连接在客户端会被关闭,operator没有将连接转移到其它pod

server端不会自动漂移到其它的pod,依赖于客户端的设置,客户的如果启用了自动重连,那么重连的时候会连接到其它的节点

从EMQX dashboard来看 incoming 和 outgoing 的 TPS 基本基本相同,所以应该不存在消息堆积的情况,对于消息延时2秒的问题,可能的影响因素包括:

  1. Pub 客户端 → EMQX Server → Sub 服务端 整个链路的网络延时和带宽限制。
  2. 延时统计的误差,如果Pub 客户端和Sub 服务端在不同的机器上,那么两台机器之间是否完全同步,如果存在差异的话会造成延时统计的误差,可以试一下在同一个节点上pub和sub。
  3. 服务端接收消息慢,可以试一下完全使用MQTT压测客户端来Pub和Sub,看看这种场景下消息的延时是多少。

好的,现在emqx副本数量是10个,总共100C,如果我调成5个副本,每个副本20C,这样是否可以提升emqx的性能?我怀疑是Emqx集群内部节点过多,节点运行不均衡,有3个节点资源耗费不到40%,有7个节点cpu耗费了80%,消息路由耗时较长?

3个Core节点的CPU使用情况如何?

core节点的cpu最高被打到了60%

那瓶颈不在core节点上,延时大估计和网络延时或者统计方法有关系

我是在消息出发on_message_publish事件时插入一个时间戳(emqbts),客户端接收到该消息后拿到消息戳,做差值来统计总的耗时

on_message_publish(Message, _Env) ->
    TimestampMilliseconds = os:system_time(millisecond),
    Headers = Message#message.headers,
    NewHeaders = update_user_properties(Headers, TimestampMilliseconds, <<"emqbts">>),
    NewMessage = Message#message{headers = NewHeaders},
[emqx_message:to_map(NewMessage), _Env]),
    {ok, NewMessage}.

另外再和您确认下,on_message_delivered(_ClientInfo = #{clientid := ClientId}, Message, _Env) 该事件触发是在当消息被放入底层socket时触发规则,这个投递到网络层是指emqx消息发出后触发的事件吧

另外我们的架构是这样的:

  1. 对外提供的nginx集群
    2.在nginx内配置emqx的所有replicant节点,使用反向代理,轮训进行负载
    3.客户端和nginx建立websocket长链接,nginx在和emqx建立长链接
    4.请求先经过nginx,反向代理将消息发送到replicant节点上