ExHook C# 怎么连呀

环境信息

  • EMQX 版本:5.0.8
  • 操作系统及版本:docker
  • 其他

问题描述

我创建了C# 的grpc 服务器 但是emqx 那边一直连接中,
C# 写了一个可以连上grpc

using Google.Protobuf;

using Grpc.Core;

namespace FirstGrpc.Services
{
    public class ExServer :HookProvider.HookProviderBase
    {
        private readonly ILogger<ExServer> _logger;
        public ExServer(ILogger<ExServer> logger)
        {
            _logger = logger;
        }

        public override Task<LoadedResponse> OnProviderLoaded(ProviderLoadedRequest request, ServerCallContext context)
        {
            HookSpec[] specs=
            {
                new HookSpec(){Name="client.connect"},
                new HookSpec(){Name="client.connack"},
                new HookSpec(){Name="client.connected"},
                new HookSpec(){Name="client.disconnected"},
                new HookSpec(){Name="client.authenticate"},
                new HookSpec(){Name="client.check_acl"},
                new HookSpec(){Name="client.subscribe"},
                new HookSpec(){Name="client.unsubscribe"},


                new HookSpec(){Name="session.created"},
                new HookSpec(){Name="session.subscribed"},
                new HookSpec(){Name="session.unsubscribed"},
                new HookSpec(){Name="session.resumed"},
                new HookSpec(){Name="session.discarded"},
                new HookSpec(){Name="session.takeovered"},
                new HookSpec(){Name="session.terminated"},

                new HookSpec(){Name="message.publish"},
                new HookSpec(){Name="message.delivered"},
                new HookSpec(){Name="message.acked"},
                new HookSpec(){Name="message.dropped"}

            };
            LoadedResponse response = new LoadedResponse();
            response.Hooks.Add(specs);

            return Task.FromResult(response);
        }

        public override Task<EmptySuccess> OnProviderUnloaded(ProviderUnloadedRequest request, ServerCallContext context)
        {
            EmptySuccess reply = new EmptySuccess();

            return Task.FromResult(reply);
            //return base.OnProviderUnloaded(request, context);
        }

        public override Task<EmptySuccess> OnClientConnect(ClientConnectRequest request, ServerCallContext context)
        {
            
            EmptySuccess reply = new EmptySuccess();
            
            return Task.FromResult(reply);
        }
        public override Task<EmptySuccess> OnClientConnack(ClientConnackRequest request, ServerCallContext context)
        {
            EmptySuccess reply = new EmptySuccess();

            return Task.FromResult(reply);
        }

        public override Task<EmptySuccess> OnClientConnected(ClientConnectedRequest request, ServerCallContext context)
        {
            EmptySuccess reply = new EmptySuccess();

            return Task.FromResult(reply);
        }

        public override Task<EmptySuccess> OnClientDisconnected(ClientDisconnectedRequest request, ServerCallContext context)
        {
            EmptySuccess reply = new EmptySuccess();
            return Task.FromResult(reply);
        }

        public override Task<ValuedResponse> OnClientAuthenticate(ClientAuthenticateRequest request, ServerCallContext context)
        {
            ValuedResponse reply = new ValuedResponse()
            {
                BoolResult = true,
                Type = ValuedResponse.Types.ResponsedType.StopAndReturn
            };
            return Task.FromResult(reply);
        }


        public override Task<ValuedResponse> OnClientAuthorize(ClientAuthorizeRequest request, ServerCallContext context)
        {
            ValuedResponse reply = new ValuedResponse()
            {
                BoolResult = true,
                Type = ValuedResponse.Types.ResponsedType.StopAndReturn
            };
            return Task.FromResult(reply);
        }

        public override Task<EmptySuccess> OnClientSubscribe(ClientSubscribeRequest request, ServerCallContext context)
        {
            EmptySuccess reply = new EmptySuccess();
            return Task.FromResult(reply);
        }

        public override Task<EmptySuccess> OnClientUnsubscribe(ClientUnsubscribeRequest request, ServerCallContext context)
        {
            EmptySuccess reply = new EmptySuccess();
            return Task.FromResult(reply);
        }

        public override Task<EmptySuccess> OnSessionCreated(SessionCreatedRequest request, ServerCallContext context)
        {
            EmptySuccess reply = new EmptySuccess();
            return Task.FromResult(reply);
        }

        public override Task<EmptySuccess> OnSessionSubscribed(SessionSubscribedRequest request, ServerCallContext context)
        {
            EmptySuccess reply = new EmptySuccess();
            return Task.FromResult(reply);
        }

        public override Task<EmptySuccess> OnSessionUnsubscribed(SessionUnsubscribedRequest request, ServerCallContext context)
        {
            EmptySuccess reply = new EmptySuccess();
            return Task.FromResult(reply);
        }

        public override Task<EmptySuccess> OnSessionDiscarded(SessionDiscardedRequest request, ServerCallContext context)
        {
            EmptySuccess reply = new EmptySuccess();
            return Task.FromResult(reply);
        }

        public override Task<EmptySuccess> OnSessionResumed(SessionResumedRequest request, ServerCallContext context)
        {
            EmptySuccess reply = new EmptySuccess();
            return Task.FromResult(reply);
        }

        public override Task<EmptySuccess> OnSessionTakenover(SessionTakenoverRequest request, ServerCallContext context)
        {

            EmptySuccess reply = new EmptySuccess();
            return Task.FromResult(reply);
        }

        public override Task<EmptySuccess> OnSessionTerminated(SessionTerminatedRequest request, ServerCallContext context)
        {

            EmptySuccess reply = new EmptySuccess();
            return Task.FromResult(reply);
        }

        public override Task<ValuedResponse> OnMessagePublish(MessagePublishRequest request, ServerCallContext context)
        {
            ByteString bstr = ByteString.CopyFromUtf8("hardcode payload by fitstGrpc");
            Message message = new Message
            {
                Id = request.Message.Id,
                Node=request.Message.Node,
                From = request.Message.From,
                Topic=request.Message.Topic,
                Payload=bstr
            };
            ValuedResponse reply = new ValuedResponse()
            {
                Type = ValuedResponse.Types.ResponsedType.StopAndReturn,
                Message = message
            };
            return Task.FromResult(reply);
        }

        public override Task<EmptySuccess> OnMessageAcked(MessageAckedRequest request, ServerCallContext context)
        {
            EmptySuccess reply = new EmptySuccess();
            return Task.FromResult(reply);
        }

        public override Task<EmptySuccess> OnMessageDelivered(MessageDeliveredRequest request, ServerCallContext context)
        {
            EmptySuccess reply = new EmptySuccess();
            return Task.FromResult(reply);
        }
        public override Task<EmptySuccess> OnMessageDropped(MessageDroppedRequest request, ServerCallContext context)
        {
            EmptySuccess reply = new EmptySuccess();
            return Task.FromResult(reply);
        }
    }
}

配置文件及日志

<"emqx@172.17.0.2">>,sysdescr => "EMQX",version => "5.0.8"}}
2022-10-16T11:00:21.984391+00:00 [error] line: 340, mfa: emqx_exhook_mgr:do_load_server/1, msg: failed_to_load_exhook_callback_server, name: <<"ssl">>, reason: econnrefused
2022-10-16T11:00:31.964214+00:00 [warning] Description: "Authenticity is not established by certificate path validation", Reason: "Option {verify, verify_peer} and cacertfile/cacerts is missing"
2022-10-16T11:00:32.064608+00:00 [error] function: on_provider_loaded, line: 412, mfa: emqx_exhook_server:do_call/5, module: emqx_exhook_v_2_hook_provider_client, msg: exhook_call_error, options: #{channel => <<"ssl">>,failed_action => ignore,key_dispatch => <0.2448.0>,timeout => 5000}, reason: econnrefused, req: #{broker => #{datetime => "2022-10-16T11:00:31.986177253+00:00",sysdescr => "EMQX",uptime => 1815650,version => "5.0.8"},meta => #{cluster_name => "emqxcl",node => <<"emqx@172.17.0.2">>,sysdescr => "EMQX",version => "5.0.8"}}
2022-10-16T11:00:32.065826+00:00 [error] line: 340, mfa: emqx_exhook_mgr:do_load_server/1, msg: failed_to_load_exhook_callback_server, name: <<"ssl">>, reason: econnrefused
2022-10-16T11:00:42.068280+00:00 [warning] Description: "Authenticity is not established by certificate path validation", Reason: "Option {verify, verify_peer} and cacertfile/cacerts is missing"
2022-10-16T11:00:42.168493+00:00 [error] function: on_provider_loaded, line: 412, mfa: emqx_exhook_server:do_call/5, module: emqx_exhook_v_2_hook_provider_client, msg: exhook_call_error, options: #{channel => <<"ssl">>,failed_action => ignore,key_dispatch => <0.2448.0>,timeout => 5000}, reason: econnrefused, req: #{broker => #{datetime => "2022-10-16T11:00:42.090288253+00:00",sysdescr => "EMQX",uptime => 1825754,version => "5.0.8"},meta => #{cluster_name => "emqxcl",node => <<"emqx@172.17.0.2">>,sysdescr => "EMQX",version => "5.0.8"}}
2022-10-16T11:00:42.169854+00:00 [error] line: 340, mfa: emqx_exhook_mgr:do_load_server/1, msg: failed_to_load_exhook_callback_server, name: <<"ssl">>, reason: econnrefused


这是新的日志,我吧服务器的https 关了

2022-10-16T11:10:51.597304+00:00 [error] function: on_provider_loaded, line: 412, mfa: emqx_exhook_server:do_call/5, module: emqx_exhook_v_2_hook_provider_client, msg: exhook_call_error, options: #{channel => <<"5220">>,failed_action => deny,key_dispatch => <0.2448.0>,timeout => 5000}, reason: econnrefused, req: #{broker => #{datetime => "2022-10-16T11:10:51.496382871+00:00",sysdescr => "EMQX",uptime => 2435160,version => "5.0.8"},meta => #{cluster_name => "emqxcl",node => <<"emqx@172.17.0.2">>,sysdescr => "EMQX",version => "5.0.8"}}
2022-10-16T11:10:51.599357+00:00 [error] line: 340, mfa: emqx_exhook_mgr:do_load_server/1, msg: failed_to_load_exhook_callback_server, name: <<"5220">>, reason: econnrefused
2022-10-16T11:11:04.149318+00:00 [error] function: on_provider_loaded, line: 412, mfa: emqx_exhook_server:do_call/5, module: emqx_exhook_v_2_hook_provider_client, msg: exhook_call_error, options: #{channel => <<"5220">>,failed_action => deny,key_dispatch => <0.2448.0>,timeout => 5000}, reason: econnrefused, req: #{broker => #{datetime => "2022-10-16T11:11:04.048004271+00:00",sysdescr => "EMQX",uptime => 2447712,version => "5.0.8"},meta => #{cluster_name => "emqxcl",node => <<"emqx@172.17.0.2">>,sysdescr => "EMQX",version => "5.0.8"}}
2022-10-16T11:11:04.150662+00:00 [error] line: 340, mfa: emqx_exhook_mgr:do_load_server/1, msg: failed_to_load_exhook_callback_server, name: <<"5220">>, reason: econnrefused
2022-10-16T11:11:14.253082+00:00 [error] function: on_provider_loaded, line: 412, mfa: emqx_exhook_server:do_call/5, module: emqx_exhook_v_2_hook_provider_client, msg: exhook_call_error, options: #{channel => <<"5220">>,failed_action => deny,key_dispatch => <0.2448.0>,timeout => 5000}, reason: econnrefused, req: #{broker => #{datetime => "2022-10-16T11:11:14.152383488+00:00",sysdescr => "EMQX",uptime => 2457816,version => "5.0.8"},meta => #{cluster_name => "emqxcl",node => <<"emqx@172.17.0.2">>,sysdescr => "EMQX",version => "5.0.8"}}
2022-10-16T11:11:14.254443+00:00 [error] line: 340, mfa: emqx_exhook_mgr:do_load_server/1, msg: failed_to_load_exhook_callback_server, name: <<"5220">>, reason: econnrefused

econnrefused 这个就是grpc server拒绝了emqx的连接。
你可以在emqx服务器上直接telnet一个对应的端口telnet localhost 49165,应该是不通的。

是这个问题,那么docker 要怎么连上外部的服务呀

#{datetime => "2022-10-17T14:47:11.055177661+00:00",sysdescr => "EMQX",uptime => 2268844,version => "5.0.8"},meta => #{cluster_name => "emqxcl",node => <<"emqx@172.17.0.2">>,sysdescr => "EMQX",version => "5.0.8"}}
2022-10-17T14:47:11.157739+00:00 [error] line: 340, mfa: emqx_exhook_mgr:do_load_server/1, msg: failed_to_load_exhook_callback_server, name: <<"12">>, reason: econnrefused
2022-10-17T14:48:11.261503+00:00 [error] function: on_provider_loaded, line: 412, mfa: emqx_exhook_server:do_call/5, module: emqx_exhook_v_2_hook_provider_client, msg: exhook_call_error, options: #{channel => <<"12">>,failed_action => deny,key_dispatch => <0.2492.0>,timeout => 5000}, reason: econnrefused, req: #{broker => #{datetime => "2022-10-17T14:48:11.160276883+00:00",sysdescr => "EMQX",uptime => 2328950,version => "5.0.8"},meta => #{cluster_name => "emqxcl",node => <<"emqx@172.17.0.2">>,sysdescr => "EMQX",version => "5.0.8"}}
2022-10-17T14:48:11.263158+00:00 [error] line: 340, mfa: emqx_exhook_mgr:do_load_server/1, msg: failed_to_load_exhook_callback_server, name: <<"12">>, reason: econnrefused
2022-10-17T14:49:11.367474+00:00 [error] function: on_provider_loaded, line: 412, mfa: emqx_exhook_server:do_call/5, module: emqx_exhook_v_2_hook_provider_client, msg: exhook_call_error, options: #{channel => <<"12">>,failed_action => deny,key_dispatch => <0.2492.0>,timeout => 5000}, reason: econnrefused, req: #{broker => #{datetime => "2022-10-17T14:49:11.266535121+00:00",sysdescr => "EMQX",uptime => 2389056,version => "5.0.8"},meta => #{cluster_name => "emqxcl",node => <<"emqx@172.17.0.2">>,sysdescr => "EMQX",version => "5.0.8"}}
2022-10-17T14:49:11.368745+00:00 [error] line: 340, mfa: emqx_exhook_mgr:do_load_server/1, msg: failed_to_load_exhook_callback_server, name: <<"12">>, reason: econnrefused

您好,我在window 上部署的,添加 ExHook 的时候 报错 400 BAD_REQUEST:Bad Arguments: #{discarded_errors_count => 0,kind => validation_error,path => “exhook.servers.1.name”,reason => <<“Bad ExHook Name <<230,181,139,232,175,149>>, expect "^[A-Za-z0-9]+[A-Za-z0-9-_]*$"”>>,value => <<230,181,139,232,175,149>>},能请教下您怎么链接的吗?

这个日志已经比较清楚地提示你原因了,你设置的 ExHook Name 不合法,必须符合这个条件 ^[A-Za-z0-9]+[A-Za-z0-9-_]*$

明白了,谢谢,忘记不能使用中文了