MQTT message size limit in EMQX open-source edition

EMQX version

EMQX installation and deployment method

Installed via yum

Single-node deployment

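Client 1 publishes data: mosquitto_pub -h x.x.x.x -p x -d -t test -f iiec104srv.log (iiec104srv.log is 1.2 MB)
Client 2 subscribes: mosquitto_sub -h x.x.x.x -p x -d -t test

With mosquitto as the broker, the subscriber receives the data; with EMQX as the broker, the subscriber receives nothing. The message size limit has been changed to 10MB: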
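As a sanity check that a 1.2 MB payload fits well under the 10MB limit, here is a minimal Python sketch that estimates the size of the PUBLISH packet on the wire. It assumes MQTT 3.1.1 framing (no MQTT 5.0 properties), QoS 0, and a payload of exactly 1.2 × 1024 × 1024 bytes; the function names are illustrative and not part of any EMQX or mosquitto API.

```python
def encode_remaining_length(n: int) -> bytes:
    """Encode the MQTT 'Remaining Length' field (variable-length, 1 to 4 bytes)."""
    if not 0 <= n <= 268_435_455:
        raise ValueError("remaining length out of range for MQTT")
    out = bytearray()
    while True:
        byte = n % 128
        n //= 128
        if n > 0:
            byte |= 0x80  # continuation bit: more length bytes follow
        out.append(byte)
        if n == 0:
            return bytes(out)


def publish_packet_size(topic: str, payload_len: int, qos: int = 0) -> int:
    """Total bytes of an MQTT 3.1.1 PUBLISH packet (assumption: no v5 properties)."""
    # Variable header: 2-byte topic length + topic, plus a 2-byte packet id for QoS 1/2.
    variable_header = 2 + len(topic.encode("utf-8")) + (2 if qos > 0 else 0)
    remaining = variable_header + payload_len
    # Fixed header: 1 control byte + the encoded remaining length.
    return 1 + len(encode_remaining_length(remaining)) + remaining


if __name__ == "__main__":
    payload_len = int(1.2 * 1024 * 1024)  # assumed size of iiec104srv.log
    limit = 10 * 1024 * 1024              # max_packet_size = 10MB, read as MiB
    size = publish_packet_size("test", payload_len)
    print(f"packet size: {size} bytes, within limit: {size <= limit}")
```

The framing overhead is only a few bytes, so the packet is roughly the size of the file itself and the packet size limit is unlikely to be what blocks delivery.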

## Maximum MQTT packet size allowed
max_packet_size = 10MB

Full configuration file:

## NOTE:
## This config file overrides data/configs/cluster.hocon,
## and is merged with environment variables which start with 'EMQX_' prefix.
##
## Config changes made from EMQX dashboard UI, management HTTP API, or CLI
## are stored in data/configs/cluster.hocon.
## To avoid confusion, please do not store the same configs in both files.
##
## See https://www.emqx.io/docs/en/v5.0/configuration/configuration.html for more details.
## Configuration full example can be found in etc/examples

node {
name = "emqx@127.0.0.1"
cookie = "emqxsecretcookie"
data_dir = "/var/lib/emqx"
}

cluster {
name = emqxcl
discovery_strategy = manual
}

dashboard {
listeners.http {
bind = 18083
}
}

listeners.tcp.default {
## Port or Address to listen on, 0 means disable
bind = "0.0.0.0:3389" ## or with an IP e.g. "127.0.0.1:1883"

## Enable the Proxy Protocol V1/2 if the EMQX cluster is deployed behind HAProxy or Nginx
proxy_protocol = false

## Timeout for proxy protocol
proxy_protocol_timeout = 8

## When publishing or subscribing, prefix all topics with a mountpoint string
mountpoint = "mqtt" ## Do not set this unless you know what is it for

## Client authentication
## Type:
##   - true :: enable
##   - false :: disable
##   - quick_deny_anonymous :: deny immediately if username is not provided
enable_authn = false

## The access control rules for this listener
## Type: See: https://github.com/emqtt/esockd#allowdeny
access_rules = ["allow all"]

## Socket acceptor pool size for TCP protocols
acceptors = 16

## Maximum number of simultaneous connections
## Type: infinity | Integer
max_connections = infinity

tcp_options {
    ## TCP backlog defines the maximum length that the queue of pending connections can grow to
    backlog = 1024

    ## The TCP send timeout for the connections
    send_timeout = 15s

    ## Close the connection if send timeout
    send_timeout_close = true

    ## The TCP receive buffer (OS kernel) for the connections
    recbuf = 2KB

    ## The TCP send buffer (OS kernel) for the connections
    sndbuf = 4KB

    ## The size of the user-space buffer used by the driver
    buffer = 4KB

    ## The socket is set to a busy state when the amount of data queued internally by the VM socket implementation reaches this limit
    high_watermark = 1MB

    ## The TCP_NODELAY flag for the connections
    nodelay = true

    ## The SO_REUSEADDR flag for the connections
    reuseaddr = true

    ## Enable TCP keepalive for MQTT connections over TCP or SSL
    ## Type: three comma separated numbers in the format of 'Idle,Interval,Probes'
    ##    - Idle: The number of seconds a connection needs to be idle before the server begins to send out keep-alive probes (Linux default 7200).
    ##    - Interval: The number of seconds between TCP keep-alive probes (Linux default 75).
    ##    - Probes: The maximum number of TCP keep-alive probes to send before giving up and killing the connection if no response is obtained from the other end (Linux default 9).
    ## For example "240,30,5" means: EMQX should start sending TCP keepalive probes after the connection is in idle for 240 seconds, and the probes are sent every 30 seconds until a response is received from the MQTT client, if it misses 5 consecutive responses, EMQX should close the connection
    keepalive = "none"
}

}

mqtt {
## After the TCP connection is established,
## if the MQTT CONNECT packet from the client is not received within the time specified by idle_timeout, the connection will be disconnected
## Type:
## - infinity :: Never disconnect
## - Time Duration :: The idle time
idle_timeout = 1800s

## Maximum MQTT packet size allowed
max_packet_size = 10MB

## Maximum allowed length of MQTT Client ID
## Type: Range from 23 to 65535
max_clientid_len = 65535

## Maximum topic levels allowed
## Type: Range from 1 to 65535
max_topic_levels = 128

## Maximum QoS allowed
max_qos_allowed = 2

## Maximum topic alias, 0 means no topic alias supported
## Type: Range from 0 to 65535
max_topic_alias = 65535

## Whether to enable support for MQTT retained message
retain_available = true

## Whether to enable support for MQTT wildcard subscription
wildcard_subscription = true

## Whether to enable support for MQTT shared subscription
shared_subscription = true

## Whether to enable support for MQTT exclusive subscription
exclusive_subscription = false

## Ignore loop delivery of messages for MQTT v3.1.1/v3.1.0, similar to No Local subscription option in MQTT 5.0
ignore_loop_deliver = false

## Parse MQTT messages in strict mode.
## When set to true, invalid utf8 strings in for example client ID, topic name, etc. will cause the client to be disconnected
strict_mode = false

## Specify the response information returned to the client
response_information  = ""

## The keep alive that EMQX requires the client to use
## Type:
##   - disabled :: the keep alive specified by the client will be used
##   - Integer :: Keepalive time, only applicable to clients using MQTT 5.0 protocol
server_keepalive = disabled

## Keep-Alive Timeout = Keep-Alive interval × Keep-Alive Multiplier
keepalive_multiplier = 1.5

## Maximum number of subscriptions allowed per client
## Type: infinity | Integer
max_subscriptions = infinity

## Force upgrade of QoS level according to subscription
upgrade_qos = false

## Maximum number of QoS 1 and QoS 2 messages that are allowed to be delivered simultaneously before completing the acknowledgment
## Type: Range from 1 to 65535
max_inflight = 32

## Retry interval for QoS 1/2 message delivering
retry_interval = 30s

## For each publisher session, the maximum number of outstanding QoS 2 messages pending on the client to send PUBREL
## Type: infinity | Integer
max_awaiting_rel = 100

## For client to broker QoS 2 message, the time limit for the broker to wait before the PUBREL message is received
await_rel_timeout = 300s

## Specifies how long the session will expire after the connection is disconnected, only for non-MQTT 5.0 connections
session_expiry_interval = 2h

## Maximum queue length. Enqueued messages when persistent client disconnected, or inflight window is full
## Type: infinity | Integer
max_mqueue_len = 1000

## Specifies whether to store QoS 0 messages in the message queue while the connection is down but the session remains
mqueue_store_qos0 = true

## Whether to use Username as Client ID
use_username_as_clientid = false

## Use the CN, DN field in the peer certificate or the entire certificate content as Username
## Type:
##   - disabled
##   - cn :: CN field of the certificate
##   - dn :: DN field of the certificate
##   - crt :: the content of the DER or PEM certificate
##   - pem :: PEM format content converted from DER certificate content
##   - md5 :: the MD5 value of the content of the DER or PEM certificate
peer_cert_as_username = disabled

## Use the CN, DN field in the peer certificate or the entire certificate content as Client ID
## Type: See the above
peer_cert_as_clientid = disabled

## Dispatch strategy for shared subscription
## Type:
##   - random :: dispatch the message to a random selected subscriber
##   - round_robin :: select the subscribers in a round-robin manner
##   - round_robin_per_group :: select the subscribers in round-robin fashion within each shared subscriber group
##   - local :: select random local subscriber otherwise select random cluster-wide
##   - sticky :: always use the last selected subscriber to dispatch, until the subscriber disconnects.
##   - hash_clientid :: select the subscribers by hashing the `clientIds`
##   - hash_topic :: select the subscribers by hashing the source topic
shared_subscription_strategy = round_robin

}

log.file {
## Enable file log handler
enable = true

## Log level
## Type: debug | info | notice | warning | error | critical | alert | emergency
level = debug

## Log formatter, text for free text, and json for more structured logging
## Type: text | json
formatter = text

## Time offset for formatting the timestamp
## Type:
##   - system :: local system time
##   - utc :: UTC time
##   - +-[hh]:[mm]: user specified time offset, such as "-02:00" or "+00:00" Defaults to: system
time_offset = system

## Maximum number of log files
## Type: Range from 1 to 128
rotation_count = 10

## This parameter controls log file rotation
## Type:
##  - infinity :: the log file will grow indefinitely
##  - ByteSize :: the log file will be rotated once it reaches this value in bytes
rotation_size = 50MB

}

2023-08-01T15:27:31.932642+08:00 [info] msg: authorization_permission_allowed, mfa: emqx_authz:log_allowed/1, line: 436, peername: 221.178.126.174:39653, clientid: MzExOTExMzUyODI0ODIzNzUwNjk2MDYxNDM2OTU3MTYzNTC, topic: test, ipaddr: {221,178,126,174}, source: file, username: undefined
2023-08-01T15:27:31.980535+08:00 [debug] msg: emqx_cm_clean_down, mfa: emqx_cm:clean_down/1, line: 714, client_id: <<"MzExOTExMzUyODI0ODIzNzUwNjk2MDYxNDM2OTU3MTYzNTC">>
2023-08-01T15:27:31.980190+08:00 [error] Process: <0.3047.0> on node 'emqx@127.0.0.1', Context: maximum heap size reached, Max Heap Size: 6291456, Total Heap Size: 6719020, Kill: true, Error Logger: true, Message Queue Len: 0, GC Info: [{old_heap_block_size,2487399},{heap_block_size,4974798},{mbuf_size,0},{recent_size,552545},{stack_size,57},{old_heap_size,0},{heap_size,1744222},{bin_vheap_size,137160},{bin_vheap_block_size,128689},{bin_old_vheap_size,0},{bin_old_vheap_block_size,46422}]
2023-08-01T15:27:36.147162+08:00 [debug] msg: raw_bin_received, mfa: emqx_connection:when_bytes_in/3, line: 779, peername: 42.192.152.188:42146, clientid: mosq-05ckjgZMEhwuYP37Px, bin: C000, size: 2, type: hex
2023-08-01T15:27:36.147523+08:00 [debug]


Running emqx ctl conf show mqtt confirms that the configured value is max_packet_size = 10MB

When the large file reaches the broker, the broker logs the following error:
EMQX 5.1.3 is running now!
Restricted Eshell V13.2.2 (abort with ^G)
v5.1.3(emqx@127.0.0.1)1> 2023-08-01T18:24:08.371478+08:00 [error] Process: <0.3035.0> on node 'emqx@127.0.0.1', Context: maximum heap size reached, Max Heap Size: 6291456, Total Heap Size: 8317160, Kill: true, Error Logger: true, Message Queue Len: 0, GC Info: [{old_heap_block_size,2984878},{heap_block_size,5057711},{mbuf_size,565902},{recent_size,1149037},{stack_size,56},{old_heap_size,0},{heap_size,1781502},{bin_vheap_size,141834},{bin_vheap_block_size,159072},{bin_old_vheap_size,0},{bin_old_vheap_block_size,75110}]

We have attempted to fix this in this PR: https://github.com/emqx/emqx/pull/11279

The bug still appears in the latest release, 5.1.4:
v5.1.4(emqx@127.0.0.1)1> 2023-08-06T16:40:43.122400+08:00 [error] Process: <0.3105.0> on node 'emqx@127.0.0.1', Context: maximum heap size reached, Max Heap Size: 12582912, Total Heap Size: 12609395, Kill: true, Error Logger: true, Message Queue Len: 0, GC Info: [{old_heap_block_size,4298223},{heap_block_size,9456090},{mbuf_size,0},{recent_size,3153305},{stack_size,1144918},{old_heap_size,0},{heap_size,3153305},{bin_vheap_size,391673},{bin_vheap_block_size,648018},{bin_old_vheap_size,0},{bin_old_vheap_block_size,159073}]

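This is not a bug. EMQX has internal monitoring: if a process is found to be using too much memory, it is killed to prevent the whole system from running out of memory (OOM).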
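To read the figures in the "maximum heap size reached" reports above, the following Python sketch extracts the two heap sizes from such a log line and converts them to MiB. It rests on two assumptions that are not stated in this thread: that the sizes are reported in machine words (as with the Erlang `max_heap_size` process flag), and that the VM is 64-bit, so one word is 8 bytes. The helper name `heap_report` is hypothetical.

```python
import re

WORD_SIZE = 8  # assumption: 64-bit Erlang VM, 8 bytes per word


def heap_report(line: str) -> dict:
    """Extract heap figures from a 'maximum heap size reached' log line."""
    m_max = re.search(r"Max Heap Size: (\d+)", line)
    m_total = re.search(r"Total Heap Size: (\d+)", line)
    if not (m_max and m_total):
        raise ValueError("not a 'maximum heap size reached' report")
    max_words = int(m_max.group(1))
    total_words = int(m_total.group(1))
    return {
        "max_words": max_words,
        "total_words": total_words,
        # Convert words to MiB under the WORD_SIZE assumption above.
        "max_mib": max_words * WORD_SIZE / 2**20,
        "total_mib": total_words * WORD_SIZE / 2**20,
        "over_by_words": total_words - max_words,
    }


if __name__ == "__main__":
    sample = ("Context: maximum heap size reached, Max Heap Size: 6291456, "
              "Total Heap Size: 6719020, Kill: true")
    report = heap_report(sample)
    print(f"limit {report['max_mib']:.1f} MiB, "
          f"reached {report['total_mib']:.1f} MiB")
```

Under those assumptions, the 5.1.3 reports correspond to a per-process limit of 48 MiB and the 5.1.4 report to 96 MiB, so in each case the connection process was killed just after crossing its limit while handling the large message.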

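Hello, I see that the latest version, 5.1.5, has fixed issue 11279. After updating to the latest version via yum, I found that when publishing 2 MB of data the publisher still gets kicked off. Do I need to change any other configuration parameters?
Thanks!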

This appears to be a different bug; we will address it as soon as possible.