paho java

环境信息

  • EMQX 版本:5.0
  • 操作系统及版本:windows
  • 其他

问题描述

paho java 异步客户端MqttAsyncClient 执行订阅消息时,提示“客户机未连接”。但是publish是可以成功的,这是什么原因?代码如下:执行到subscribe()方法就提示报错。

配置文件及日志

@Component
public class MqttAsyncPushClient {
    private static final Logger logger = LoggerFactory.getLogger(MqttAsyncPushClient.class);

    @Autowired
    private PushCallback pushCallback;

    private static MqttAsyncClient client;

    private static MqttAsyncClient getClient() {
        return client;
    }

    private static void setClient(MqttAsyncClient client) {
        MqttAsyncPushClient.client = client;
    }

    /**
     * 客户端连接
     *
     * @param host      ip+端口
     * @param username  用户名
     * @param password  密码
     * @param timeout   超时时间
     * @param keepalive 保留数
     */
    public void connect(String host, String username, String password, int timeout, int keepalive) {
        MqttAsyncClient client;
        try {
            client = new MqttAsyncClient(host, UUID.randomUUID().toString(), new MemoryPersistence());
            MqttConnectOptions options = new MqttConnectOptions();
            options.setCleanSession(true);
            options.setUserName(username);
            options.setPassword(password.toCharArray());
            options.setConnectionTimeout(timeout);
            options.setKeepAliveInterval(keepalive);
            options.setSocketFactory(SslUtil.getSocketFactory("D:\\soft\\emqx-5.0.8-windows-amd64\\etc\\certs\\ca.pem"));
            MqttAsyncPushClient.setClient(client);
            client.setCallback(pushCallback);
            client.connect(options);
        } catch (MqttException e) {
            logger.error("-------------------客户端连接异常----------------");
            logger.error("reason: " + e.getReasonCode());
            logger.error("msg: " + e.getMessage());
            e.printStackTrace();
        } catch (Exception ee) {
            logger.error("-------------------客户端连接异常----------------");
            logger.error("msg: " + ee.getMessage());
            ee.printStackTrace();
        }
    }

    /**
     * 发布
     *
     * @param qos         连接方式
     * @param retained    是否保留
     * @param topic       主题
     * @param pushMessage 消息体
     */
    public boolean publish(int qos, boolean retained, String topic, String pushMessage) {
        logger.info("mqttAsync"+pushMessage);
        MqttMessage message = new MqttMessage();
        message.setQos(qos);
        message.setRetained(retained);
        message.setPayload(pushMessage.getBytes());

        IMqttDeliveryToken token;
        try {
            token = MqttAsyncPushClient.getClient().publish(topic, message);
            logger.info("结果-------------------"+token.isComplete());
            token.waitForCompletion();
            logger.info("结果-------------------"+token.isComplete());
            return true;
        } catch (MqttPersistenceException e) {
            e.printStackTrace();
            return false;
        } catch (MqttException e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 订阅某个主题
     *
     * @param topic 主题
     * @param qos   连接方式
     */
    public void subscribe(String topic, int qos) {
        logger.info("mqttAsync开始订阅主题" + topic);
        try {
            MqttAsyncPushClient.getClient().subscribe(topic, qos);
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }



-------------------------------------------------------------------------------------
这是config代码,项目启动时候,@bean方法里,mqttAsyncPushClient.subscribe()这里报的错

@Configuration
@Setter
@Getter
public class MqttConfig {
//    @Autowired
//    private MqttPushClient mqttPushClient;

    @Autowired
    private MqttAsyncPushClient mqttAsyncPushClient;

    @Value("${mqtt.username}")
    private String username;

    @Value("${mqtt.password")
    private String password;

    @Value("${mqtt.serverURIs}")
    private String hostUrl;

    @Value("${mqtt.clientId}")
    private String clientId;

    @Value("${mqtt.topic}")
    private String defaultTopic;

    @Value("${mqtt.qos}")
    private int qos;

    @Value("${mqtt.enabled}")
    private boolean enabled;

    @Value("${mqtt.keepalive}")
    private int keepalive;

    @Value("${mqtt.timeout}")
    private int timeout;

    private static final Logger logger = LoggerFactory.getLogger(MqttConfig.class);


    @Bean
    public MqttAsyncPushClient getMqttAsyncPushClient() {
        if(enabled == true){
            String mqtt_topic[] = defaultTopic.split(",");
            mqttAsyncPushClient.connect(hostUrl, username, password, timeout, keepalive);//连接
            for(int i=0; i<mqtt_topic.length; i++){
                mqttAsyncPushClient.subscribe(mqtt_topic[i], 0);//订阅主题
            }
        }
        return mqttAsyncPushClient;
    }

呼唤java大佬帮你解答 @DDDHuang @bagpipes


你这里是局部变量。不需要再创建一个了

2 个赞

同时你再看看发送订阅的时候客户端确定服务端正常连上

应该跟这个没关系,java允许局部变量和全局变量重名,现在的问题是,publish是可以的,但是订阅就提示“客户机未连接”

发布正常,消费就报错错

具体日志呢,同时你debug 跟踪一下。并且emqx 测日志追踪一下。这个sdk网上一堆的demo,我自己也在用这个都是正常的,基本肯定你代码哪里搞错了

帮忙看下我的那个config类,编辑了帖子

这个看不出来,你这边肯定是创建了2个对象,一个对象连接上可以发布的,一个没连接就发起订阅,你可以判断连接成功再发起订阅。最好的方式就是debug调试跟踪一下

1 个赞

总感觉你的config 和client 类怪怪的。你可以参考这里的写法:Springboot实现MQTT通信_爱打羽球的码猿的博客-CSDN博客_mqtt springboot

我感觉这个错误是正常的,用的异步客户端,connect可能还没创建完成,就去执行了下面的subsctrib,我在connect()休眠2s,一切正常了。说明就是异步的问题吧。

订阅可以放到callback中,连接成功后发起订阅