EMQ 问答社区

exhook 没有触发grpc回调OnClientConnected

环境信息

  • EMQ X 版本:4.3.5
  • 操作系统及版本:windows
  • 其他

问题描述

使用exhook的官方示例代码 https://github.com/emqx/emqx-extension-examples/tree/master/exhook-svr-go
本地启动emqx,本启动emqx_exhook 插件。
使用MQTTFX 连上emqx.没有触发“OnClientConnected” 方法
示例代码结果
image

go 代码

package main

import (
	"context"
	"fmt"
	"log"
	"net"

	pb "emqx.io/grpc/exhook/protobuf"
	utils "emqx.io/grpc/exhook/utils"
	"google.golang.org/grpc"
)

const (
	port = ":9000"
)

var cnter *utils.Counter = utils.NewCounter(0, 100)

// server is used to implement emqx_exhook_v1.s *server
type server struct {
	pb.UnimplementedHookProviderServer
}

// HookProviderServer callbacks

func (s *server) OnProviderLoaded(ctx context.Context, in *pb.ProviderLoadedRequest) (*pb.LoadedResponse, error) {
	log.Println("OnProviderLoaded")
	cnter.Count(1)
	hooks := []*pb.HookSpec{
		&pb.HookSpec{Name: "client.connect"},
		&pb.HookSpec{Name: "client.connack"},
		&pb.HookSpec{Name: "client.connected"},
		&pb.HookSpec{Name: "client.disconnected"},
		&pb.HookSpec{Name: "client.authenticate"},
		&pb.HookSpec{Name: "client.check_acl"},
		&pb.HookSpec{Name: "client.subscribe"},
		&pb.HookSpec{Name: "client.unsubscribe"},
		&pb.HookSpec{Name: "session.created"},
		&pb.HookSpec{Name: "session.subscribed"},
		&pb.HookSpec{Name: "session.unsubscribed"},
		&pb.HookSpec{Name: "session.resumed"},
		&pb.HookSpec{Name: "session.discarded"},
		&pb.HookSpec{Name: "session.takeovered"},
		&pb.HookSpec{Name: "session.terminated"},
		&pb.HookSpec{Name: "message.publish"},
		&pb.HookSpec{Name: "message.delivered"},
		&pb.HookSpec{Name: "message.acked"},
		&pb.HookSpec{Name: "message.dropped"},
	}
	return &pb.LoadedResponse{Hooks: hooks}, nil
}

func (s *server) OnProviderUnloaded(ctx context.Context, in *pb.ProviderUnloadedRequest) (*pb.EmptySuccess, error) {
	log.Println("OnProviderUnloaded")
	cnter.Count(1)
	return &pb.EmptySuccess{}, nil
}

func (s *server) OnClientConnect(ctx context.Context, in *pb.ClientConnectRequest) (*pb.EmptySuccess, error) {
	log.Println("OnClientConnect")
	cnter.Count(1)
	return &pb.EmptySuccess{}, nil
}

func (s *server) OnClientConnack(ctx context.Context, in *pb.ClientConnackRequest) (*pb.EmptySuccess, error) {

	log.Println("OnClientConnack")
	cnter.Count(1)
	return &pb.EmptySuccess{}, nil
}

func (s *server) OnClientConnected(ctx context.Context, in *pb.ClientConnectedRequest) (*pb.EmptySuccess, error) {
	fmt.Println("OnClientConnected")
	cnter.Count(1)
	return &pb.EmptySuccess{}, nil
}

func (s *server) OnClientDisconnected(ctx context.Context, in *pb.ClientDisconnectedRequest) (*pb.EmptySuccess, error) {
	log.Println("OnClientDisconnected")
	cnter.Count(1)
	return &pb.EmptySuccess{}, nil
}

func (s *server) OnClientAuthenticate(ctx context.Context, in *pb.ClientAuthenticateRequest) (*pb.ValuedResponse, error) {
	log.Println("OnClientAuthenticate")
	cnter.Count(1)
	reply := &pb.ValuedResponse{}
	reply.Type = pb.ValuedResponse_STOP_AND_RETURN
	reply.Value = &pb.ValuedResponse_BoolResult{BoolResult: true}
	return reply, nil
}

func (s *server) OnClientCheckAcl(ctx context.Context, in *pb.ClientCheckAclRequest) (*pb.ValuedResponse, error) {
	log.Println("OnClientCheckAcl")
	cnter.Count(1)
	reply := &pb.ValuedResponse{}
	reply.Type = pb.ValuedResponse_STOP_AND_RETURN
	reply.Value = &pb.ValuedResponse_BoolResult{BoolResult: true}
	return &pb.ValuedResponse{}, nil
}

func (s *server) OnClientSubscribe(ctx context.Context, in *pb.ClientSubscribeRequest) (*pb.EmptySuccess, error) {
	log.Println("OnClientSubscribe")
	cnter.Count(1)
	return &pb.EmptySuccess{}, nil
}

func (s *server) OnClientUnsubscribe(ctx context.Context, in *pb.ClientUnsubscribeRequest) (*pb.EmptySuccess, error) {
	log.Println("OnClientUnsubscribe")
	cnter.Count(1)
	return &pb.EmptySuccess{}, nil
}

func (s *server) OnSessionCreated(ctx context.Context, in *pb.SessionCreatedRequest) (*pb.EmptySuccess, error) {
	log.Println("OnSessionCreated")
	cnter.Count(1)
	return &pb.EmptySuccess{}, nil
}
func (s *server) OnSessionSubscribed(ctx context.Context, in *pb.SessionSubscribedRequest) (*pb.EmptySuccess, error) {
	log.Println("OnSessionSubscribed")
	cnter.Count(1)
	return &pb.EmptySuccess{}, nil
}

func (s *server) OnSessionUnsubscribed(ctx context.Context, in *pb.SessionUnsubscribedRequest) (*pb.EmptySuccess, error) {
	log.Println("OnSessionUnsubscribed")
	cnter.Count(1)
	return &pb.EmptySuccess{}, nil
}

func (s *server) OnSessionResumed(ctx context.Context, in *pb.SessionResumedRequest) (*pb.EmptySuccess, error) {
	log.Println("OnSessionResumed")
	cnter.Count(1)
	return &pb.EmptySuccess{}, nil
}

func (s *server) OnSessionDiscarded(ctx context.Context, in *pb.SessionDiscardedRequest) (*pb.EmptySuccess, error) {
	log.Println("OnSessionDiscarded")
	cnter.Count(1)
	return &pb.EmptySuccess{}, nil
}

func (s *server) OnSessionTakeovered(ctx context.Context, in *pb.SessionTakeoveredRequest) (*pb.EmptySuccess, error) {
	log.Println("OnSessionTakeovered")
	cnter.Count(1)
	return &pb.EmptySuccess{}, nil
}

func (s *server) OnSessionTerminated(ctx context.Context, in *pb.SessionTerminatedRequest) (*pb.EmptySuccess, error) {
	log.Println("OnSessionTerminated")
	cnter.Count(1)
	return &pb.EmptySuccess{}, nil
}

func (s *server) OnMessagePublish(ctx context.Context, in *pb.MessagePublishRequest) (*pb.ValuedResponse, error) {
	log.Println("OnMessagePublish")
	cnter.Count(1)
	in.Message.Payload = []byte("hardcode payload by exhook-svr-go :)")
	reply := &pb.ValuedResponse{}
	reply.Type = pb.ValuedResponse_STOP_AND_RETURN
	reply.Value = &pb.ValuedResponse_Message{Message: in.Message}
	return reply, nil
}

func (s *server) OnMessageDelivered(ctx context.Context, in *pb.MessageDeliveredRequest) (*pb.EmptySuccess, error) {
	log.Println("OnMessageDelivered")
	cnter.Count(1)
	return &pb.EmptySuccess{}, nil
}

func (s *server) OnMessageDropped(ctx context.Context, in *pb.MessageDroppedRequest) (*pb.EmptySuccess, error) {
	log.Println("OnMessageDropped")
	cnter.Count(1)
	return &pb.EmptySuccess{}, nil
}

func (s *server) OnMessageAcked(ctx context.Context, in *pb.MessageAckedRequest) (*pb.EmptySuccess, error) {
	log.Println("OnMessageAcked")
	cnter.Count(1)
	return &pb.EmptySuccess{}, nil
}

func main() {
	lis, err := net.Listen("tcp", port)
	if err != nil {
		log.Fatalf("failed to listen: %v", err)
	}
	s := grpc.NewServer()
	pb.RegisterHookProviderServer(s, &server{})
	log.Println("Started gRPC server on ::9000")
	if err := s.Serve(lis); err != nil {
		log.Fatalf("failed to serve: %v", err)
	}
}

配置文件及日志

2021-07-21T10:48:25.025000+08:00 [debug] [ExHook Svr] Call emqx_exhook_v_1_hook_provider_client:on_provider_loaded(#{broker => #{datetime => "2021-07-21 10:48:25",sysdescr => "EMQ X",uptime => "6 seconds",version => "4.3.5"}}, #{channel => "default"})
2021-07-21T10:48:25.027000+08:00 [debug] [ExHook Svr] Response {ok, #{hooks => [#{name => <<"client.connect">>,topics => []},#{name => <<"client.connack">>,topics => []},#{name => <<"client.connected">>,topics => []},#{name => <<"client.disconnected">>,topics => []},#{name => <<"client.authenticate">>,topics => []},#{name => <<"client.check_acl">>,topics => []},#{name => <<"client.subscribe">>,topics => []},#{name => <<"client.unsubscribe">>,topics => []},#{name => <<"session.created">>,topics => []},#{name => <<"session.subscribed">>,topics => []},#{name => <<"session.unsubscribed">>,topics => []},#{name => <<"session.resumed">>,topics => []},#{name => <<"session.discarde"...>>,topics => []},#{name => <<"session.take"...>>,topics => []},#{name => <<"session."...>>,topics => []},#{name => <<"mess"...>>,topics => []},#{name => <<...>>,...},#{...}|...]}, [{<<"grpc-status">>,<<"0">>},{<<"grpc-message">>,<<>>}]}
2021-07-21T10:48:25.028000+08:00 [info] [Plugins] Started plugins: [emqx_exhook]
2021-07-21T10:48:25.028000+08:00 [info] [Plugins] Load plugin emqx_exhook successfully
2021-07-21T10:48:35.948000+08:00 [warning] OS_MON (cpu_sup) called by <0.651.0>, unavailable
2021-07-21T10:48:52.983000+08:00 [warning] OS_MON (cpu_sup) called by <0.651.0>, unavailable
2021-07-21T10:49:10.075000+08:00 [warning] OS_MON (cpu_sup) called by <0.651.0>, unavailable
2021-07-21T10:49:27.139000+08:00 [warning] OS_MON (cpu_sup) called by <0.651.0>, unavailable
2021-07-21T10:49:44.194000+08:00 [warning] OS_MON (cpu_sup) called by <0.651.0>, unavailable
2021-07-21T10:50:01.288000+08:00 [warning] OS_MON (cpu_sup) called by <0.651.0>, unavailable
2021-07-21T10:50:18.359000+08:00 [warning] OS_MON (cpu_sup) called by <0.651.0>, unavailable
2021-07-21T10:50:35.418000+08:00 [warning] OS_MON (cpu_sup) called by <0.651.0>, unavailable
2021-07-21T10:50:52.471000+08:00 [warning] OS_MON (cpu_sup) called by <0.651.0>, unavailable
2021-07-21T10:51:09.517000+08:00 [warning] OS_MON (cpu_sup) called by <0.651.0>, unavailable
2021-07-21T10:51:26.559000+08:00 [warning] OS_MON (cpu_sup) called by <0.651.0>, unavailable
2021-07-21T10:51:43.624000+08:00 [warning] OS_MON (cpu_sup) called by <0.651.0>, unavailable
2021-07-21T10:52:00.679000+08:00 [warning] OS_MON (cpu_sup) called by <0.651.0>, unavailable
2021-07-21T10:52:17.715000+08:00 [warning] OS_MON (cpu_sup) called by <0.651.0>, unavailable
2021-07-21T10:52:34.789000+08:00 [warning] OS_MON (cpu_sup) called by <0.651.0>, unavailable
2021-07-21T10:52:51.871000+08:00 [warning] OS_MON (cpu_sup) called by <0.651.0>, unavailable
2021-07-21T10:53:08.935000+08:00 [warning] OS_MON (cpu_sup) called by <0.651.0>, unavailable
2021-07-21T10:53:26.000000+08:00 [warning] OS_MON (cpu_sup) called by <0.651.0>, unavailable
2021-07-21T10:53:43.078000+08:00 [warning] OS_MON (cpu_sup) called by <0.651.0>, unavailable
2021-07-21T10:54:00.127000+08:00 [warning] OS_MON (cpu_sup) called by <0.651.0>, unavailable
2021-07-21T10:54:17.182000+08:00 [warning] OS_MON (cpu_sup) called by <0.651.0>, unavailable
2021-07-21T10:54:34.239000+08:00 [warning] OS_MON (cpu_sup) called by <0.651.0>, unavailable
2021-07-21T10:54:51.320000+08:00 [warning] OS_MON (cpu_sup) called by <0.651.0>, unavailable
2021-07-21T10:55:08.372000+08:00 [warning] OS_MON (cpu_sup) called by <0.651.0>, unavailable
2021-07-21T10:55:25.415000+08:00 [warning] OS_MON (cpu_sup) called by <0.651.0>, unavailable
2021-07-21T10:55:42.521000+08:00 [warning] OS_MON (cpu_sup) called by <0.651.0>, unavailable
2021-07-21T10:55:59.586000+08:00 [warning] OS_MON (cpu_sup) called by <0.651.0>, unavailable
2021-07-21T10:56:16.624000+08:00 [warning] OS_MON (cpu_sup) called by <0.651.0>, unavailable
2021-07-21T10:56:33.711000+08:00 [warning] OS_MON (cpu_sup) called by <0.651.0>, unavailable
2021-07-21T10:56:50.765000+08:00 [warning] OS_MON (cpu_sup) called by <0.651.0>, unavailable
2021-07-21T10:57:07.819000+08:00 [warning] OS_MON (cpu_sup) called by <0.651.0>, unavailable
2021-07-21T10:57:24.879000+08:00 [warning] OS_MON (cpu_sup) called by <0.651.0>, unavailable
2021-07-21T10:57:41.947000+08:00 [warning] OS_MON (cpu_sup) called by <0.651.0>, unavailable
2021-07-21T10:57:59.011000+08:00 [warning] OS_MON (cpu_sup) called by <0.651.0>, unavailable
2021-07-21T10:58:16.057000+08:00 [warning] OS_MON (cpu_sup) called by <0.651.0>, unavailable
2021-07-21T10:58:33.120000+08:00 [warning] OS_MON (cpu_sup) called by <0.651.0>, unavailable
2021-07-21T10:58:50.183000+08:00 [warning] OS_MON (cpu_sup) called by <0.651.0>, unavailable
2021-07-21T10:59:07.236000+08:00 [warning] OS_MON (cpu_sup) called by <0.651.0>, unavailable
2021-07-21T10:59:24.300000+08:00 [warning] OS_MON (cpu_sup) called by <0.651.0>, unavailable

OnClientConnected是连接成功之后才会调用的,你的目的是连接成功之后调用还是连接的时候调用,如果是后者的话,肯定不会调用的。

链接成功后调用。你可以用示例跑下。

我用的python写的,是可以的

这个插件启动的时候,会调用OnProviderLoaded,获取你需要启动的列表,所以启动插件的前提是你这个插件的服务得是开启状态。
如果你是先启动插件,后启动的9000服务,那么重启下插件应该就可以了。

image
当然,我是先启动了grpc 服务。
并且启动exhook插件。
grpc 服务也打印提示调用OnProviderLoaded 方法。

我待会使用python 的试试

已经解决。是我客户端配置错了。