问题描述
ConnectionHandler的代码如下,在每个onNext()中调用了responseObserver.onCompleted();
,之后无论是发布消息,还是订阅到的消息,只能收到一次,第二次没有反应。看emqx日志没有任何错误提示。
如果不使用responseObserver.onCompleted();
,则会触发onError(),触发原因是CANCELLED: cancelled before receiving half close
。
package emqx.exproto.v1;
import com.google.protobuf.ByteString;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;
import java.util.*;
/**
* @author wangwenhai
* @date 2020/10/13
* File description:
*/
public class ConnectionHandler extends ConnectionHandlerGrpc.ConnectionHandlerImplBase {
public static ArrayList<String> socket = new ArrayList<>();
private static final String HOST = "192.168.1.19:9100";
static ManagedChannel channel;
static {
System.out.println("[LOG] Build singleton channel");
channel = ManagedChannelBuilder.forTarget(HOST)
.usePlaintext()
.build();
}
static ConnectionAdapterGrpc.ConnectionAdapterBlockingStub blockingStub = ConnectionAdapterGrpc.newBlockingStub(channel);
@Override
public StreamObserver<Exproto.SocketCreatedRequest> onSocketCreated(StreamObserver<Exproto.EmptySuccess> responseObserver) {
return new StreamObserver<Exproto.SocketCreatedRequest>() {
@Override
public void onNext(Exproto.SocketCreatedRequest socketCreatedRequest) {
System.out.println("有新的设备接入了,接入信息如下:" + socketCreatedRequest.getConninfo());
responseObserver.onNext(Exproto.EmptySuccess.newBuilder().build());
responseObserver.onCompleted();
}
@Override
public void onError(Throwable throwable) {
System.out.println(throwable.getMessage());
System.out.println(throwable.getCause());
System.out.println("error");
}
@Override
public void onCompleted() {
System.out.println("completed");
}
};
}
@Override
public StreamObserver<Exproto.ReceivedBytesRequest> onReceivedBytes(StreamObserver<Exproto.EmptySuccess> responseObserver) {
return new StreamObserver<Exproto.ReceivedBytesRequest>() {
@Override
public void onNext(Exproto.ReceivedBytesRequest receivedBytesRequest) {
String data = receivedBytesRequest.getBytes().toStringUtf8();
System.out.println("收到数据" + data);
if (data.startsWith("[") && data.endsWith("]")) {
System.out.println(socket);
String[] th = data.substring(1, data.length() - 1).split(",");
System.out.println("收到数据,主题为" + th[0] + " 设备mn为" + th[1]);
if (!socket.contains(receivedBytesRequest.getConn())) {
Exproto.ClientInfo clientInfo = Exproto.ClientInfo.newBuilder()
.setClientid(th[1])
.setUsername(th[1])
.build();
Exproto.AuthenticateRequest authenticateRequest = Exproto.AuthenticateRequest.newBuilder()
.setClientinfo(clientInfo)
.setConn(receivedBytesRequest.getConn())
.setPassword("password")
.build();
Exproto.CodeResponse response = blockingStub.authenticate(authenticateRequest);
System.out.println("[LOG] c2authenticate" + response.getCodeValue());
Exproto.SubscribeRequest subscribeRequest = Exproto.SubscribeRequest.newBuilder()
.setConn(receivedBytesRequest.getConn())
.setQos(1)
.setTopic(th[0])
.build();
socket.add(receivedBytesRequest.getConn());
Exproto.CodeResponse response2 = blockingStub.subscribe(subscribeRequest);
System.out.println("订阅结果: " + response2.getMessage());
}
Exproto.PublishRequest publishRequest = Exproto.PublishRequest.newBuilder()
.setConn(receivedBytesRequest.getConn())
.setTopic("/test1")
.setQos(0)
.setPayload(ByteString.copyFromUtf8(data)).build();
Exproto.CodeResponse response1 = blockingStub.publish(publishRequest);
System.out.println("已经将接受的消息发布" + response1.getMessage());
}
responseObserver.onNext(Exproto.EmptySuccess.newBuilder().build());
responseObserver.onCompleted();
}
@Override
public void onError(Throwable throwable) {
System.out.println(throwable.getMessage());
System.out.println("receive error");
System.out.println("Encountered error in recordRoute" + throwable);
}
@Override
public void onCompleted() {
System.out.println("completed");
}
};
}
@Override
public StreamObserver<Exproto.SocketClosedRequest> onSocketClosed(StreamObserver<Exproto.EmptySuccess> responseObserver) {
return new StreamObserver<Exproto.SocketClosedRequest>() {
@Override
public void onNext(Exproto.SocketClosedRequest socketClosedRequest) {
System.out.println("一个设备下线了,下线的设备的原因:" + socketClosedRequest.getReason());
responseObserver.onNext(Exproto.EmptySuccess.getDefaultInstance());
responseObserver.onCompleted();
}
@Override
public void onError(Throwable throwable) {
System.out.println("close error");
}
@Override
public void onCompleted() {
System.out.println("onCompleted");
}
};
}
@Override
public StreamObserver<Exproto.ReceivedMessagesRequest> onReceivedMessages(StreamObserver<Exproto.EmptySuccess> responseObserver) {
return new StreamObserver<Exproto.ReceivedMessagesRequest>() {
@Override
public void onNext(Exproto.ReceivedMessagesRequest receivedMessagesRequest) {
System.out.println("[LOG] onReceivedMessages:" + receivedMessagesRequest.getConn());
System.out.println(receivedMessagesRequest.getMessagesList());
System.out.println(receivedMessagesRequest.getMessages(0));
Exproto.Message message = receivedMessagesRequest.getMessages(0);
Exproto.SendBytesRequest sendBytesRequest = Exproto.SendBytesRequest.newBuilder()
.setConn(receivedMessagesRequest.getConn())
.setBytes(message.getPayload())
.build();
Exproto.CodeResponse response = blockingStub.send(sendBytesRequest);
System.out.println("已发送" + response.getMessage());
responseObserver.onNext(Exproto.EmptySuccess.getDefaultInstance());
responseObserver.onCompleted();
}
@Override
public void onError(Throwable throwable) {
System.out.println("message error");
}
@Override
public void onCompleted() {
System.out.println("onCompleted");
}
};
}
@Override
public StreamObserver<Exproto.TimerTimeoutRequest> onTimerTimeout(StreamObserver<Exproto.EmptySuccess> responseObserver) {
return new StreamObserver<Exproto.TimerTimeoutRequest>() {
@Override
public void onNext(Exproto.TimerTimeoutRequest timerTimeoutRequest) {
System.out.println(timerTimeoutRequest.getType());
responseObserver.onNext(Exproto.EmptySuccess.getDefaultInstance());
responseObserver.onCompleted();
}
@Override
public void onError(Throwable throwable) {
System.out.println("timerout" + throwable.getMessage());
}
@Override
public void onCompleted() {
System.out.println("test");
}
};
}
}
环境信息
- EMQ X 版本: 4.3-rc.4
- 操作系统及版本:centos7
- 其他
相应的配置文件内容
详细日志