EMQ 问答社区

emqx_exproto插件4.3开源版做了修改?

问题描述

之前使用企业版的插件emqx_exproto,写了demo,功能已经好使了。但是在4.3开源版,使用上次的代码,发现收不到回调。
当我在官网使用exopro
重新生产代码,发现和企业版的区别是。

4.2.5企业版

@java.lang.Override
    @java.lang.SuppressWarnings("unchecked")
    public void invoke(Req request, io.grpc.stub.StreamObserver<Resp> responseObserver) {
      switch (methodId) {
        case METHODID_ON_SOCKET_CREATED:
          serviceImpl.onSocketCreated((emqx.exproto.v1.Exproto.SocketCreatedRequest) request,
              (io.grpc.stub.StreamObserver<emqx.exproto.v1.Exproto.EmptySuccess>) responseObserver);
          break;
        case METHODID_ON_SOCKET_CLOSED:
          serviceImpl.onSocketClosed((emqx.exproto.v1.Exproto.SocketClosedRequest) request,
              (io.grpc.stub.StreamObserver<emqx.exproto.v1.Exproto.EmptySuccess>) responseObserver);
          break;
        case METHODID_ON_RECEIVED_BYTES:
          serviceImpl.onReceivedBytes((emqx.exproto.v1.Exproto.ReceivedBytesRequest) request,
              (io.grpc.stub.StreamObserver<emqx.exproto.v1.Exproto.EmptySuccess>) responseObserver);
          break;
        case METHODID_ON_TIMER_TIMEOUT:
          serviceImpl.onTimerTimeout((emqx.exproto.v1.Exproto.TimerTimeoutRequest) request,
              (io.grpc.stub.StreamObserver<emqx.exproto.v1.Exproto.EmptySuccess>) responseObserver);
          break;
        case METHODID_ON_RECEIVED_MESSAGES:
          serviceImpl.onReceivedMessages((emqx.exproto.v1.Exproto.ReceivedMessagesRequest) request,
              (io.grpc.stub.StreamObserver<emqx.exproto.v1.Exproto.EmptySuccess>) responseObserver);
          break;
        default:
          throw new AssertionError();
      }
    }

    @java.lang.Override
    @java.lang.SuppressWarnings("unchecked")
    public io.grpc.stub.StreamObserver<Req> invoke(
        io.grpc.stub.StreamObserver<Resp> responseObserver) {
      switch (methodId) {
        default:
          throw new AssertionError();
      }
    }

4.3开源版

 @java.lang.Override
    @java.lang.SuppressWarnings("unchecked")
    public void invoke(Req request, io.grpc.stub.StreamObserver<Resp> responseObserver) {
      switch (methodId) {
        default:
          throw new AssertionError();
      }
    }

    @java.lang.Override
    @java.lang.SuppressWarnings("unchecked")
    public io.grpc.stub.StreamObserver<Req> invoke(
        io.grpc.stub.StreamObserver<Resp> responseObserver) {
      switch (methodId) {
        case METHODID_ON_SOCKET_CREATED:
          return (io.grpc.stub.StreamObserver<Req>) serviceImpl.onSocketCreated(
              (io.grpc.stub.StreamObserver<emqx.exproto.v1.Exproto.EmptySuccess>) responseObserver);
        case METHODID_ON_SOCKET_CLOSED:
          return (io.grpc.stub.StreamObserver<Req>) serviceImpl.onSocketClosed(
              (io.grpc.stub.StreamObserver<emqx.exproto.v1.Exproto.EmptySuccess>) responseObserver);
        case METHODID_ON_RECEIVED_BYTES:
          return (io.grpc.stub.StreamObserver<Req>) serviceImpl.onReceivedBytes(
              (io.grpc.stub.StreamObserver<emqx.exproto.v1.Exproto.EmptySuccess>) responseObserver);
        case METHODID_ON_TIMER_TIMEOUT:
          return (io.grpc.stub.StreamObserver<Req>) serviceImpl.onTimerTimeout(
              (io.grpc.stub.StreamObserver<emqx.exproto.v1.Exproto.EmptySuccess>) responseObserver);
        case METHODID_ON_RECEIVED_MESSAGES:
          return (io.grpc.stub.StreamObserver<Req>) serviceImpl.onReceivedMessages(
              (io.grpc.stub.StreamObserver<emqx.exproto.v1.Exproto.EmptySuccess>) responseObserver);
        default:
          throw new AssertionError();
      }
    }

4.2.5企业版回调的两个参数的方法,而4.3开源版回调的是一个参数的方法。
那么4.3该如何正确使用,拿到4.2.3中request中的参数呢?

环境信息

  • EMQ X 版本:4.3-rc.4
  • 操作系统及版本:centos7
  • 其他

相关截图

4.3连接回调

4.2.5连接回调

是的,4.3 系列更新接口了。这两个版本是不兼容的。

那么4.3该如何正确使用,拿到4.2.3中request中的参数呢?没有request这个形参的话,我就无法获取TCP客户端信息。

每种语言的实现不一样…看看他们怎么处理 stream 的

新版本的proto文件有改动吗?

之前的proto拿不到回调。现在的proto没有request参数不知道怎么拿tcp发送的数据

哈哈…抱歉。确实是有点麻烦这个…

你可以研究研究 java 里面 grpc 的 stream 接口,是怎么收数据的。应该就能调通了


早回复这句话不早完事了。 :innocent:

欢迎一起贡献代码帮助完善我们的项目

我这边的问题,抱歉