emqx_exproto模块,使用publish报错

环境信息

  • EMQ X 版本:4.2.5企业版
  • 操作系统及版本:centos7
  • 其他

问题描述

官网提供的模板内容如下:

package emqx.exproto.v1;

import com.google.protobuf.ByteString;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;

/**
 * Minimal gRPC client that sends a single {@code PublishRequest} to the
 * {@code ConnectionAdapter} service and prints the message of the reply.
 *
 * <p>The channel is built without {@code usePlaintext()}, exactly as in the
 * vendor template this snippet reproduces.
 */
class TestClient {

    public static void main(String[] args) throws Exception {
        String target = "127.0.0.1:9001";
        Exproto.PublishRequest request = Exproto.PublishRequest.newBuilder()
                .setTopic("/test")
                .setQos(0)
                .setPayload(ByteString.copyFromUtf8("hello"))
                .build();
        ManagedChannel channel = ManagedChannelBuilder.forTarget(target).build();
        try {
            ConnectionAdapterGrpc.ConnectionAdapterBlockingStub blockingStub =
                    ConnectionAdapterGrpc.newBlockingStub(channel);
            Exproto.CodeResponse response = blockingStub.publish(request);
            System.out.println("Response:" + response.getMessage());
        } finally {
            // Release the channel's resources even when the call throws.
            channel.shutdown();
        }
    }
}

按照官网的文档,我猜测这里应该是9100端口(用默认的9001,它会报找不到Adapter)。
我将它换成9100之后:

//只改了这个。
String target = "192.168.1.19:9100";
//报错如下:
/*
Exception in thread "main" io.grpc.StatusRuntimeException: UNAVAILABLE: io exception
Channel Pipeline: [SslHandler#0, ProtocolNegotiators$ClientTlsHandler#0, WriteBufferingAndExceptionHandler#0, DefaultChannelPipeline$TailContext#0]
	at io.grpc.stub.ClientCalls.toStatusRuntimeException(ClientCalls.java:262)
	at io.grpc.stub.ClientCalls.getUnchecked(ClientCalls.java:243)
	at io.grpc.stub.ClientCalls.blockingUnaryCall(ClientCalls.java:156)
	at emqx.exproto.v1.ConnectionAdapterGrpc$ConnectionAdapterBlockingStub.publish(ConnectionAdapterGrpc.java:386)
	at emqx.exproto.v1.TestClient.main(TestClient.java:18)
Caused by: io.grpc.netty.shaded.io.netty.handler.ssl.NotSslRecordException: not an SSL/TLS record: 485454502f312e31203430302042616420526571756573740d0a636f6e6e656374696f6e3a20636c6f73650d0a636f6e74656e742d6c656e6774683a20300d0a0d0a
	at io.grpc.netty.shaded.io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1254)
	at io.grpc.netty.shaded.io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1322)
	at io.grpc.netty.shaded.io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:501)
	at io.grpc.netty.shaded.io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:440)
	at io.grpc.netty.shaded.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
	at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.grpc.netty.shaded.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.grpc.netty.shaded.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	at io.grpc.netty.shaded.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
	at io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)
	at io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
	at io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
	at io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
	at io.grpc.netty.shaded.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
	at io.grpc.netty.shaded.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.grpc.netty.shaded.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.base/java.lang.Thread.run(Thread.java:834)

 */

看样子是客户端使用了SSL认证,而emqx那边没有使用,所以修改后的内容如下:

package emqx.exproto.v1;

import com.google.protobuf.ByteString;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;

/**
 * Minimal gRPC client that sends a single {@code PublishRequest} over a
 * plaintext channel to the {@code ConnectionAdapter} service at
 * 192.168.1.19:9100 and prints the message of the reply.
 */
class TestClient {

    public static void main(String[] args) throws Exception {
        String target = "192.168.1.19:9100";
        Exproto.PublishRequest request = Exproto.PublishRequest.newBuilder()
                .setTopic("/test")
                .setQos(0)
                .setPayload(ByteString.copyFromUtf8("hello"))
                .build();
        ManagedChannel channel = ManagedChannelBuilder.forTarget(target)
                .usePlaintext()
                .build();
        try {
            ConnectionAdapterGrpc.ConnectionAdapterBlockingStub blockingStub =
                    ConnectionAdapterGrpc.newBlockingStub(channel);
            Exproto.CodeResponse response = blockingStub.publish(request);
            System.out.println("Response:" + response.getMessage());
        } finally {
            // Release the channel's resources even when the call throws.
            channel.shutdown();
        }
    }
}
// 报错信息如下
/*
Exception in thread "main" io.grpc.StatusRuntimeException: UNKNOWN: HTTP status code 200
invalid content-type: null
headers: Metadata(:status=200,grpc-accept-encoding=gzip)
DATA-----------------------------
    5{error,'PARAMS_TYPE_ERROR',<<"The conn type error">>}
trailers: Metadata(grpc-message=,grpc-status=0)
	at io.grpc.stub.ClientCalls.toStatusRuntimeException(ClientCalls.java:262)
	at io.grpc.stub.ClientCalls.getUnchecked(ClientCalls.java:243)
	at io.grpc.stub.ClientCalls.blockingUnaryCall(ClientCalls.java:156)
	at emqx.exproto.v1.ConnectionAdapterGrpc$ConnectionAdapterBlockingStub.publish(ConnectionAdapterGrpc.java:386)
	at emqx.exproto.v1.TestClient.main(TestClient.java:19)
*/

相关截图



换回官网示例

package emqx.exproto.v1;

import com.google.protobuf.ByteString;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;

/**
 * Minimal gRPC client that sends a single {@code PublishRequest} over a
 * plaintext channel to the {@code ConnectionAdapter} service at
 * 127.0.0.1:9001 and prints the message of the reply.
 */
class TestClient {

    public static void main(String[] args) throws Exception {
        String target = "127.0.0.1:9001";
        Exproto.PublishRequest request = Exproto.PublishRequest.newBuilder()
                .setTopic("/test")
                .setQos(0)
                .setPayload(ByteString.copyFromUtf8("hello"))
                .build();
        ManagedChannel channel = ManagedChannelBuilder.forTarget(target)
                .usePlaintext()
                .build();
        try {
            ConnectionAdapterGrpc.ConnectionAdapterBlockingStub blockingStub =
                    ConnectionAdapterGrpc.newBlockingStub(channel);
            Exproto.CodeResponse response = blockingStub.publish(request);
            System.out.println("Response:" + response.getMessage());
        } finally {
            // Release the channel's resources even when the call throws.
            channel.shutdown();
        }
    }
}

并且在9001端口的服务端上加上Adapter:

package emqx.exproto.v1;

import io.grpc.*;
import io.grpc.internal.TransportTracer;
import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder;

import java.io.IOException;
import java.util.logging.Logger;

/**
 * Starts a gRPC server that hosts the {@code ConnectionHandler} and
 * {@code ConnectionAdapter} services and blocks until the server terminates.
 */
public class TestGRpcServer {
    private static final Logger logger = Logger.getLogger(TestGRpcServer.class.getName());

    /**
     * Builds and starts the server, then waits for it to terminate.
     *
     * @param args unused
     * @throws IOException if the server cannot be started
     * @throws InterruptedException if the wait for termination is interrupted
     */
    public static void main(String[] args) throws IOException, InterruptedException {
        int port = 9001;
        // Bind to the same variable that is logged below so the two values cannot drift apart.
        Server server = NettyServerBuilder.forPort(port)
                .setTransportTracerFactory(TransportTracer.getDefaultFactory())
                .addService(new ConnectionHandler())
                .addService(new ConnectionAdapter())
                .build()
                .start();
        logger.info("Server started, listening on " + port);
        server.awaitTermination();
    }
}

在client端产生如下错误:

Exception in thread "main" io.grpc.StatusRuntimeException: UNIMPLEMENTED: Method emqx.exproto.v1.ConnectionAdapter/Publish is unimplemented
	at io.grpc.stub.ClientCalls.toStatusRuntimeException(ClientCalls.java:262)
	at io.grpc.stub.ClientCalls.getUnchecked(ClientCalls.java:243)
	at io.grpc.stub.ClientCalls.blockingUnaryCall(ClientCalls.java:156)
	at emqx.exproto.v1.ConnectionAdapterGrpc$ConnectionAdapterBlockingStub.publish(ConnectionAdapterGrpc.java:386)
	at emqx.exproto.v1.TestClient.main(TestClient.java:19)

最终发现 ConnectionAdapterGrpc.java文件,内容如下

   /**
    * Handles the {@code Publish} RPC by delegating to
    * {@code asyncUnimplementedUnaryCall} with {@code METHOD_PUBLISH}; no publish
    * logic is implemented in this method.
    *
    * NOTE(review): presumably this is the generated base-class default, which
    * would explain the UNIMPLEMENTED status the client receives -- confirm
    * against the generated file.
    *
    * @param request the publish request (not read by this method)
    * @param responseObserver observer passed on to {@code asyncUnimplementedUnaryCall}
    */
   public void publish(emqx.exproto.v1.Exproto.PublishRequest request,
        io.grpc.stub.StreamObserver<emqx.exproto.v1.Exproto.CodeResponse> responseObserver) {
      asyncUnimplementedUnaryCall(METHOD_PUBLISH, responseObserver);
    }

你的理解是对的,官方的 Demo 还没来得及改。

emqx 监听 9100 端口,提供 ConnectionAdapter 服务。Java 会去调用这个服务的接口,例如发布 - publish

报错的原因是由于:PublishRequest 需要设置 conn 属性。 conn 来自于 - emqx-exproto 插件收到一个连接后调用 Java 的 ConnectionHandler 服务的 OnSocketCreated 方法中传入的。

1赞

稍等段时间,会安排详细的demo的 :pleading_face:

1赞
public class ConnectionHandler extends ConnectionHandlerGrpc.ConnectionHandlerImplBase {
    /**
     * Answers the socket-created event and, for connections whose sockname host
     * is 192.168.1.19, publishes a test message for that connection through the
     * {@code ConnectionAdapter} service at 192.168.1.19:9100.
     *
     * <p>Connections with any other sockname host are answered with a
     * {@code PERMISSION_DENIED} error so the call is always terminated.
     *
     * <p>NOTE(review): the broker replied to this publish with
     * {@code {error,unexpected_call}}; presumably a connection must be
     * authenticated before it may publish -- confirm against the exproto docs.
     *
     * @param request socket-created event carrying the connection id and connection info
     * @param responseObserver observer used to answer the RPC
     */
    @Override
    public void onSocketCreated(Exproto.SocketCreatedRequest request, StreamObserver<Exproto.EmptySuccess> responseObserver) {
        Exproto.ConnInfo connInfo = request.getConninfo();
        // Only connections whose sockname host is 192.168.1.19 are let through.
        if (!"192.168.1.19".equals(connInfo.getSockname().getHost())) {
            // Terminate the RPC explicitly; otherwise the caller never gets a reply.
            responseObserver.onError(io.grpc.Status.PERMISSION_DENIED
                    .withDescription("sockname host is not 192.168.1.19")
                    .asRuntimeException());
            return;
        }

        System.out.println("[模拟数据库插入] 客户端上线:" + connInfo);
        responseObserver.onNext(Exproto.EmptySuccess.getDefaultInstance());

        String target = "192.168.1.19:9100";
        System.out.println(request.getConn());
        Exproto.PublishRequest publishRequest = Exproto.PublishRequest.newBuilder()
                .setConn(request.getConn())
                .setTopic("/test")
                .setQos(0)
                .setPayload(ByteString.copyFromUtf8("hello"))
                .build();
        ManagedChannel channel = ManagedChannelBuilder.forTarget(target)
                .usePlaintext()
                .build();
        try {
            ConnectionAdapterGrpc.ConnectionAdapterBlockingStub blockingStub =
                    ConnectionAdapterGrpc.newBlockingStub(channel);
            Exproto.CodeResponse response = blockingStub.publish(publishRequest);
            System.out.println("Response:" + response.getMessage());
        } finally {
            // A channel is created per invocation, so release it even when publish throws.
            channel.shutdown();
        }
        responseObserver.onCompleted();
    }

依然是上述错误

4月 20, 2021 3:48:40 下午 io.grpc.internal.SerializingExecutor run
严重: Exception while executing runnable io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed@5c350ba5
io.grpc.StatusRuntimeException: UNKNOWN: HTTP status code 200
invalid content-type: null
headers: Metadata(:status=200,grpc-accept-encoding=gzip)
DATA-----------------------------
    {error,unexpected_call}
trailers: Metadata(grpc-message=,grpc-status=0)
	at io.grpc.stub.ClientCalls.toStatusRuntimeException(ClientCalls.java:262)
	at io.grpc.stub.ClientCalls.getUnchecked(ClientCalls.java:243)
	at io.grpc.stub.ClientCalls.blockingUnaryCall(ClientCalls.java:156)
	at emqx.exproto.v1.ConnectionAdapterGrpc$ConnectionAdapterBlockingStub.publish(ConnectionAdapterGrpc.java:386)
	at emqx.exproto.v1.ConnectionHandler.onSocketCreated(ConnectionHandler.java:34)
	at emqx.exproto.v1.ConnectionHandlerGrpc$MethodHandlers.invoke(ConnectionHandlerGrpc.java:390)
	at io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:180)
	at io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:331)
	at io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:814)
	at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
	at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)

我还是等Demo吧

Demo已经修复,请更新代码查看 :face_with_hand_over_mouth:

和上次的测试结果是一样的。

4月 23, 2021 9:42:41 上午 io.grpc.internal.SerializingExecutor run
严重: Exception while executing runnable io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed@690cb97c
io.grpc.StatusRuntimeException: UNKNOWN: HTTP status code 200
invalid content-type: null
headers: Metadata(:status=200,grpc-accept-encoding=gzip,grpc-trace-bin=)
DATA-----------------------------
     
trailers: Metadata(grpc-message=,grpc-status=0)
	at io.grpc.stub.ClientCalls.toStatusRuntimeException(ClientCalls.java:221)
	at io.grpc.stub.ClientCalls.getUnchecked(ClientCalls.java:202)
	at io.grpc.stub.ClientCalls.blockingUnaryCall(ClientCalls.java:131)
	at emqx.exproto.v1.ConnectionAdapterGrpc$ConnectionAdapterBlockingStub.authenticate(ConnectionAdapterGrpc.java:364)
	at emqx.exproto.v1.ConnectionHandler.onSocketCreated(ConnectionHandler.java:38)
	at emqx.exproto.v1.ConnectionHandlerGrpc$MethodHandlers.invoke(ConnectionHandlerGrpc.java:382)
	at io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:171)
	at io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:283)
	at io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:698)
	at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
	at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)

这个确实是个bug

需要拉下这个库的 0.5.1 版本 Releases · emqx/grpc-erl · GitHub 然后使用 erlang/otp 22 编译。

然后把编译生成的 _build/default/lib/grpc/ebin/grpc_stream_h.beam 覆盖到你的 emqx lib/grpc-0.5.0/ebin 下面;然后重启 emqx 再试试


UPDATES: 上面搞错了…应该是 _build/default/lib/grpc/ebin/grpc_cowboy_h.beam 这个文件

image


正常了,感谢大佬

1赞

客气。可以多试试… 设计和实现上有什么建议,可以开个帖子或者提 GitHub PR,我们再讨论。 :partying_face:

:ok_hand:

建议增加一个参数Idle_timeout。让用户自定义超时时间,因为设备连接上之后,不一定立马发送数据。
image

1赞