emqx 免费版

java 钩子 exhook 部署后日志有打印,客户端接收不到信息

建议先将日志等级设置为 debug 然后再进行排查

package io.emqx.exhook.server;

import com.google.common.collect.Maps;
import com.google.protobuf.ByteString;
import io.emqx.exhook.EmptySuccess;
import io.emqx.exhook.Message;
import io.emqx.exhook.ValuedResponse;
import io.emqx.exhook.enume.EnumeIM;
import io.emqx.exhook.json.JsonHelper;
import io.emqx.exhook.mqtt.MqttProviderConfig;
import io.emqx.exhook.rabbitmq.RabbitMqSenderHelper;
import io.emqx.exhook.redis.RedisClientUtil;
import io.grpc.stub.StreamObserver;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.logging.Logger;

@Service(“exServer”)
public class ExServer {
private static final Logger logger = Logger.getLogger(ExServer.class.getName());

private final static String SEPARATOR = "/";
private final static String UNDER_CROSS = ".";
private final static String SEPARATORS;
static {
    SEPARATORS = "\\";
}
private final static String UNDERLINE = "_";

/*public Server server;

public void start() throws IOException {
    *//* The port on which the server should run *//*
    int port = 9000;

    server = ServerBuilder.forPort(port)
            .addService(new HookProviderImpl())
            .build()
            .start();
    logger.info("Server started, listening on " + port);
    Runtime.getRuntime().addShutdownHook(new Thread() {
        @Override
        public void run() {
            // Use stderr here since the logger may have been reset by its JVM shutdown hook.
            System.err.println("*** shutting down gRPC server since JVM is shutting down");
            try {
                ExServer.this.stop();
            } catch (InterruptedException e) {
                e.printStackTrace(System.err);
            }
            System.err.println("*** server shut down");
        }
    });
}

public void stop() throws InterruptedException {
if (server != null) {
server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
}
}

*//**
 * Await termination on the main thread since the grpc library uses daemon threads.
 *//*
public void blockUntilShutdown() throws InterruptedException {
    if (server != null) {
        server.awaitTermination();
    }
}*/

/**
 * Main launches the server from the command line.
 */

/* public static void main(String args) throws IOException, InterruptedException {
final ExServer server = new ExServer();
server.start();
server.blockUntilShutdown();
}*/
@Service(“hookProviderImpl”)
public static class HookProviderImpl extends HookProviderGrpc.HookProviderImplBase {
private RedisClientUtil redisUtil;
private JsonHelper jsonHelper;
private RabbitMqSenderHelper rabbitMqSenderHelper;
private MqttProviderConfig providerClient;
public HookProviderImpl(RedisClientUtil redisUtil, JsonHelper jsonHelper, RabbitMqSenderHelper rabbitMqSenderHelper, MqttProviderConfig providerClient) {
this.redisUtil=redisUtil;
this.jsonHelper=jsonHelper;
this.rabbitMqSenderHelper=rabbitMqSenderHelper;
this.providerClient=providerClient;
}

     public void DEBUG(String fn, Object req) {
        System.out.printf(fn + ", request: " + req);
    }

    @Override
    public void onProviderLoaded(io.emqx.exhook.ProviderLoadedRequest request, StreamObserver<io.emqx.exhook.LoadedResponse> responseObserver) {
       /* DEBUG("onProviderLoaded", request);*/
        io.emqx.exhook.HookSpec[] specs = {
                io.emqx.exhook.HookSpec.newBuilder().setName("client.connect").build(),
                io.emqx.exhook.HookSpec.newBuilder().setName("client.connack").build(),
                io.emqx.exhook.HookSpec.newBuilder().setName("client.connected").build(),
                io.emqx.exhook.HookSpec.newBuilder().setName("client.disconnected").build(),
                io.emqx.exhook.HookSpec.newBuilder().setName("client.authenticate").build(),
                io.emqx.exhook.HookSpec.newBuilder().setName("client.check_acl").build(),
                io.emqx.exhook.HookSpec.newBuilder().setName("client.subscribe").build(),
                io.emqx.exhook.HookSpec.newBuilder().setName("client.unsubscribe").build(),

                io.emqx.exhook.HookSpec.newBuilder().setName("session.created").build(),
                io.emqx.exhook.HookSpec.newBuilder().setName("session.subscribed").build(),
                io.emqx.exhook.HookSpec.newBuilder().setName("session.unsubscribed").build(),
                io.emqx.exhook.HookSpec.newBuilder().setName("session.resumed").build(),
                io.emqx.exhook.HookSpec.newBuilder().setName("session.discarded").build(),
                io.emqx.exhook.HookSpec.newBuilder().setName("session.takeovered").build(),
                io.emqx.exhook.HookSpec.newBuilder().setName("session.terminated").build(),

                io.emqx.exhook.HookSpec.newBuilder().setName("message.publish").build(),
                io.emqx.exhook.HookSpec.newBuilder().setName("message.delivered").build(),
                io.emqx.exhook.HookSpec.newBuilder().setName("message.acked").build(),
                io.emqx.exhook.HookSpec.newBuilder().setName("message.dropped").build()
        };
        io.emqx.exhook.LoadedResponse reply = io.emqx.exhook.LoadedResponse.newBuilder().addAllHooks(Arrays.asList(specs)).build();
        responseObserver.onNext(reply);
        responseObserver.onCompleted();
    }

    @Override
    public void onProviderUnloaded(io.emqx.exhook.ProviderUnloadedRequest request, StreamObserver<io.emqx.exhook.EmptySuccess> responseObserver) {
        DEBUG("onProviderUnloaded", request);
        io.emqx.exhook.EmptySuccess reply = io.emqx.exhook.EmptySuccess.newBuilder().build();
        responseObserver.onNext(reply);
        responseObserver.onCompleted();
    }

    /**
     * 客户端连接
     * @param request
     * @param responseObserver
     */
    @Override
    public void onClientConnect(io.emqx.exhook.ClientConnectRequest request, StreamObserver<io.emqx.exhook.EmptySuccess> responseObserver) {
        DEBUG("onClientConnect", request);
        io.emqx.exhook.EmptySuccess reply = io.emqx.exhook.EmptySuccess.newBuilder().build();
        responseObserver.onNext(reply);
        responseObserver.onCompleted();
    }

    @Override
    public void onClientConnack(io.emqx.exhook.ClientConnackRequest request, StreamObserver<io.emqx.exhook.EmptySuccess> responseObserver) {
        DEBUG("onClientConnack", request);
        io.emqx.exhook.EmptySuccess reply = io.emqx.exhook.EmptySuccess.newBuilder().build();
        responseObserver.onNext(reply);
        responseObserver.onCompleted();
    }

    /**
     * 客户端已连上
     * @param request
     * @param responseObserver
     */
    @Override
    public void onClientConnected(io.emqx.exhook.ClientConnectedRequest request, StreamObserver<io.emqx.exhook.EmptySuccess> responseObserver) {
        DEBUG("onClientConnected", request);
        io.emqx.exhook.EmptySuccess reply = io.emqx.exhook.EmptySuccess.newBuilder().build();
        responseObserver.onNext(reply);
        responseObserver.onCompleted();
    }

    /**
     * 离线监听
     * @param request
     * @param responseObserver
     */
    @Override
    public void onClientDisconnected(io.emqx.exhook.ClientDisconnectedRequest request, StreamObserver<io.emqx.exhook.EmptySuccess> responseObserver) {
       DEBUG("onClientDisconnected", request);
     /*   String clientId =  request.getClientinfo().getClientid();
        if(clientId.contains(UNDERLINE+EnumeIM.DEVICETYPE_ROBOTICVACUUM.getKey())||clientId.contains(UNDERLINE+EnumeIM.DEVICETYPE_WINDOWCLEANMACHINE.getKey())) {
            int index = clientId.lastIndexOf(UNDERLINE);//lastIndexOf()方法可返回一个指定的字符串值最后出现的位置,在一个字符串中的指定位置从后向前搜索
            String deviceType = clientId.substring(index + EnumeIM.ROBOT_PARTS_EVENT_ONE_INT.getIntKey(), clientId.length());
            String deviceId = clientId.substring(EnumeIM.ROBOT_PARTS_EVENT_ZERO_INT.getIntKey(), index);

            String topic_key = "";
            String prop_key = "";
            prop_key = prop_key.concat(deviceId).concat(UNDER_CROSS).concat(deviceType).concat(UNDER_CROSS).concat("prop");
            topic_key = topic_key.concat(deviceId).concat(UNDER_CROSS).concat(deviceType).concat(UNDER_CROSS).concat("topic");
            String topicStr = redisUtil.getShardedJedis().get(topic_key);
            String propStr = redisUtil.getShardedJedis().get(prop_key);//从缓存中获取
            if (StringUtils.hasLength(propStr)) {
                Map<String, Object> propMap = jsonHelper.jsonStr2Object(propStr, Map.class);
                propMap.put("state", EnumeIM.offline.getKey());//修改属性state值为0
                redisUtil.getShardedJedis().set(prop_key, jsonHelper.object2JsonStr(propMap));//放入缓存中
            }
            if (StringUtils.hasLength(topicStr)) {
                String appKey = topicStr.split(SEPARATORS+UNDER_CROSS)[Integer.valueOf(EnumeIM.ROBOT_PARTS_EVENT_ZERO.getValue())];
                String iotType = topicStr.split(SEPARATORS+UNDER_CROSS)[Integer.valueOf(EnumeIM.ROBOT_PARTS_EVENT_ONE.getValue())];
                String acTime = topicStr.split(SEPARATORS+UNDER_CROSS)[Integer.valueOf(EnumeIM.ROBOT_PARTS_EVENT_TWO.getValue())];
                String topic = SEPARATOR + appKey + SEPARATOR + iotType + SEPARATOR + deviceType + SEPARATOR + deviceId + SEPARATOR + "report";
                Map<String, Object> content = assemblySendMsg(deviceId, appKey, iotType, acTime);
                //发送消息监听该主题的app端修改状态
                providerClient.publish(EnumeIM.QOS_ZERO.getIntKey(), true, topic, jsonHelper.object2JsonStr(content));
                //设备的属性状态值state修改为0
            }
        }*/

        EmptySuccess reply = EmptySuccess.newBuilder().build();
        responseObserver.onNext(reply);
        responseObserver.onCompleted();
    }

    @Override
    public void onClientAuthenticate(io.emqx.exhook.ClientAuthenticateRequest request, StreamObserver<io.emqx.exhook.ValuedResponse> responseObserver) {
        DEBUG("onClientAuthenticate", request);
        io.emqx.exhook.ValuedResponse reply = io.emqx.exhook.ValuedResponse.newBuilder()
                                             .setBoolResult(true)
                                             .setType(io.emqx.exhook.ValuedResponse.ResponsedType.STOP_AND_RETURN)
                                             .build();
        responseObserver.onNext(reply);
        responseObserver.onCompleted();
    }

    @Override
    public void onClientCheckAcl(io.emqx.exhook.ClientCheckAclRequest request, StreamObserver<io.emqx.exhook.ValuedResponse> responseObserver) {
        DEBUG("onClientCheckAcl", request);
        io.emqx.exhook.ValuedResponse reply = io.emqx.exhook.ValuedResponse.newBuilder()
                                             .setBoolResult(true)
                                             .setType(io.emqx.exhook.ValuedResponse.ResponsedType.STOP_AND_RETURN)
                                             .build();

        responseObserver.onNext(reply);
        responseObserver.onCompleted();
    }

    @Override
    public void onClientSubscribe(io.emqx.exhook.ClientSubscribeRequest request, StreamObserver<io.emqx.exhook.EmptySuccess> responseObserver) {
       DEBUG("onClientSubscribe", request);
        io.emqx.exhook.EmptySuccess reply = io.emqx.exhook.EmptySuccess.newBuilder().build();
        responseObserver.onNext(reply);
        responseObserver.onCompleted();
    }

    @Override
    public void onClientUnsubscribe(io.emqx.exhook.ClientUnsubscribeRequest request, StreamObserver<io.emqx.exhook.EmptySuccess> responseObserver) {
        DEBUG("onClientUnsubscribe", request);
        io.emqx.exhook.EmptySuccess reply = io.emqx.exhook.EmptySuccess.newBuilder().build();
        responseObserver.onNext(reply);
        responseObserver.onCompleted();
    }

    /**
     * session创建
     * @param request
     * @param responseObserver
     */
    @Override
    public void onSessionCreated(io.emqx.exhook.SessionCreatedRequest request, StreamObserver<io.emqx.exhook.EmptySuccess> responseObserver) {
        DEBUG("onSessionCreated", request);
        io.emqx.exhook.EmptySuccess reply = io.emqx.exhook.EmptySuccess.newBuilder().build();
        responseObserver.onNext(reply);
        responseObserver.onCompleted();
    }

    @Override
    public void onSessionSubscribed(io.emqx.exhook.SessionSubscribedRequest request, StreamObserver<io.emqx.exhook.EmptySuccess> responseObserver) {
        DEBUG("onSessionSubscribed", request);
        io.emqx.exhook.EmptySuccess reply = io.emqx.exhook.EmptySuccess.newBuilder().build();
        responseObserver.onNext(reply);
        responseObserver.onCompleted();
    }

    @Override
    public void onSessionUnsubscribed(io.emqx.exhook.SessionUnsubscribedRequest request, StreamObserver<io.emqx.exhook.EmptySuccess> responseObserver) {
        DEBUG("onSessionUnsubscribed", request);
        io.emqx.exhook.EmptySuccess reply = io.emqx.exhook.EmptySuccess.newBuilder().build();
        responseObserver.onNext(reply);
        responseObserver.onCompleted();
    }

    @Override
    public void onSessionResumed(io.emqx.exhook.SessionResumedRequest request, StreamObserver<io.emqx.exhook.EmptySuccess> responseObserver) {
        DEBUG("onSessionResumed", request);
        io.emqx.exhook.EmptySuccess reply = io.emqx.exhook.EmptySuccess.newBuilder().build();
        responseObserver.onNext(reply);
        responseObserver.onCompleted();
    }

    @Override
    public void onSessionDiscarded(io.emqx.exhook.SessionDiscardedRequest request, StreamObserver<io.emqx.exhook.EmptySuccess> responseObserver) {
       DEBUG("onSessionDdiscarded", request);
        io.emqx.exhook.EmptySuccess reply = io.emqx.exhook.EmptySuccess.newBuilder().build();
        responseObserver.onNext(reply);
        responseObserver.onCompleted();
    }

    @Override
    public void onSessionTakeovered(io.emqx.exhook.SessionTakeoveredRequest request, StreamObserver<io.emqx.exhook.EmptySuccess> responseObserver) {
        DEBUG("onSessionTakeovered", request);
        io.emqx.exhook.EmptySuccess reply = io.emqx.exhook.EmptySuccess.newBuilder().build();
        responseObserver.onNext(reply);
        responseObserver.onCompleted();
    }

    /**
     * session停止监听
     * @param request
     * @param responseObserver
     */
    @Override
    public void onSessionTerminated(io.emqx.exhook.SessionTerminatedRequest request, StreamObserver<io.emqx.exhook.EmptySuccess> responseObserver) {
        DEBUG("onSessionTerminated", request);
        io.emqx.exhook.EmptySuccess reply = io.emqx.exhook.EmptySuccess.newBuilder().build();
        responseObserver.onNext(reply);
        responseObserver.onCompleted();
    }

    @Override
    public void onMessagePublish(io.emqx.exhook.MessagePublishRequest request, StreamObserver<io.emqx.exhook.ValuedResponse> responseObserver) {
        DEBUG("onMessageDelivered", request);
        logger.info("接收的消息时间:"+new Date());
       ByteString bstr = ByteString.copyFromUtf8("hardcode payload by exhook-svr-java :)");

       Message nmsg = Message.newBuilder()
                .setId(request.getMessage().getId())
                .setNode(request.getMessage().getNode())
                .setFrom(request.getMessage().getFrom())
                .setTopic(request.getMessage().getTopic())
                .setPayload(bstr).build();

/*
String topic = request.getMessage().getTopic();
String prop_key = “”;//各个实例的key 业务系统通过redis 获取到属性
String topic_key = “”;//各个topic的key 客户端【机器】离线用到
String topic_value = “”;//各个topic的值
ByteString payload = request.getMessage().getPayload();
String payloadStr = payload.toStringUtf8();
if(topic.contains(“/report”)){//机器上报(订阅此地址的布什容可以接收到机器上报属性)
String appkey = topic.split(SEPARATOR)[EnumeIM.ROBOT_PARTS_EVENT_ONE_INT.getIntKey()];
String iotType = topic.split(SEPARATOR)[EnumeIM.ROBOT_PARTS_EVENT_TWO_INT.getIntKey()];
String deviceType = topic.split(SEPARATOR)[EnumeIM.ROBOT_PARTS_EVENT_THREE_INT.getIntKey()];
String deviceId = topic.split(SEPARATOR)[EnumeIM.ROBOT_PARTS_EVENT_FOUR_INT.getIntKey()];
prop_key = prop_key.concat(deviceId).concat(UNDER_CROSS).concat(deviceType).concat(UNDER_CROSS).concat(“prop”);
topic_key=topic_key.concat(deviceId).concat(UNDER_CROSS).concat(deviceType).concat(UNDER_CROSS).concat(“topic”);
Map map = jsonHelper.jsonStr2Object(payloadStr,Map.class);
if(map!=null){
if(map.get(“msgType”)!=null && EnumeIM.MSG_TYPEE.getValue().equals(map.get(“msgType”))){//服务器端发送的直接丢弃
}else {
String propStr = redisUtil.getShardedJedis().get(prop_key);
Map propMap = jsonHelper.jsonStr2Object(propStr,Map.class);
long acTime = Long.valueOf((String) map.get(“acTime”));
topic_value = topic_value.concat(appkey).concat(UNDER_CROSS).concat(iotType).concat(UNDER_CROSS).concat(String.valueOf(acTime + 1));
redisUtil.getShardedJedis().set(topic_key, topic_value);
//解析属性
Map valueMap = (Map) map.get(“value”);
if (propMap!=null && propMap.get(“acTime”) != null) {//缓存有值
Long acTimeOld = (Long) propMap.get(“acTime”);//旧的时间
if (acTime > acTimeOld) {//新的数据的时间比较新重新放入内存
valueMap.put(“acTime”, acTime);//放入属性中
redisUtil.getShardedJedis().set(prop_key, jsonHelper.object2JsonStr(valueMap));//放入缓存
}
} else {
valueMap.put(“acTime”, acTime);//放入属性中
redisUtil.getShardedJedis().set(prop_key, jsonHelper.object2JsonStr(valueMap));//放入缓存
}
}
}
} else if(topic.contains(“/notice”)){//机器上报的通知直接转到springmvc项目处理
rabbitMqSenderHelper.sender(payloadStr);

        }*/

        ValuedResponse reply = ValuedResponse.newBuilder()
                .setType(ValuedResponse.ResponsedType.STOP_AND_RETURN)
                .setMessage(nmsg).build();
        responseObserver.onNext(reply);
        responseObserver.onCompleted();

        logger.info("接收完成的时间:"+new Date());
    }

    @Override
    public void onMessageDelivered(io.emqx.exhook.MessageDeliveredRequest request, StreamObserver<io.emqx.exhook.EmptySuccess> responseObserver) {
        DEBUG("onMessageDelivered", request);
        io.emqx.exhook.EmptySuccess reply = io.emqx.exhook.EmptySuccess.newBuilder().build();
        responseObserver.onNext(reply);
        responseObserver.onCompleted();
    }

    @Override
    public void onMessageAcked(io.emqx.exhook.MessageAckedRequest request, StreamObserver<io.emqx.exhook.EmptySuccess> responseObserver) {
        DEBUG("onMessageAcked", request);
        io.emqx.exhook.EmptySuccess reply = io.emqx.exhook.EmptySuccess.newBuilder().build();
        responseObserver.onNext(reply);
        responseObserver.onCompleted();
    }

    @Override
    public void onMessageDropped(io.emqx.exhook.MessageDroppedRequest request, StreamObserver<io.emqx.exhook.EmptySuccess> responseObserver) {
        DEBUG("onMessageDropped", request);
        io.emqx.exhook.EmptySuccess reply = io.emqx.exhook.EmptySuccess.newBuilder().build();
        responseObserver.onNext(reply);
        responseObserver.onCompleted();
    }
}

private static Map<String, Object> assemblySendMsg(String deviceId, String appKey, String iotType, String acTime) {
    Map<String, Object> content = new HashMap<String, Object>();
    content.put("seq", "robot0");
    content.put("cmd", "request");
    content.put("acTime", acTime);
    content.put("version", EnumeIM.offline.getKey());
    content.put("msgType", EnumeIM.MSG_TYPEE.getValue());
    Map<String, Object> value = Maps.newHashMap();
    value.put("state", EnumeIM.offline.getKey());
    Map<String, Object> control = Maps.newHashMap();
    control.put("appKey", appKey);
    control.put("targetId", deviceId);
    control.put("targetType", EnumeIM.PUSH_TARGETTYPE.getValue());
    control.put("broadcast", EnumeIM.PUSH_BROADCAST.getValue());
    control.put("iotType", iotType);
    content.put("value", value);
    content.put("control", control);
    return content;
}

}

package io.emqx.exhook.server;

import io.emqx.exhook.json.JsonHelper;
import io.emqx.exhook.mqtt.MqttProviderConfig;
import io.emqx.exhook.rabbitmq.RabbitMqSenderHelper;
import io.emqx.exhook.redis.RedisClientUtil;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.event.ApplicationStartedEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Service;

import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
@Service(“myServer”)
public class MyServer implements ApplicationListener {
private static final Logger logger = Logger.getLogger(ExServer.class.getName());

public Server server;

@Autowired
public  RedisClientUtil redisUtil;
@Autowired
public  JsonHelper jsonHelper;
@Autowired
public  RabbitMqSenderHelper rabbitMqSenderHelper;
@Autowired
public  MqttProviderConfig providerClient;

public void start() throws IOException {
    /* The port on which the server should run */
    int port = 9000;
    server = ServerBuilder.forPort(port)
            .addService(new ExServer.HookProviderImpl(redisUtil,jsonHelper,rabbitMqSenderHelper,providerClient))
            .build()
            .start();
    logger.info("Server started, listening on " + port);
    Runtime.getRuntime().addShutdownHook(new Thread() {
        @Override
        public void run() {
            // Use stderr here since the logger may have been reset by its JVM shutdown hook.
            System.err.println("*** shutting down gRPC server since JVM is shutting down");
            try {
                MyServer.this.stop();
            } catch (InterruptedException e) {
                e.printStackTrace(System.err);
            }
            System.err.println("*** server shut down");
        }
    });
}

public void stop() throws InterruptedException {
    if (server != null) {
        server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
    }
}

/**
 * Await termination on the main thread since the grpc library uses daemon threads.
 */
public void blockUntilShutdown() throws InterruptedException {
    if (server != null) {
        server.awaitTermination();
    }
}

@Override
public void onApplicationEvent(ApplicationStartedEvent event) {
    try {
        start();
    } catch (IOException e) {
        e.printStackTrace();
    }
    try {
        blockUntilShutdown();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

}