emqx_exproto每个客户端只能回调一次

问题描述

ConnectionHandler的代码如下,在每个onNext()中调用了responseObserver.onCompleted();,之后无论是发布消息,还是订阅到的消息,只能收到一次,第二次没有反应。看emqx日志没有任何错误提示。
如果不使用responseObserver.onCompleted();,则会触发onError(),触发原因是CANCELLED: cancelled before receiving half close

package emqx.exproto.v1;

import com.google.protobuf.ByteString;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;

import java.util.*;

/**
 * @author wangwenhai
 * @date 2020/10/13
 * File description:
 */

public class ConnectionHandler extends ConnectionHandlerGrpc.ConnectionHandlerImplBase {
    public static ArrayList<String> socket = new ArrayList<>();
    private static final String HOST = "192.168.1.19:9100";
    static ManagedChannel channel;

    static {
        System.out.println("[LOG] Build singleton channel");
        channel = ManagedChannelBuilder.forTarget(HOST)
                .usePlaintext()
                .build();
    }

    static ConnectionAdapterGrpc.ConnectionAdapterBlockingStub blockingStub = ConnectionAdapterGrpc.newBlockingStub(channel);

    @Override
    public StreamObserver<Exproto.SocketCreatedRequest> onSocketCreated(StreamObserver<Exproto.EmptySuccess> responseObserver) {
        return new StreamObserver<Exproto.SocketCreatedRequest>() {
            @Override
            public void onNext(Exproto.SocketCreatedRequest socketCreatedRequest) {
                System.out.println("有新的设备接入了,接入信息如下:" + socketCreatedRequest.getConninfo());
                responseObserver.onNext(Exproto.EmptySuccess.newBuilder().build());
                responseObserver.onCompleted();
            }

            @Override
            public void onError(Throwable throwable) {
                System.out.println(throwable.getMessage());
                System.out.println(throwable.getCause());
                System.out.println("error");
            }

            @Override
            public void onCompleted() {
                System.out.println("completed");
            }
        };
    }

    @Override
    public StreamObserver<Exproto.ReceivedBytesRequest> onReceivedBytes(StreamObserver<Exproto.EmptySuccess> responseObserver) {
        return new StreamObserver<Exproto.ReceivedBytesRequest>() {
            @Override
            public void onNext(Exproto.ReceivedBytesRequest receivedBytesRequest) {
                String data = receivedBytesRequest.getBytes().toStringUtf8();
                System.out.println("收到数据" + data);
                if (data.startsWith("[") && data.endsWith("]")) {
                    System.out.println(socket);
                    String[] th = data.substring(1, data.length() - 1).split(",");
                    System.out.println("收到数据,主题为" + th[0] + " 设备mn为" + th[1]);
                    if (!socket.contains(receivedBytesRequest.getConn())) {
                        Exproto.ClientInfo clientInfo = Exproto.ClientInfo.newBuilder()
                                .setClientid(th[1])
                                .setUsername(th[1])
                                .build();
                        Exproto.AuthenticateRequest authenticateRequest = Exproto.AuthenticateRequest.newBuilder()
                                .setClientinfo(clientInfo)
                                .setConn(receivedBytesRequest.getConn())
                                .setPassword("password")
                                .build();
                        Exproto.CodeResponse response = blockingStub.authenticate(authenticateRequest);
                        System.out.println("[LOG] c2authenticate" + response.getCodeValue());
                        Exproto.SubscribeRequest subscribeRequest = Exproto.SubscribeRequest.newBuilder()
                                .setConn(receivedBytesRequest.getConn())
                                .setQos(1)
                                .setTopic(th[0])
                                .build();
                        socket.add(receivedBytesRequest.getConn());
                        Exproto.CodeResponse response2 = blockingStub.subscribe(subscribeRequest);
                        System.out.println("订阅结果: " + response2.getMessage());
                    }
                    Exproto.PublishRequest publishRequest = Exproto.PublishRequest.newBuilder()
                            .setConn(receivedBytesRequest.getConn())
                            .setTopic("/test1")
                            .setQos(0)
                            .setPayload(ByteString.copyFromUtf8(data)).build();
                    Exproto.CodeResponse response1 = blockingStub.publish(publishRequest);
                    System.out.println("已经将接受的消息发布" + response1.getMessage());
                }
                responseObserver.onNext(Exproto.EmptySuccess.newBuilder().build());
                responseObserver.onCompleted();
            }

            @Override
            public void onError(Throwable throwable) {
                System.out.println(throwable.getMessage());
                System.out.println("receive error");
                System.out.println("Encountered error in recordRoute" + throwable);
            }

            @Override
            public void onCompleted() {
                System.out.println("completed");
            }
        };
    }

    @Override
    public StreamObserver<Exproto.SocketClosedRequest> onSocketClosed(StreamObserver<Exproto.EmptySuccess> responseObserver) {
        return new StreamObserver<Exproto.SocketClosedRequest>() {
            @Override
            public void onNext(Exproto.SocketClosedRequest socketClosedRequest) {
                System.out.println("一个设备下线了,下线的设备的原因:" + socketClosedRequest.getReason());
                responseObserver.onNext(Exproto.EmptySuccess.getDefaultInstance());
                responseObserver.onCompleted();
            }

            @Override
            public void onError(Throwable throwable) {
                System.out.println("close error");
            }

            @Override
            public void onCompleted() {
                System.out.println("onCompleted");
            }
        };
    }

    @Override
    public StreamObserver<Exproto.ReceivedMessagesRequest> onReceivedMessages(StreamObserver<Exproto.EmptySuccess> responseObserver) {
        return new StreamObserver<Exproto.ReceivedMessagesRequest>() {
            @Override
            public void onNext(Exproto.ReceivedMessagesRequest receivedMessagesRequest) {
                System.out.println("[LOG] onReceivedMessages:" + receivedMessagesRequest.getConn());
                System.out.println(receivedMessagesRequest.getMessagesList());
                System.out.println(receivedMessagesRequest.getMessages(0));
                Exproto.Message message = receivedMessagesRequest.getMessages(0);
                Exproto.SendBytesRequest sendBytesRequest = Exproto.SendBytesRequest.newBuilder()
                        .setConn(receivedMessagesRequest.getConn())
                        .setBytes(message.getPayload())
                        .build();
                Exproto.CodeResponse response = blockingStub.send(sendBytesRequest);
                System.out.println("已发送" + response.getMessage());
                responseObserver.onNext(Exproto.EmptySuccess.getDefaultInstance());
                responseObserver.onCompleted();

            }

            @Override
            public void onError(Throwable throwable) {
                System.out.println("message error");
            }

            @Override
            public void onCompleted() {
                System.out.println("onCompleted");
            }
        };
    }

    @Override
    public StreamObserver<Exproto.TimerTimeoutRequest> onTimerTimeout(StreamObserver<Exproto.EmptySuccess> responseObserver) {
        return new StreamObserver<Exproto.TimerTimeoutRequest>() {
            @Override
            public void onNext(Exproto.TimerTimeoutRequest timerTimeoutRequest) {
                System.out.println(timerTimeoutRequest.getType());
                responseObserver.onNext(Exproto.EmptySuccess.getDefaultInstance());
                responseObserver.onCompleted();
            }

            @Override
            public void onError(Throwable throwable) {
                System.out.println("timerout" + throwable.getMessage());
            }

            @Override
            public void onCompleted() {
                System.out.println("test");
            }
        };
    }
}

环境信息

  • EMQ X 版本: 4.3-rc.4
  • 操作系统及版本:centos7
  • 其他

相应的配置文件内容


详细日志


相关截图

使用responseObserver.onCompleted();


注释responseObserver.onCompleted();

1赞

Java 这块我不熟悉,先帮你 Mark 一手大佬~ @wwhai

可以的话,可以抓个 9001 和 9100 的包,我这边看看 gRPC 处理的流程是不是对的。

抓包来看,Emqx上的grpcServer7993是把tcp发送的数据发给了9001。


但是不知道为什么不触发回调。

我在StreamObserver API看到的onNext的介绍客户端最多可以调用一次。通过抓包发现每次发送数据的端口号是一样的。所以我猜测可能是你们EMQX端的GRPC Server,每个客户端发送数据都是通过同一个变量回调的我这边的ConnectionHandler,导致onNext第二次不触发。

StreamObserver API截图

抓取两次发送的数据包截图


不知道,是否是这样的?

抱歉 最近比较忙 没看邮件。 最好是抓包文件发我份

看截图,简单推测还是请求还是发到 Java 那里去了的

Servers may invoke onNext at most once for client streaming calls, but may receive many onNext callbacks.

你看它这里说了 may receive many onNext callbacks. 意思是会触发多次的。我猜测可能还是编码没对。
不行的话.可以 DEBUG 它 grpc 库里面看看有没有处理这个应答的报文

debug第一次消息的栈,打上断点,第二次消息根本没有任何反应。

往 gRPC 库里面调 :face_with_hand_over_mouth:

断点就是打在maven引入的那些grpc的包上。

这里不能上传图片以外的文件

next是个流式操作,表示把请求让出来,你可以多次next,但是如果一个操作完了,必须complete。你检查一下你的代码是不是多次next或者没有complete。

上边有源代码,你可以看看,一次next,一次complete。

我看了下,看不出啥问题,要不你换个 grpc 的 builder 形式来构建对象?

Hello @wangscaler 抱歉最近有点忙。你的问题解决了么? 没有的话晚上我们一起看看 (可以远程和微信的方式 WeChatID: loftymoo)

没呢,准备erlang编译你们源码了 :joy:,微信加你了

我根据GRPC官方文档,写了个java的客户端流的Demo.梳理整个流程发现,上述的问题,确实应该是你们EMQX的BUG.调用完ConnectionHandler之后,没有使用responseObserver.onCompleted();(在java是这个)。
我的Demo在客户端调用了两次服务器端的方法。注释掉上述的代码之后,出现的现象和当前写的EMQX出现的现象是一致的。

Demo正常情况

Server端截图

Client端截图

在client端注释掉responseObserver.onCompleted();

Server端截图

Client端截图

1赞

.NET中,使用MoveNext()可以获取到流,但是是没有complete方法的;而且在循环获取MoveNext()时就会报错,导致TCP连接挂掉。能否出个.NET5的Demo,参考参考