环境信息
- EMQX 版本:5.0.8
- 操作系统及版本:docker
- 其他
问题描述
我用 C# 创建了一个 gRPC 服务器(ExHook 回调服务),但是 EMQX 那边一直显示“连接中”,始终无法连接成功。
我另外用 C# 写了一个 gRPC 客户端,可以正常连上该 gRPC 服务器。服务端代码如下:
using Google.Protobuf;
using Grpc.Core;
namespace FirstGrpc.Services
{
    /// <summary>
    /// gRPC implementation of the EMQX ExHook <c>HookProvider</c> service.
    /// Registers every hook point listed in <see cref="HookNames"/> and answers
    /// each callback with a fixed response.
    /// </summary>
    /// <remarks>
    /// NOTE(review): an <c>econnrefused</c> error on the EMQX side means the TCP
    /// connection was refused before any method of this class was invoked.
    /// Presumably the ExHook URL (host/port/scheme) is not reachable from inside
    /// the EMQX container - confirm the deployment configuration.
    /// </remarks>
    public class ExServer : HookProvider.HookProviderBase
    {
        /// <summary>
        /// Hook points announced to the broker in <see cref="OnProviderLoaded"/>.
        /// The names follow the EMQX 5.x naming, matching the callbacks overridden
        /// below: "client.authorize" (4.x: "client.check_acl") and
        /// "session.takenover" (4.x: "session.takeovered").
        /// </summary>
        private static readonly string[] HookNames =
        {
            "client.connect",
            "client.connack",
            "client.connected",
            "client.disconnected",
            "client.authenticate",
            "client.authorize",
            "client.subscribe",
            "client.unsubscribe",
            "session.created",
            "session.subscribed",
            "session.unsubscribed",
            "session.resumed",
            "session.discarded",
            "session.takenover",
            "session.terminated",
            "message.publish",
            "message.delivered",
            "message.acked",
            "message.dropped"
        };

        private readonly ILogger<ExServer> _logger;

        /// <summary>Creates the service with the logger supplied by dependency injection.</summary>
        /// <param name="logger">Logger used to report provider load events.</param>
        public ExServer(ILogger<ExServer> logger)
        {
            _logger = logger;
        }

        /// <summary>
        /// Called when the broker loads this provider; returns the list of hook
        /// points this server wants to receive.
        /// </summary>
        /// <returns>A <see cref="LoadedResponse"/> containing one <see cref="HookSpec"/> per entry of <see cref="HookNames"/>.</returns>
        public override Task<LoadedResponse> OnProviderLoaded(ProviderLoadedRequest request, ServerCallContext context)
        {
            var response = new LoadedResponse();
            foreach (string hookName in HookNames)
            {
                response.Hooks.Add(new HookSpec { Name = hookName });
            }

            // Seeing this line in the server log proves the broker actually reached the service.
            _logger.LogInformation("ExHook provider loaded; registered {HookCount} hooks", HookNames.Length);
            return Task.FromResult(response);
        }

        /// <summary>Called when the broker unloads this provider; nothing to clean up.</summary>
        public override Task<EmptySuccess> OnProviderUnloaded(ProviderUnloadedRequest request, ServerCallContext context)
            => Success();

        /// <summary>Notification callback; acknowledged without further action.</summary>
        public override Task<EmptySuccess> OnClientConnect(ClientConnectRequest request, ServerCallContext context)
            => Success();

        /// <summary>Notification callback; acknowledged without further action.</summary>
        public override Task<EmptySuccess> OnClientConnack(ClientConnackRequest request, ServerCallContext context)
            => Success();

        /// <summary>Notification callback; acknowledged without further action.</summary>
        public override Task<EmptySuccess> OnClientConnected(ClientConnectedRequest request, ServerCallContext context)
            => Success();

        /// <summary>Notification callback; acknowledged without further action.</summary>
        public override Task<EmptySuccess> OnClientDisconnected(ClientDisconnectedRequest request, ServerCallContext context)
            => Success();

        /// <summary>
        /// Authentication callback. Always answers with <c>BoolResult = true</c> and
        /// <c>StopAndReturn</c>, i.e. every client is accepted unconditionally.
        /// </summary>
        public override Task<ValuedResponse> OnClientAuthenticate(ClientAuthenticateRequest request, ServerCallContext context)
            => Allow();

        /// <summary>
        /// Authorization callback. Always answers with <c>BoolResult = true</c> and
        /// <c>StopAndReturn</c>, i.e. every publish/subscribe action is permitted.
        /// </summary>
        public override Task<ValuedResponse> OnClientAuthorize(ClientAuthorizeRequest request, ServerCallContext context)
            => Allow();

        /// <summary>Notification callback; acknowledged without further action.</summary>
        public override Task<EmptySuccess> OnClientSubscribe(ClientSubscribeRequest request, ServerCallContext context)
            => Success();

        /// <summary>Notification callback; acknowledged without further action.</summary>
        public override Task<EmptySuccess> OnClientUnsubscribe(ClientUnsubscribeRequest request, ServerCallContext context)
            => Success();

        /// <summary>Notification callback; acknowledged without further action.</summary>
        public override Task<EmptySuccess> OnSessionCreated(SessionCreatedRequest request, ServerCallContext context)
            => Success();

        /// <summary>Notification callback; acknowledged without further action.</summary>
        public override Task<EmptySuccess> OnSessionSubscribed(SessionSubscribedRequest request, ServerCallContext context)
            => Success();

        /// <summary>Notification callback; acknowledged without further action.</summary>
        public override Task<EmptySuccess> OnSessionUnsubscribed(SessionUnsubscribedRequest request, ServerCallContext context)
            => Success();

        /// <summary>Notification callback; acknowledged without further action.</summary>
        public override Task<EmptySuccess> OnSessionDiscarded(SessionDiscardedRequest request, ServerCallContext context)
            => Success();

        /// <summary>Notification callback; acknowledged without further action.</summary>
        public override Task<EmptySuccess> OnSessionResumed(SessionResumedRequest request, ServerCallContext context)
            => Success();

        /// <summary>Notification callback; acknowledged without further action.</summary>
        public override Task<EmptySuccess> OnSessionTakenover(SessionTakenoverRequest request, ServerCallContext context)
            => Success();

        /// <summary>Notification callback; acknowledged without further action.</summary>
        public override Task<EmptySuccess> OnSessionTerminated(SessionTerminatedRequest request, ServerCallContext context)
            => Success();

        /// <summary>
        /// Publish callback. Returns the incoming message with its payload replaced
        /// by a hard-coded demo string, using <c>StopAndReturn</c>.
        /// </summary>
        /// <returns>A <see cref="ValuedResponse"/> carrying the rewritten message.</returns>
        public override Task<ValuedResponse> OnMessagePublish(MessagePublishRequest request, ServerCallContext context)
        {
            // Clone instead of hand-copying a few fields: a freshly built Message
            // would reset every field that is not copied (e.g. QoS) to its
            // protobuf default, silently altering the published message.
            Message rewritten = request.Message?.Clone() ?? new Message();
            rewritten.Payload = ByteString.CopyFromUtf8("hardcode payload by fitstGrpc");

            var reply = new ValuedResponse
            {
                Type = ValuedResponse.Types.ResponsedType.StopAndReturn,
                Message = rewritten
            };
            return Task.FromResult(reply);
        }

        /// <summary>Notification callback; acknowledged without further action.</summary>
        public override Task<EmptySuccess> OnMessageAcked(MessageAckedRequest request, ServerCallContext context)
            => Success();

        /// <summary>Notification callback; acknowledged without further action.</summary>
        public override Task<EmptySuccess> OnMessageDelivered(MessageDeliveredRequest request, ServerCallContext context)
            => Success();

        /// <summary>Notification callback; acknowledged without further action.</summary>
        public override Task<EmptySuccess> OnMessageDropped(MessageDroppedRequest request, ServerCallContext context)
            => Success();

        /// <summary>Builds the completed empty acknowledgement shared by all notification callbacks.</summary>
        private static Task<EmptySuccess> Success()
            => Task.FromResult(new EmptySuccess());

        /// <summary>Builds the completed "allow" response shared by the authenticate/authorize callbacks.</summary>
        private static Task<ValuedResponse> Allow()
            => Task.FromResult(new ValuedResponse
            {
                BoolResult = true,
                Type = ValuedResponse.Types.ResponsedType.StopAndReturn
            });
    }
}
配置文件及日志
<"emqx@172.17.0.2">>,sysdescr => "EMQX",version => "5.0.8"}}
2022-10-16T11:00:21.984391+00:00 [error] line: 340, mfa: emqx_exhook_mgr:do_load_server/1, msg: failed_to_load_exhook_callback_server, name: <<"ssl">>, reason: econnrefused
2022-10-16T11:00:31.964214+00:00 [warning] Description: "Authenticity is not established by certificate path validation", Reason: "Option {verify, verify_peer} and cacertfile/cacerts is missing"
2022-10-16T11:00:32.064608+00:00 [error] function: on_provider_loaded, line: 412, mfa: emqx_exhook_server:do_call/5, module: emqx_exhook_v_2_hook_provider_client, msg: exhook_call_error, options: #{channel => <<"ssl">>,failed_action => ignore,key_dispatch => <0.2448.0>,timeout => 5000}, reason: econnrefused, req: #{broker => #{datetime => "2022-10-16T11:00:31.986177253+00:00",sysdescr => "EMQX",uptime => 1815650,version => "5.0.8"},meta => #{cluster_name => "emqxcl",node => <<"emqx@172.17.0.2">>,sysdescr => "EMQX",version => "5.0.8"}}
2022-10-16T11:00:32.065826+00:00 [error] line: 340, mfa: emqx_exhook_mgr:do_load_server/1, msg: failed_to_load_exhook_callback_server, name: <<"ssl">>, reason: econnrefused
2022-10-16T11:00:42.068280+00:00 [warning] Description: "Authenticity is not established by certificate path validation", Reason: "Option {verify, verify_peer} and cacertfile/cacerts is missing"
2022-10-16T11:00:42.168493+00:00 [error] function: on_provider_loaded, line: 412, mfa: emqx_exhook_server:do_call/5, module: emqx_exhook_v_2_hook_provider_client, msg: exhook_call_error, options: #{channel => <<"ssl">>,failed_action => ignore,key_dispatch => <0.2448.0>,timeout => 5000}, reason: econnrefused, req: #{broker => #{datetime => "2022-10-16T11:00:42.090288253+00:00",sysdescr => "EMQX",uptime => 1825754,version => "5.0.8"},meta => #{cluster_name => "emqxcl",node => <<"emqx@172.17.0.2">>,sysdescr => "EMQX",version => "5.0.8"}}
2022-10-16T11:00:42.169854+00:00 [error] line: 340, mfa: emqx_exhook_mgr:do_load_server/1, msg: failed_to_load_exhook_callback_server, name: <<"ssl">>, reason: econnrefused