Java 钩子(exhook)部署后,日志有打印,但客户端接收不到消息。
建议先将日志等级设置为 debug,然后再进行排查。
package io.emqx.exhook.server;
import com.google.common.collect.Maps;
import com.google.protobuf.ByteString;
import io.emqx.exhook.EmptySuccess;
import io.emqx.exhook.Message;
import io.emqx.exhook.ValuedResponse;
import io.emqx.exhook.enume.EnumeIM;
import io.emqx.exhook.json.JsonHelper;
import io.emqx.exhook.mqtt.MqttProviderConfig;
import io.emqx.exhook.rabbitmq.RabbitMqSenderHelper;
import io.emqx.exhook.redis.RedisClientUtil;
import io.grpc.stub.StreamObserver;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.logging.Logger;
@Service(“exServer”)
public class ExServer {
private static final Logger logger = Logger.getLogger(ExServer.class.getName());
private final static String SEPARATOR = "/";
private final static String UNDER_CROSS = ".";
private final static String SEPARATORS;
static {
SEPARATORS = "\\";
}
private final static String UNDERLINE = "_";
/*public Server server;
public void start() throws IOException {
*//* The port on which the server should run *//*
int port = 9000;
server = ServerBuilder.forPort(port)
.addService(new HookProviderImpl())
.build()
.start();
logger.info("Server started, listening on " + port);
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
// Use stderr here since the logger may have been reset by its JVM shutdown hook.
System.err.println("*** shutting down gRPC server since JVM is shutting down");
try {
ExServer.this.stop();
} catch (InterruptedException e) {
e.printStackTrace(System.err);
}
System.err.println("*** server shut down");
}
});
}
public void stop() throws InterruptedException {
if (server != null) {
server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
}
}
*//**
* Await termination on the main thread since the grpc library uses daemon threads.
*//*
public void blockUntilShutdown() throws InterruptedException {
if (server != null) {
server.awaitTermination();
}
}*/
/**
* Main launches the server from the command line.
*/
/* public static void main(String args) throws IOException, InterruptedException {
final ExServer server = new ExServer();
server.start();
server.blockUntilShutdown();
}*/
@Service(“hookProviderImpl”)
public static class HookProviderImpl extends HookProviderGrpc.HookProviderImplBase {
private RedisClientUtil redisUtil;
private JsonHelper jsonHelper;
private RabbitMqSenderHelper rabbitMqSenderHelper;
private MqttProviderConfig providerClient;
public HookProviderImpl(RedisClientUtil redisUtil, JsonHelper jsonHelper, RabbitMqSenderHelper rabbitMqSenderHelper, MqttProviderConfig providerClient) {
this.redisUtil=redisUtil;
this.jsonHelper=jsonHelper;
this.rabbitMqSenderHelper=rabbitMqSenderHelper;
this.providerClient=providerClient;
}
public void DEBUG(String fn, Object req) {
System.out.printf(fn + ", request: " + req);
}
@Override
public void onProviderLoaded(io.emqx.exhook.ProviderLoadedRequest request, StreamObserver<io.emqx.exhook.LoadedResponse> responseObserver) {
/* DEBUG("onProviderLoaded", request);*/
io.emqx.exhook.HookSpec[] specs = {
io.emqx.exhook.HookSpec.newBuilder().setName("client.connect").build(),
io.emqx.exhook.HookSpec.newBuilder().setName("client.connack").build(),
io.emqx.exhook.HookSpec.newBuilder().setName("client.connected").build(),
io.emqx.exhook.HookSpec.newBuilder().setName("client.disconnected").build(),
io.emqx.exhook.HookSpec.newBuilder().setName("client.authenticate").build(),
io.emqx.exhook.HookSpec.newBuilder().setName("client.check_acl").build(),
io.emqx.exhook.HookSpec.newBuilder().setName("client.subscribe").build(),
io.emqx.exhook.HookSpec.newBuilder().setName("client.unsubscribe").build(),
io.emqx.exhook.HookSpec.newBuilder().setName("session.created").build(),
io.emqx.exhook.HookSpec.newBuilder().setName("session.subscribed").build(),
io.emqx.exhook.HookSpec.newBuilder().setName("session.unsubscribed").build(),
io.emqx.exhook.HookSpec.newBuilder().setName("session.resumed").build(),
io.emqx.exhook.HookSpec.newBuilder().setName("session.discarded").build(),
io.emqx.exhook.HookSpec.newBuilder().setName("session.takeovered").build(),
io.emqx.exhook.HookSpec.newBuilder().setName("session.terminated").build(),
io.emqx.exhook.HookSpec.newBuilder().setName("message.publish").build(),
io.emqx.exhook.HookSpec.newBuilder().setName("message.delivered").build(),
io.emqx.exhook.HookSpec.newBuilder().setName("message.acked").build(),
io.emqx.exhook.HookSpec.newBuilder().setName("message.dropped").build()
};
io.emqx.exhook.LoadedResponse reply = io.emqx.exhook.LoadedResponse.newBuilder().addAllHooks(Arrays.asList(specs)).build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
}
@Override
public void onProviderUnloaded(io.emqx.exhook.ProviderUnloadedRequest request, StreamObserver<io.emqx.exhook.EmptySuccess> responseObserver) {
DEBUG("onProviderUnloaded", request);
io.emqx.exhook.EmptySuccess reply = io.emqx.exhook.EmptySuccess.newBuilder().build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
}
/**
* 客户端连接
* @param request
* @param responseObserver
*/
@Override
public void onClientConnect(io.emqx.exhook.ClientConnectRequest request, StreamObserver<io.emqx.exhook.EmptySuccess> responseObserver) {
DEBUG("onClientConnect", request);
io.emqx.exhook.EmptySuccess reply = io.emqx.exhook.EmptySuccess.newBuilder().build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
}
@Override
public void onClientConnack(io.emqx.exhook.ClientConnackRequest request, StreamObserver<io.emqx.exhook.EmptySuccess> responseObserver) {
DEBUG("onClientConnack", request);
io.emqx.exhook.EmptySuccess reply = io.emqx.exhook.EmptySuccess.newBuilder().build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
}
/**
* 客户端已连上
* @param request
* @param responseObserver
*/
@Override
public void onClientConnected(io.emqx.exhook.ClientConnectedRequest request, StreamObserver<io.emqx.exhook.EmptySuccess> responseObserver) {
DEBUG("onClientConnected", request);
io.emqx.exhook.EmptySuccess reply = io.emqx.exhook.EmptySuccess.newBuilder().build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
}
/**
* 离线监听
* @param request
* @param responseObserver
*/
@Override
public void onClientDisconnected(io.emqx.exhook.ClientDisconnectedRequest request, StreamObserver<io.emqx.exhook.EmptySuccess> responseObserver) {
DEBUG("onClientDisconnected", request);
/* String clientId = request.getClientinfo().getClientid();
if(clientId.contains(UNDERLINE+EnumeIM.DEVICETYPE_ROBOTICVACUUM.getKey())||clientId.contains(UNDERLINE+EnumeIM.DEVICETYPE_WINDOWCLEANMACHINE.getKey())) {
int index = clientId.lastIndexOf(UNDERLINE);//lastIndexOf()方法可返回一个指定的字符串值最后出现的位置,在一个字符串中的指定位置从后向前搜索
String deviceType = clientId.substring(index + EnumeIM.ROBOT_PARTS_EVENT_ONE_INT.getIntKey(), clientId.length());
String deviceId = clientId.substring(EnumeIM.ROBOT_PARTS_EVENT_ZERO_INT.getIntKey(), index);
String topic_key = "";
String prop_key = "";
prop_key = prop_key.concat(deviceId).concat(UNDER_CROSS).concat(deviceType).concat(UNDER_CROSS).concat("prop");
topic_key = topic_key.concat(deviceId).concat(UNDER_CROSS).concat(deviceType).concat(UNDER_CROSS).concat("topic");
String topicStr = redisUtil.getShardedJedis().get(topic_key);
String propStr = redisUtil.getShardedJedis().get(prop_key);//从缓存中获取
if (StringUtils.hasLength(propStr)) {
Map<String, Object> propMap = jsonHelper.jsonStr2Object(propStr, Map.class);
propMap.put("state", EnumeIM.offline.getKey());//修改属性state值为0
redisUtil.getShardedJedis().set(prop_key, jsonHelper.object2JsonStr(propMap));//放入缓存中
}
if (StringUtils.hasLength(topicStr)) {
String appKey = topicStr.split(SEPARATORS+UNDER_CROSS)[Integer.valueOf(EnumeIM.ROBOT_PARTS_EVENT_ZERO.getValue())];
String iotType = topicStr.split(SEPARATORS+UNDER_CROSS)[Integer.valueOf(EnumeIM.ROBOT_PARTS_EVENT_ONE.getValue())];
String acTime = topicStr.split(SEPARATORS+UNDER_CROSS)[Integer.valueOf(EnumeIM.ROBOT_PARTS_EVENT_TWO.getValue())];
String topic = SEPARATOR + appKey + SEPARATOR + iotType + SEPARATOR + deviceType + SEPARATOR + deviceId + SEPARATOR + "report";
Map<String, Object> content = assemblySendMsg(deviceId, appKey, iotType, acTime);
//发送消息监听该主题的app端修改状态
providerClient.publish(EnumeIM.QOS_ZERO.getIntKey(), true, topic, jsonHelper.object2JsonStr(content));
//设备的属性状态值state修改为0
}
}*/
EmptySuccess reply = EmptySuccess.newBuilder().build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
}
@Override
public void onClientAuthenticate(io.emqx.exhook.ClientAuthenticateRequest request, StreamObserver<io.emqx.exhook.ValuedResponse> responseObserver) {
DEBUG("onClientAuthenticate", request);
io.emqx.exhook.ValuedResponse reply = io.emqx.exhook.ValuedResponse.newBuilder()
.setBoolResult(true)
.setType(io.emqx.exhook.ValuedResponse.ResponsedType.STOP_AND_RETURN)
.build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
}
@Override
public void onClientCheckAcl(io.emqx.exhook.ClientCheckAclRequest request, StreamObserver<io.emqx.exhook.ValuedResponse> responseObserver) {
DEBUG("onClientCheckAcl", request);
io.emqx.exhook.ValuedResponse reply = io.emqx.exhook.ValuedResponse.newBuilder()
.setBoolResult(true)
.setType(io.emqx.exhook.ValuedResponse.ResponsedType.STOP_AND_RETURN)
.build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
}
@Override
public void onClientSubscribe(io.emqx.exhook.ClientSubscribeRequest request, StreamObserver<io.emqx.exhook.EmptySuccess> responseObserver) {
DEBUG("onClientSubscribe", request);
io.emqx.exhook.EmptySuccess reply = io.emqx.exhook.EmptySuccess.newBuilder().build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
}
@Override
public void onClientUnsubscribe(io.emqx.exhook.ClientUnsubscribeRequest request, StreamObserver<io.emqx.exhook.EmptySuccess> responseObserver) {
DEBUG("onClientUnsubscribe", request);
io.emqx.exhook.EmptySuccess reply = io.emqx.exhook.EmptySuccess.newBuilder().build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
}
/**
* session创建
* @param request
* @param responseObserver
*/
@Override
public void onSessionCreated(io.emqx.exhook.SessionCreatedRequest request, StreamObserver<io.emqx.exhook.EmptySuccess> responseObserver) {
DEBUG("onSessionCreated", request);
io.emqx.exhook.EmptySuccess reply = io.emqx.exhook.EmptySuccess.newBuilder().build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
}
@Override
public void onSessionSubscribed(io.emqx.exhook.SessionSubscribedRequest request, StreamObserver<io.emqx.exhook.EmptySuccess> responseObserver) {
DEBUG("onSessionSubscribed", request);
io.emqx.exhook.EmptySuccess reply = io.emqx.exhook.EmptySuccess.newBuilder().build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
}
@Override
public void onSessionUnsubscribed(io.emqx.exhook.SessionUnsubscribedRequest request, StreamObserver<io.emqx.exhook.EmptySuccess> responseObserver) {
DEBUG("onSessionUnsubscribed", request);
io.emqx.exhook.EmptySuccess reply = io.emqx.exhook.EmptySuccess.newBuilder().build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
}
@Override
public void onSessionResumed(io.emqx.exhook.SessionResumedRequest request, StreamObserver<io.emqx.exhook.EmptySuccess> responseObserver) {
DEBUG("onSessionResumed", request);
io.emqx.exhook.EmptySuccess reply = io.emqx.exhook.EmptySuccess.newBuilder().build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
}
@Override
public void onSessionDiscarded(io.emqx.exhook.SessionDiscardedRequest request, StreamObserver<io.emqx.exhook.EmptySuccess> responseObserver) {
DEBUG("onSessionDdiscarded", request);
io.emqx.exhook.EmptySuccess reply = io.emqx.exhook.EmptySuccess.newBuilder().build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
}
@Override
public void onSessionTakeovered(io.emqx.exhook.SessionTakeoveredRequest request, StreamObserver<io.emqx.exhook.EmptySuccess> responseObserver) {
DEBUG("onSessionTakeovered", request);
io.emqx.exhook.EmptySuccess reply = io.emqx.exhook.EmptySuccess.newBuilder().build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
}
/**
* session停止监听
* @param request
* @param responseObserver
*/
@Override
public void onSessionTerminated(io.emqx.exhook.SessionTerminatedRequest request, StreamObserver<io.emqx.exhook.EmptySuccess> responseObserver) {
DEBUG("onSessionTerminated", request);
io.emqx.exhook.EmptySuccess reply = io.emqx.exhook.EmptySuccess.newBuilder().build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
}
@Override
public void onMessagePublish(io.emqx.exhook.MessagePublishRequest request, StreamObserver<io.emqx.exhook.ValuedResponse> responseObserver) {
DEBUG("onMessageDelivered", request);
logger.info("接收的消息时间:"+new Date());
ByteString bstr = ByteString.copyFromUtf8("hardcode payload by exhook-svr-java :)");
Message nmsg = Message.newBuilder()
.setId(request.getMessage().getId())
.setNode(request.getMessage().getNode())
.setFrom(request.getMessage().getFrom())
.setTopic(request.getMessage().getTopic())
.setPayload(bstr).build();
/*
String topic = request.getMessage().getTopic();
String prop_key = “”;//各个实例的key 业务系统通过redis 获取到属性
String topic_key = “”;//各个topic的key 客户端【机器】离线用到
String topic_value = “”;//各个topic的值
ByteString payload = request.getMessage().getPayload();
String payloadStr = payload.toStringUtf8();
if(topic.contains(“/report”)){//机器上报(订阅此地址的布什容可以接收到机器上报属性)
String appkey = topic.split(SEPARATOR)[EnumeIM.ROBOT_PARTS_EVENT_ONE_INT.getIntKey()];
String iotType = topic.split(SEPARATOR)[EnumeIM.ROBOT_PARTS_EVENT_TWO_INT.getIntKey()];
String deviceType = topic.split(SEPARATOR)[EnumeIM.ROBOT_PARTS_EVENT_THREE_INT.getIntKey()];
String deviceId = topic.split(SEPARATOR)[EnumeIM.ROBOT_PARTS_EVENT_FOUR_INT.getIntKey()];
prop_key = prop_key.concat(deviceId).concat(UNDER_CROSS).concat(deviceType).concat(UNDER_CROSS).concat(“prop”);
topic_key=topic_key.concat(deviceId).concat(UNDER_CROSS).concat(deviceType).concat(UNDER_CROSS).concat(“topic”);
Map map = jsonHelper.jsonStr2Object(payloadStr,Map.class);
if(map!=null){
if(map.get(“msgType”)!=null && EnumeIM.MSG_TYPEE.getValue().equals(map.get(“msgType”))){//服务器端发送的直接丢弃
}else {
String propStr = redisUtil.getShardedJedis().get(prop_key);
Map propMap = jsonHelper.jsonStr2Object(propStr,Map.class);
long acTime = Long.valueOf((String) map.get(“acTime”));
topic_value = topic_value.concat(appkey).concat(UNDER_CROSS).concat(iotType).concat(UNDER_CROSS).concat(String.valueOf(acTime + 1));
redisUtil.getShardedJedis().set(topic_key, topic_value);
//解析属性
Map valueMap = (Map) map.get(“value”);
if (propMap!=null && propMap.get(“acTime”) != null) {//缓存有值
Long acTimeOld = (Long) propMap.get(“acTime”);//旧的时间
if (acTime > acTimeOld) {//新的数据的时间比较新重新放入内存
valueMap.put(“acTime”, acTime);//放入属性中
redisUtil.getShardedJedis().set(prop_key, jsonHelper.object2JsonStr(valueMap));//放入缓存
}
} else {
valueMap.put(“acTime”, acTime);//放入属性中
redisUtil.getShardedJedis().set(prop_key, jsonHelper.object2JsonStr(valueMap));//放入缓存
}
}
}
} else if(topic.contains(“/notice”)){//机器上报的通知直接转到springmvc项目处理
rabbitMqSenderHelper.sender(payloadStr);
}*/
ValuedResponse reply = ValuedResponse.newBuilder()
.setType(ValuedResponse.ResponsedType.STOP_AND_RETURN)
.setMessage(nmsg).build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
logger.info("接收完成的时间:"+new Date());
}
@Override
public void onMessageDelivered(io.emqx.exhook.MessageDeliveredRequest request, StreamObserver<io.emqx.exhook.EmptySuccess> responseObserver) {
DEBUG("onMessageDelivered", request);
io.emqx.exhook.EmptySuccess reply = io.emqx.exhook.EmptySuccess.newBuilder().build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
}
@Override
public void onMessageAcked(io.emqx.exhook.MessageAckedRequest request, StreamObserver<io.emqx.exhook.EmptySuccess> responseObserver) {
DEBUG("onMessageAcked", request);
io.emqx.exhook.EmptySuccess reply = io.emqx.exhook.EmptySuccess.newBuilder().build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
}
@Override
public void onMessageDropped(io.emqx.exhook.MessageDroppedRequest request, StreamObserver<io.emqx.exhook.EmptySuccess> responseObserver) {
DEBUG("onMessageDropped", request);
io.emqx.exhook.EmptySuccess reply = io.emqx.exhook.EmptySuccess.newBuilder().build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
}
}
private static Map<String, Object> assemblySendMsg(String deviceId, String appKey, String iotType, String acTime) {
Map<String, Object> content = new HashMap<String, Object>();
content.put("seq", "robot0");
content.put("cmd", "request");
content.put("acTime", acTime);
content.put("version", EnumeIM.offline.getKey());
content.put("msgType", EnumeIM.MSG_TYPEE.getValue());
Map<String, Object> value = Maps.newHashMap();
value.put("state", EnumeIM.offline.getKey());
Map<String, Object> control = Maps.newHashMap();
control.put("appKey", appKey);
control.put("targetId", deviceId);
control.put("targetType", EnumeIM.PUSH_TARGETTYPE.getValue());
control.put("broadcast", EnumeIM.PUSH_BROADCAST.getValue());
control.put("iotType", iotType);
content.put("value", value);
content.put("control", control);
return content;
}
}
package io.emqx.exhook.server;
import io.emqx.exhook.json.JsonHelper;
import io.emqx.exhook.mqtt.MqttProviderConfig;
import io.emqx.exhook.rabbitmq.RabbitMqSenderHelper;
import io.emqx.exhook.redis.RedisClientUtil;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.event.ApplicationStartedEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
@Service(“myServer”)
public class MyServer implements ApplicationListener {
private static final Logger logger = Logger.getLogger(ExServer.class.getName());
public Server server;
@Autowired
public RedisClientUtil redisUtil;
@Autowired
public JsonHelper jsonHelper;
@Autowired
public RabbitMqSenderHelper rabbitMqSenderHelper;
@Autowired
public MqttProviderConfig providerClient;
public void start() throws IOException {
/* The port on which the server should run */
int port = 9000;
server = ServerBuilder.forPort(port)
.addService(new ExServer.HookProviderImpl(redisUtil,jsonHelper,rabbitMqSenderHelper,providerClient))
.build()
.start();
logger.info("Server started, listening on " + port);
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
// Use stderr here since the logger may have been reset by its JVM shutdown hook.
System.err.println("*** shutting down gRPC server since JVM is shutting down");
try {
MyServer.this.stop();
} catch (InterruptedException e) {
e.printStackTrace(System.err);
}
System.err.println("*** server shut down");
}
});
}
public void stop() throws InterruptedException {
if (server != null) {
server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
}
}
/**
* Await termination on the main thread since the grpc library uses daemon threads.
*/
public void blockUntilShutdown() throws InterruptedException {
if (server != null) {
server.awaitTermination();
}
}
@Override
public void onApplicationEvent(ApplicationStartedEvent event) {
try {
start();
} catch (IOException e) {
e.printStackTrace();
}
try {
blockUntilShutdown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}