环境信息
- EMQX 版本:
- 操作系统及版本:
- 其他
问题描述
配置文件及日志
就是比如我想下发指令给一个设备,但是又想要实时(几秒内)接收到设备的返回响应。想问下EMQX有类似阿里平台的RRPC这种功能或者说有更好的实现方式呢?
RRPC:Revert-RPC。RPC(Remote Procedure Call)采用客户机/服务器模式,用户不需要了解底层技术协议,即可远程请求服务。RRPC则可以实现由服务端请求设备端并能够使设备端响应的功能
就是比如我想下发指令给一个设备,但是又想要实时(几秒内)接收到设备的返回响应。想问下EMQX有类似阿里平台的RRPC这种功能或者说有更好的实现方式呢?
RRPC:Revert-RPC。RPC(Remote Procedure Call)采用客户机/服务器模式,用户不需要了解底层技术协议,即可远程请求服务。RRPC则可以实现由服务端请求设备端并能够使设备端响应的功能
感觉很有用的一个特性。这个响应的主题和内容格式如何规范呢?
这个功能thingboard里有,emqx里不打算实现吗?
Thingboard 偏业务一些, EMQX 更偏底层一些。暂时还没有定义这类功能的计划
public BaseResponse rRpc(RRpcReq rRpcReq) {
String topic = rRpcReq.getTopic();
long reqId = SnowFlakeUtil.getFlowIdInstance().nextId();
String respTopic =topic +“?reqId=”+reqId;
try {
JSONObject messageContent = rRpcReq.getMessageContent();
if(messageContent == null){
messageContent = new JSONObject();
}
messageContent.put(“reqId”,reqId);
//请求
log.info("ownRRpc 发消息 {},{}", JSONObject.toJSONString(rRpcReq), reqId);
long startTime = System.currentTimeMillis();
MqttMessage message = new MqttMessage(messageContent.toJSONString().getBytes());
message.setQos(2);
getClient().publish(rRpcReq.getTopic(), message);
//订阅响应主题
MqttSubscription[] subscriptions = new MqttSubscription[1];
MqttSubscription mqttSubscription = new MqttSubscription(respTopic);
mqttSubscription.setQos(0);
subscriptions[0] = mqttSubscription;
getClient().subscribe(subscriptions);
//构建响应体
OwnRRPCResponseModel ownRRPCResponseModel = new OwnRRPCResponseModel();
countDownLatchMap.put(respTopic,ownRRPCResponseModel);
long timeoutLimit = 600L;
ownRRPCResponseModel.getCountDownLatch().await(timeoutLimit,TimeUnit.SECONDS);
if (System.currentTimeMillis() - startTime > TimeUnit.MILLISECONDS.convert(timeoutLimit,TimeUnit.SECONDS)) {
return new BaseResponse<>(ReturnCodeEnum.DEVICE_TIMEOUT.getCode(), ReturnCodeEnum.DEVICE_TIMEOUT.getMsg());
}
Object result = ownRRPCResponseModel.getResult();
log.info("ownRRpcResponse {},{},{}", rRpcReq.getDeviceName(), reqId, result);
return new BaseResponse<>(BaseCodeEnum.SUCCESS, result == null?"": JsonUtil.toJSONString(result));
} catch (MqttException | InterruptedException e) {
e.printStackTrace();
return new BaseResponse<>(BaseCodeEnum.FAILURE, e.getMessage());
} finally {
countDownLatchMap.remove(respTopic);
try {
getClient().unsubscribe(respTopic);
} catch (MqttException e) {
e.printStackTrace();
}
}
}
@Data
private static class OwnRRPCResponseModel{
private CountDownLatch countDownLatch = new CountDownLatch(1);
private Object result;
}
private static ConcurrentHashMap<String,OwnRRPCResponseModel> countDownLatchMap = new ConcurrentHashMap<>();
大量在调用时候,您这个不会阻塞吗?
这个编辑器不适合粘代码,下次放到csdn 然后放链接比较好