EMQ X有类似阿里云RRPC同步调用的功能嘛?

环境信息

  • EMQX 版本:
  • 操作系统及版本:
  • 其他

问题描述

配置文件及日志

就是比如我想下发指令给一个设备,但是又想要实时(几秒内)接收到设备的返回响应。想问下EMQX有类似阿里平台的RRPC这种功能或者说有更好的实现方式呢?
RRPC:Revert-RPC。RPC(Remote Procedure Call)采用客户机/服务器模式,用户不需要了解底层技术协议,即可远程请求服务。RRPC则可以实现由服务端请求设备端并能够使设备端响应的功能

2 个赞

感觉很有用的一个特性。这个响应的主题和内容格式如何规范呢?

Ping @949892129

这个功能thingboard里有,emqx里不打算实现吗?

Thingboard 偏业务一些, EMQX 更偏底层一些。暂时还没有定义这类功能的计划

阿里云也有,唉,EMQ 都没计划适配;调用RRpc向设备发送请求消息并同步返回响应_阿里云物联网平台-阿里云帮助中心

public BaseResponse rRpc(RRpcReq rRpcReq) {
String topic = rRpcReq.getTopic();
long reqId = SnowFlakeUtil.getFlowIdInstance().nextId();
String respTopic =topic +“?reqId=”+reqId;
try {
JSONObject messageContent = rRpcReq.getMessageContent();
if(messageContent == null){
messageContent = new JSONObject();
}
messageContent.put(“reqId”,reqId);

        //请求
        log.info("ownRRpc 发消息 {},{}", JSONObject.toJSONString(rRpcReq), reqId);
        long startTime = System.currentTimeMillis();
        MqttMessage message = new MqttMessage(messageContent.toJSONString().getBytes());
        message.setQos(2);
        getClient().publish(rRpcReq.getTopic(), message);

        //订阅响应主题
        MqttSubscription[] subscriptions = new MqttSubscription[1];
        MqttSubscription mqttSubscription = new MqttSubscription(respTopic);
        mqttSubscription.setQos(0);
        subscriptions[0] = mqttSubscription;
        getClient().subscribe(subscriptions);
        //构建响应体
        OwnRRPCResponseModel ownRRPCResponseModel = new OwnRRPCResponseModel();
        countDownLatchMap.put(respTopic,ownRRPCResponseModel);
        long timeoutLimit = 600L;
        ownRRPCResponseModel.getCountDownLatch().await(timeoutLimit,TimeUnit.SECONDS);
        if (System.currentTimeMillis() - startTime > TimeUnit.MILLISECONDS.convert(timeoutLimit,TimeUnit.SECONDS)) {
            return new BaseResponse<>(ReturnCodeEnum.DEVICE_TIMEOUT.getCode(), ReturnCodeEnum.DEVICE_TIMEOUT.getMsg());
        }
        Object result = ownRRPCResponseModel.getResult();
        log.info("ownRRpcResponse {},{},{}", rRpcReq.getDeviceName(), reqId, result);
        return new BaseResponse<>(BaseCodeEnum.SUCCESS, result == null?"": JsonUtil.toJSONString(result));
    } catch (MqttException | InterruptedException e) {
        e.printStackTrace();
        return new BaseResponse<>(BaseCodeEnum.FAILURE, e.getMessage());
    } finally {
        countDownLatchMap.remove(respTopic);
        try {
            getClient().unsubscribe(respTopic);
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }
}

@Data
private static class OwnRRPCResponseModel{
private CountDownLatch countDownLatch = new CountDownLatch(1);
private Object result;
}
private static ConcurrentHashMap<String,OwnRRPCResponseModel> countDownLatchMap = new ConcurrentHashMap<>();

大量在调用时候,您这个不会阻塞吗?

这个编辑器不适合粘代码,下次放到csdn 然后放链接比较好