并发发送100w条数据只收到20w左右,其他消息丢失,消息流出只有1000多,是否哪里配置有问题

想问,怎么样才能让消息不丢失,提高消息的流出。感谢各位!
1733222452032
下面是我的配置: @Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();

    MqttConnectOptions options = new MqttConnectOptions();
    options.setServerURIs(new String[] {broker});
    options.setUserName(username);  // 可选:用户名
    options.setPassword(password.toCharArray());  // 可选:密码
    options.setKeepAliveInterval(keepalive);
    options.setConnectionTimeout(1000);  // 设置连接超时为30秒
    options.setCleanSession(false);  // 设置为持久会话
    factory.setConnectionOptions(options);

    return factory;
}   

@Bean
public MessageChannel mqttInputChannel() {
return new QueueChannel();
}

这里既使屏蔽了逻辑功能,也只能到1000多的流出,
@Bean
@ServiceActivator(inputChannel = “mqttInputChannel”)
public MessageHandler handler() {
return message → {
System.out.println(message.getPayload());
// mqttMessageManage.handler(message);
};
}

不懂java.
不过可以发一下EMQ的日志,看看有什么报错没有 分析一下。

supervisor: {esockd_connection_sup,<0.12922.0>}, errorContext: connection_shutdown, reason: #{max => 1000,reason => mailbox_overflow,value => 2705}, offender: [{pid,<0.12922.0>},{name,connection},{mfargs,{emqx_connection,start_link,[#{listener => {tcp,default},limiter => #{connection => #{initial => 0,burst => 0,rate => infinity}},enable_authn => true,zone => default}]}}] 有这个报错 好像是消息队列满了,我需要怎么才能增加消息队列最大数

这个日志表示的消费端太慢了。建议使用 共享订阅,增加多个端来一起消费。

不推荐只是简单的增加消息队列。

在 dashboard 上的 管理 |> MQTT 配置 |> 会话 |> 最大消息队列长度
可以调节。

这个加大了 会让客户端直接断开,估计是内存又不够了

<0.3607.0> on node ‘emqx@127.0.0.1’, Context: maximum heap size reached, Max Heap Size: 6291456, Total Heap Size: 6296338, Kill: true, Error Logger: true, Message Queue Len: 0, GC Info: [{old_heap_block_size,2487399},{heap_block_size,3800194},{mbuf_size,8860},{recent_size,1191170},{stack_size,91},{old_heap_size,0},{heap_size,1727246},{bin_vheap_size,311334},{bin_vheap_block_size,618085},{bin_old_vheap_size,0},{bin_old_vheap_block_size,46422}]

image
我该了这个到3000 rrorContext: connection_shutdown, reason: #{max => 1000,reason => mailbox_overflow,value => 8922}, offender: [{pid,<0.4854.0>},{name,connection},{mfargs,{emqx_connection,start_link,[#{listener => {tcp,default},limiter => #{connection => #{initial => 0,burst => 0,rate => infinity}},zone => default,enable_authn => true}]}}] 这边的报错日志还是1000 是通过这个改的么

改了后要客户端重新连接。推荐你使用 共享订阅

是要使用共享订阅的,设计是多个实例订阅,3个实例也没发处理完每s8000的并发,我们希望并发数能达到10w的。或者说你的意思是在一个实例里面多个线程订阅么,如果是10w并发有建议多少的客户端共享订阅么

建议结合自己的场景测试。