想问,怎么样才能让消息不丢失,提高消息的流出。感谢各位!
下面是我的配置: @Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
MqttConnectOptions options = new MqttConnectOptions();
options.setServerURIs(new String[] {broker});
options.setUserName(username); // 可选:用户名
options.setPassword(password.toCharArray()); // 可选:密码
options.setKeepAliveInterval(keepalive);
options.setConnectionTimeout(1000); // 设置连接超时为30秒
options.setCleanSession(false); // 设置为持久会话
factory.setConnectionOptions(options);
return factory;
}
@Bean
public MessageChannel mqttInputChannel() {
return new QueueChannel();
}
这里既使屏蔽了逻辑功能,也只能到1000多的流出,
@Bean
@ServiceActivator(inputChannel = “mqttInputChannel”)
public MessageHandler handler() {
return message → {
System.out.println(message.getPayload());
// mqttMessageManage.handler(message);
};
}
不懂java.
不过可以发一下EMQ的日志,看看有什么报错没有 分析一下。
supervisor: {esockd_connection_sup,<0.12922.0>}, errorContext: connection_shutdown, reason: #{max => 1000,reason => mailbox_overflow,value => 2705}, offender: [{pid,<0.12922.0>},{name,connection},{mfargs,{emqx_connection,start_link,[#{listener => {tcp,default},limiter => #{connection => #{initial => 0,burst => 0,rate => infinity}},enable_authn => true,zone => default}]}}] 有这个报错 好像是消息队列满了,我需要怎么才能增加消息队列最大数
这个日志表示的消费端太慢了。建议使用 共享订阅,增加多个端来一起消费。
不推荐只是简单的增加消息队列。
在 dashboard 上的 管理 |> MQTT 配置 |> 会话 |> 最大消息队列长度
可以调节。
这个加大了 会让客户端直接断开,估计是内存又不够了
<0.3607.0> on node ‘emqx@127.0.0.1’, Context: maximum heap size reached, Max Heap Size: 6291456, Total Heap Size: 6296338, Kill: true, Error Logger: true, Message Queue Len: 0, GC Info: [{old_heap_block_size,2487399},{heap_block_size,3800194},{mbuf_size,8860},{recent_size,1191170},{stack_size,91},{old_heap_size,0},{heap_size,1727246},{bin_vheap_size,311334},{bin_vheap_block_size,618085},{bin_old_vheap_size,0},{bin_old_vheap_block_size,46422}]
我该了这个到3000 rrorContext: connection_shutdown, reason: #{max => 1000,reason => mailbox_overflow,value => 8922}, offender: [{pid,<0.4854.0>},{name,connection},{mfargs,{emqx_connection,start_link,[#{listener => {tcp,default},limiter => #{connection => #{initial => 0,burst => 0,rate => infinity}},zone => default,enable_authn => true}]}}] 这边的报错日志还是1000 是通过这个改的么
是要使用共享订阅的,设计是多个实例订阅,3个实例也没发处理完每s8000的并发,我们希望并发数能达到10w的。或者说你的意思是在一个实例里面多个线程订阅么,如果是10w并发有建议多少的客户端共享订阅么