//只改了这个。
String target = "192.168.1.19:9100";
//报错如下:
/*
Exception in thread "main" io.grpc.StatusRuntimeException: UNAVAILABLE: io exception
Channel Pipeline: [SslHandler#0, ProtocolNegotiators$ClientTlsHandler#0, WriteBufferingAndExceptionHandler#0, DefaultChannelPipeline$TailContext#0]
at io.grpc.stub.ClientCalls.toStatusRuntimeException(ClientCalls.java:262)
at io.grpc.stub.ClientCalls.getUnchecked(ClientCalls.java:243)
at io.grpc.stub.ClientCalls.blockingUnaryCall(ClientCalls.java:156)
at emqx.exproto.v1.ConnectionAdapterGrpc$ConnectionAdapterBlockingStub.publish(ConnectionAdapterGrpc.java:386)
at emqx.exproto.v1.TestClient.main(TestClient.java:18)
Caused by: io.grpc.netty.shaded.io.netty.handler.ssl.NotSslRecordException: not an SSL/TLS record: 485454502f312e31203430302042616420526571756573740d0a636f6e6e656374696f6e3a20636c6f73650d0a636f6e74656e742d6c656e6774683a20300d0a0d0a
at io.grpc.netty.shaded.io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1254)
at io.grpc.netty.shaded.io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1322)
at io.grpc.netty.shaded.io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:501)
at io.grpc.netty.shaded.io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:440)
at io.grpc.netty.shaded.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.grpc.netty.shaded.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.grpc.netty.shaded.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at io.grpc.netty.shaded.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
at io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)
at io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
at io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
at io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
at io.grpc.netty.shaded.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at io.grpc.netty.shaded.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.grpc.netty.shaded.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:834)
*/
看样子是使用了ssl认证,而emqx那边没有使用,所以修改后内容
package emqx.exproto.v1;
import com.google.protobuf.ByteString;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
class TestClient {
public static void main(String[] args) throws Exception {
String target = "192.168.1.19:9100";
Exproto.PublishRequest request = Exproto.PublishRequest.newBuilder()
.setTopic("/test")
.setQos(0)
.setPayload(ByteString.copyFromUtf8("hello")).build();
ManagedChannel channel = ManagedChannelBuilder.forTarget(target)
.usePlaintext()
.build();
ConnectionAdapterGrpc.ConnectionAdapterBlockingStub blockingStub = ConnectionAdapterGrpc.newBlockingStub(channel);
Exproto.CodeResponse response = blockingStub.publish(request);
System.out.println("Response:" + response.getMessage());
}
}
// 报错信息如下
/*
Exception in thread "main" io.grpc.StatusRuntimeException: UNKNOWN: HTTP status code 200
invalid content-type: null
headers: Metadata(:status=200,grpc-accept-encoding=gzip)
DATA-----------------------------
5{error,'PARAMS_TYPE_ERROR',<<"The conn type error">>}
trailers: Metadata(grpc-message=,grpc-status=0)
at io.grpc.stub.ClientCalls.toStatusRuntimeException(ClientCalls.java:262)
at io.grpc.stub.ClientCalls.getUnchecked(ClientCalls.java:243)
at io.grpc.stub.ClientCalls.blockingUnaryCall(ClientCalls.java:156)
at emqx.exproto.v1.ConnectionAdapterGrpc$ConnectionAdapterBlockingStub.publish(ConnectionAdapterGrpc.java:386)
at emqx.exproto.v1.TestClient.main(TestClient.java:19)
*/
package emqx.exproto.v1;
import io.grpc.*;
import io.grpc.internal.TransportTracer;
import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder;
import java.io.IOException;
import java.util.logging.Logger;
public class TestGRpcServer {
private static final Logger logger = Logger.getLogger(TestGRpcServer.class.getName());
public static void main(String[] args) throws IOException, InterruptedException {
int port = 9001;
Server server = NettyServerBuilder.forPort(9001)
.setTransportTracerFactory(TransportTracer.getDefaultFactory())
.addService(new ConnectionHandler())
.addService(new ConnectionAdapter())
.build()
.start();
logger.info("Server started, listening on " + port);
server.awaitTermination();
}
}
再client产生如下错误:
Exception in thread "main" io.grpc.StatusRuntimeException: UNIMPLEMENTED: Method emqx.exproto.v1.ConnectionAdapter/Publish is unimplemented
at io.grpc.stub.ClientCalls.toStatusRuntimeException(ClientCalls.java:262)
at io.grpc.stub.ClientCalls.getUnchecked(ClientCalls.java:243)
at io.grpc.stub.ClientCalls.blockingUnaryCall(ClientCalls.java:156)
at emqx.exproto.v1.ConnectionAdapterGrpc$ConnectionAdapterBlockingStub.publish(ConnectionAdapterGrpc.java:386)
at emqx.exproto.v1.TestClient.main(TestClient.java:19)
最终发现 ConnectionAdapterGrpc.java文件,内容如下
public void publish(emqx.exproto.v1.Exproto.PublishRequest request,
io.grpc.stub.StreamObserver<emqx.exproto.v1.Exproto.CodeResponse> responseObserver) {
asyncUnimplementedUnaryCall(METHOD_PUBLISH, responseObserver);
}
4月 20, 2021 3:48:40 下午 io.grpc.internal.SerializingExecutor run
严重: Exception while executing runnable io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed@5c350ba5
io.grpc.StatusRuntimeException: UNKNOWN: HTTP status code 200
invalid content-type: null
headers: Metadata(:status=200,grpc-accept-encoding=gzip)
DATA-----------------------------
{error,unexpected_call}
trailers: Metadata(grpc-message=,grpc-status=0)
at io.grpc.stub.ClientCalls.toStatusRuntimeException(ClientCalls.java:262)
at io.grpc.stub.ClientCalls.getUnchecked(ClientCalls.java:243)
at io.grpc.stub.ClientCalls.blockingUnaryCall(ClientCalls.java:156)
at emqx.exproto.v1.ConnectionAdapterGrpc$ConnectionAdapterBlockingStub.publish(ConnectionAdapterGrpc.java:386)
at emqx.exproto.v1.ConnectionHandler.onSocketCreated(ConnectionHandler.java:34)
at emqx.exproto.v1.ConnectionHandlerGrpc$MethodHandlers.invoke(ConnectionHandlerGrpc.java:390)
at io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:180)
at io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:331)
at io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:814)
at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
4月 23, 2021 9:42:41 上午 io.grpc.internal.SerializingExecutor run
严重: Exception while executing runnable io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed@690cb97c
io.grpc.StatusRuntimeException: UNKNOWN: HTTP status code 200
invalid content-type: null
headers: Metadata(:status=200,grpc-accept-encoding=gzip,grpc-trace-bin=)
DATA-----------------------------
trailers: Metadata(grpc-message=,grpc-status=0)
at io.grpc.stub.ClientCalls.toStatusRuntimeException(ClientCalls.java:221)
at io.grpc.stub.ClientCalls.getUnchecked(ClientCalls.java:202)
at io.grpc.stub.ClientCalls.blockingUnaryCall(ClientCalls.java:131)
at emqx.exproto.v1.ConnectionAdapterGrpc$ConnectionAdapterBlockingStub.authenticate(ConnectionAdapterGrpc.java:364)
at emqx.exproto.v1.ConnectionHandler.onSocketCreated(ConnectionHandler.java:38)
at emqx.exproto.v1.ConnectionHandlerGrpc$MethodHandlers.invoke(ConnectionHandlerGrpc.java:382)
at io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:171)
at io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:283)
at io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:698)
at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)