问题描述
ConnectionHandler的代码如下,在每个onNext()中调用了responseObserver.onCompleted();,之后无论是发布消息,还是订阅到的消息,只能收到一次,第二次没有反应。看emqx日志没有任何错误提示。
如果不使用responseObserver.onCompleted();,则会触发onError(),触发原因是CANCELLED: cancelled before receiving half close。
package emqx.exproto.v1;
import com.google.protobuf.ByteString;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;
import java.util.*;
/**
 * @author wangwenhai
 * @date 2020/10/13
 * File description:
 */
public class ConnectionHandler extends ConnectionHandlerGrpc.ConnectionHandlerImplBase {
    public static ArrayList<String> socket = new ArrayList<>();
    private static final String HOST = "192.168.1.19:9100";
    static ManagedChannel channel;
    static {
        System.out.println("[LOG] Build singleton channel");
        channel = ManagedChannelBuilder.forTarget(HOST)
                .usePlaintext()
                .build();
    }
    static ConnectionAdapterGrpc.ConnectionAdapterBlockingStub blockingStub = ConnectionAdapterGrpc.newBlockingStub(channel);
    @Override
    public StreamObserver<Exproto.SocketCreatedRequest> onSocketCreated(StreamObserver<Exproto.EmptySuccess> responseObserver) {
        return new StreamObserver<Exproto.SocketCreatedRequest>() {
            @Override
            public void onNext(Exproto.SocketCreatedRequest socketCreatedRequest) {
                System.out.println("有新的设备接入了,接入信息如下:" + socketCreatedRequest.getConninfo());
                responseObserver.onNext(Exproto.EmptySuccess.newBuilder().build());
                responseObserver.onCompleted();
            }
            @Override
            public void onError(Throwable throwable) {
                System.out.println(throwable.getMessage());
                System.out.println(throwable.getCause());
                System.out.println("error");
            }
            @Override
            public void onCompleted() {
                System.out.println("completed");
            }
        };
    }
    @Override
    public StreamObserver<Exproto.ReceivedBytesRequest> onReceivedBytes(StreamObserver<Exproto.EmptySuccess> responseObserver) {
        return new StreamObserver<Exproto.ReceivedBytesRequest>() {
            @Override
            public void onNext(Exproto.ReceivedBytesRequest receivedBytesRequest) {
                String data = receivedBytesRequest.getBytes().toStringUtf8();
                System.out.println("收到数据" + data);
                if (data.startsWith("[") && data.endsWith("]")) {
                    System.out.println(socket);
                    String[] th = data.substring(1, data.length() - 1).split(",");
                    System.out.println("收到数据,主题为" + th[0] + " 设备mn为" + th[1]);
                    if (!socket.contains(receivedBytesRequest.getConn())) {
                        Exproto.ClientInfo clientInfo = Exproto.ClientInfo.newBuilder()
                                .setClientid(th[1])
                                .setUsername(th[1])
                                .build();
                        Exproto.AuthenticateRequest authenticateRequest = Exproto.AuthenticateRequest.newBuilder()
                                .setClientinfo(clientInfo)
                                .setConn(receivedBytesRequest.getConn())
                                .setPassword("password")
                                .build();
                        Exproto.CodeResponse response = blockingStub.authenticate(authenticateRequest);
                        System.out.println("[LOG] c2authenticate" + response.getCodeValue());
                        Exproto.SubscribeRequest subscribeRequest = Exproto.SubscribeRequest.newBuilder()
                                .setConn(receivedBytesRequest.getConn())
                                .setQos(1)
                                .setTopic(th[0])
                                .build();
                        socket.add(receivedBytesRequest.getConn());
                        Exproto.CodeResponse response2 = blockingStub.subscribe(subscribeRequest);
                        System.out.println("订阅结果: " + response2.getMessage());
                    }
                    Exproto.PublishRequest publishRequest = Exproto.PublishRequest.newBuilder()
                            .setConn(receivedBytesRequest.getConn())
                            .setTopic("/test1")
                            .setQos(0)
                            .setPayload(ByteString.copyFromUtf8(data)).build();
                    Exproto.CodeResponse response1 = blockingStub.publish(publishRequest);
                    System.out.println("已经将接受的消息发布" + response1.getMessage());
                }
                responseObserver.onNext(Exproto.EmptySuccess.newBuilder().build());
                responseObserver.onCompleted();
            }
            @Override
            public void onError(Throwable throwable) {
                System.out.println(throwable.getMessage());
                System.out.println("receive error");
                System.out.println("Encountered error in recordRoute" + throwable);
            }
            @Override
            public void onCompleted() {
                System.out.println("completed");
            }
        };
    }
    @Override
    public StreamObserver<Exproto.SocketClosedRequest> onSocketClosed(StreamObserver<Exproto.EmptySuccess> responseObserver) {
        return new StreamObserver<Exproto.SocketClosedRequest>() {
            @Override
            public void onNext(Exproto.SocketClosedRequest socketClosedRequest) {
                System.out.println("一个设备下线了,下线的设备的原因:" + socketClosedRequest.getReason());
                responseObserver.onNext(Exproto.EmptySuccess.getDefaultInstance());
                responseObserver.onCompleted();
            }
            @Override
            public void onError(Throwable throwable) {
                System.out.println("close error");
            }
            @Override
            public void onCompleted() {
                System.out.println("onCompleted");
            }
        };
    }
    @Override
    public StreamObserver<Exproto.ReceivedMessagesRequest> onReceivedMessages(StreamObserver<Exproto.EmptySuccess> responseObserver) {
        return new StreamObserver<Exproto.ReceivedMessagesRequest>() {
            @Override
            public void onNext(Exproto.ReceivedMessagesRequest receivedMessagesRequest) {
                System.out.println("[LOG] onReceivedMessages:" + receivedMessagesRequest.getConn());
                System.out.println(receivedMessagesRequest.getMessagesList());
                System.out.println(receivedMessagesRequest.getMessages(0));
                Exproto.Message message = receivedMessagesRequest.getMessages(0);
                Exproto.SendBytesRequest sendBytesRequest = Exproto.SendBytesRequest.newBuilder()
                        .setConn(receivedMessagesRequest.getConn())
                        .setBytes(message.getPayload())
                        .build();
                Exproto.CodeResponse response = blockingStub.send(sendBytesRequest);
                System.out.println("已发送" + response.getMessage());
                responseObserver.onNext(Exproto.EmptySuccess.getDefaultInstance());
                responseObserver.onCompleted();
            }
            @Override
            public void onError(Throwable throwable) {
                System.out.println("message error");
            }
            @Override
            public void onCompleted() {
                System.out.println("onCompleted");
            }
        };
    }
    @Override
    public StreamObserver<Exproto.TimerTimeoutRequest> onTimerTimeout(StreamObserver<Exproto.EmptySuccess> responseObserver) {
        return new StreamObserver<Exproto.TimerTimeoutRequest>() {
            @Override
            public void onNext(Exproto.TimerTimeoutRequest timerTimeoutRequest) {
                System.out.println(timerTimeoutRequest.getType());
                responseObserver.onNext(Exproto.EmptySuccess.getDefaultInstance());
                responseObserver.onCompleted();
            }
            @Override
            public void onError(Throwable throwable) {
                System.out.println("timerout" + throwable.getMessage());
            }
            @Override
            public void onCompleted() {
                System.out.println("test");
            }
        };
    }
}
环境信息
- EMQ X 版本: 4.3-rc.4
- 操作系统及版本:centos7
- 其他
相应的配置文件内容
详细日志
 
        






 ,微信加你了
,微信加你了


