环境信息
- EMQ X 版本:4.3.5
- 操作系统及版本:windows
- 其他
问题描述
使用exhook的官方示例代码 https://github.com/emqx/emqx-extension-examples/tree/master/exhook-svr-go
在本地启动 EMQ X,并启动 emqx_exhook 插件。
使用 MQTT.fx 连接到 EMQ X 后,没有触发 “OnClientConnected” 方法。
示例代码结果
go 代码
package main
import (
"context"
"fmt"
"log"
"net"
pb "emqx.io/grpc/exhook/protobuf"
utils "emqx.io/grpc/exhook/utils"
"google.golang.org/grpc"
)
// port is the TCP listen address passed to net.Listen in main.
const port = ":9000"
// cnter is the package-level counter on which every hook callback in this
// file calls Count(1).
// NOTE(review): the meaning of the NewCounter arguments (0, 100) is defined in
// the utils package and is not visible here — confirm there.
var cnter *utils.Counter = utils.NewCounter(0, 100)
// server is the receiver type for the hook callbacks defined in this file; an
// instance is registered with pb.RegisterHookProviderServer in main.
// It embeds pb.UnimplementedHookProviderServer.
// NOTE(review): presumably the embedding supplies default implementations for
// any service method not defined here — confirm against the generated code.
type server struct {
	pb.UnimplementedHookProviderServer
}
// HookProviderServer callbacks
// OnProviderLoaded logs its own name, calls cnter.Count(1) and returns a
// LoadedResponse whose Hooks field lists one HookSpec per hook name below,
// in the order given.
func (s *server) OnProviderLoaded(ctx context.Context, in *pb.ProviderLoadedRequest) (*pb.LoadedResponse, error) {
	log.Println("OnProviderLoaded")
	cnter.Count(1)

	// Hook names advertised to the caller; the order is preserved in the reply.
	names := []string{
		"client.connect",
		"client.connack",
		"client.connected",
		"client.disconnected",
		"client.authenticate",
		"client.check_acl",
		"client.subscribe",
		"client.unsubscribe",
		"session.created",
		"session.subscribed",
		"session.unsubscribed",
		"session.resumed",
		"session.discarded",
		"session.takeovered",
		"session.terminated",
		"message.publish",
		"message.delivered",
		"message.acked",
		"message.dropped",
	}

	specs := make([]*pb.HookSpec, 0, len(names))
	for _, name := range names {
		specs = append(specs, &pb.HookSpec{Name: name})
	}
	return &pb.LoadedResponse{Hooks: specs}, nil
}
// OnProviderUnloaded logs its own name, calls cnter.Count(1) and returns an
// empty success response with a nil error.
func (s *server) OnProviderUnloaded(ctx context.Context, in *pb.ProviderUnloadedRequest) (*pb.EmptySuccess, error) {
	log.Println("OnProviderUnloaded")
	cnter.Count(1)

	ack := new(pb.EmptySuccess)
	return ack, nil
}
// OnClientConnect logs its own name, calls cnter.Count(1) and returns an
// empty success response with a nil error.
func (s *server) OnClientConnect(ctx context.Context, in *pb.ClientConnectRequest) (*pb.EmptySuccess, error) {
	log.Println("OnClientConnect")
	cnter.Count(1)

	ack := new(pb.EmptySuccess)
	return ack, nil
}
// OnClientConnack logs its own name, calls cnter.Count(1) and returns an
// empty success response with a nil error.
func (s *server) OnClientConnack(ctx context.Context, in *pb.ClientConnackRequest) (*pb.EmptySuccess, error) {
	log.Println("OnClientConnack")
	cnter.Count(1)

	ack := new(pb.EmptySuccess)
	return ack, nil
}
// OnClientConnected prints its own name with fmt.Println (standard output,
// unlike the sibling callbacks, which use the log package), calls
// cnter.Count(1) and returns an empty success response with a nil error.
func (s *server) OnClientConnected(ctx context.Context, in *pb.ClientConnectedRequest) (*pb.EmptySuccess, error) {
	fmt.Println("OnClientConnected")
	cnter.Count(1)

	ack := new(pb.EmptySuccess)
	return ack, nil
}
// OnClientDisconnected logs its own name, calls cnter.Count(1) and returns an
// empty success response with a nil error.
func (s *server) OnClientDisconnected(ctx context.Context, in *pb.ClientDisconnectedRequest) (*pb.EmptySuccess, error) {
	log.Println("OnClientDisconnected")
	cnter.Count(1)

	ack := new(pb.EmptySuccess)
	return ack, nil
}
// OnClientAuthenticate logs its own name, calls cnter.Count(1) and returns a
// ValuedResponse of type STOP_AND_RETURN carrying the boolean result true.
// The incoming request is not inspected.
func (s *server) OnClientAuthenticate(ctx context.Context, in *pb.ClientAuthenticateRequest) (*pb.ValuedResponse, error) {
	log.Println("OnClientAuthenticate")
	cnter.Count(1)

	return &pb.ValuedResponse{
		Type:  pb.ValuedResponse_STOP_AND_RETURN,
		Value: &pb.ValuedResponse_BoolResult{BoolResult: true},
	}, nil
}
// OnClientCheckAcl logs its own name, calls cnter.Count(1) and returns a
// ValuedResponse of type STOP_AND_RETURN carrying the boolean result true.
// The incoming request is not inspected.
//
// The response that is built here is the one returned; previously a fresh
// empty ValuedResponse was returned instead and the populated reply was
// silently discarded.
func (s *server) OnClientCheckAcl(ctx context.Context, in *pb.ClientCheckAclRequest) (*pb.ValuedResponse, error) {
	log.Println("OnClientCheckAcl")
	cnter.Count(1)

	reply := &pb.ValuedResponse{
		Type:  pb.ValuedResponse_STOP_AND_RETURN,
		Value: &pb.ValuedResponse_BoolResult{BoolResult: true},
	}
	return reply, nil
}
// OnClientSubscribe logs its own name, calls cnter.Count(1) and returns an
// empty success response with a nil error.
func (s *server) OnClientSubscribe(ctx context.Context, in *pb.ClientSubscribeRequest) (*pb.EmptySuccess, error) {
	log.Println("OnClientSubscribe")
	cnter.Count(1)

	ack := new(pb.EmptySuccess)
	return ack, nil
}
// OnClientUnsubscribe logs its own name, calls cnter.Count(1) and returns an
// empty success response with a nil error.
func (s *server) OnClientUnsubscribe(ctx context.Context, in *pb.ClientUnsubscribeRequest) (*pb.EmptySuccess, error) {
	log.Println("OnClientUnsubscribe")
	cnter.Count(1)

	ack := new(pb.EmptySuccess)
	return ack, nil
}
// OnSessionCreated logs its own name, calls cnter.Count(1) and returns an
// empty success response with a nil error.
func (s *server) OnSessionCreated(ctx context.Context, in *pb.SessionCreatedRequest) (*pb.EmptySuccess, error) {
	log.Println("OnSessionCreated")
	cnter.Count(1)

	ack := new(pb.EmptySuccess)
	return ack, nil
}
// OnSessionSubscribed logs its own name, calls cnter.Count(1) and returns an
// empty success response with a nil error.
func (s *server) OnSessionSubscribed(ctx context.Context, in *pb.SessionSubscribedRequest) (*pb.EmptySuccess, error) {
	log.Println("OnSessionSubscribed")
	cnter.Count(1)

	ack := new(pb.EmptySuccess)
	return ack, nil
}
// OnSessionUnsubscribed logs its own name, calls cnter.Count(1) and returns
// an empty success response with a nil error.
func (s *server) OnSessionUnsubscribed(ctx context.Context, in *pb.SessionUnsubscribedRequest) (*pb.EmptySuccess, error) {
	log.Println("OnSessionUnsubscribed")
	cnter.Count(1)

	ack := new(pb.EmptySuccess)
	return ack, nil
}
// OnSessionResumed logs its own name, calls cnter.Count(1) and returns an
// empty success response with a nil error.
func (s *server) OnSessionResumed(ctx context.Context, in *pb.SessionResumedRequest) (*pb.EmptySuccess, error) {
	log.Println("OnSessionResumed")
	cnter.Count(1)

	ack := new(pb.EmptySuccess)
	return ack, nil
}
// OnSessionDiscarded logs its own name, calls cnter.Count(1) and returns an
// empty success response with a nil error.
func (s *server) OnSessionDiscarded(ctx context.Context, in *pb.SessionDiscardedRequest) (*pb.EmptySuccess, error) {
	log.Println("OnSessionDiscarded")
	cnter.Count(1)

	ack := new(pb.EmptySuccess)
	return ack, nil
}
// OnSessionTakeovered logs its own name, calls cnter.Count(1) and returns an
// empty success response with a nil error.
func (s *server) OnSessionTakeovered(ctx context.Context, in *pb.SessionTakeoveredRequest) (*pb.EmptySuccess, error) {
	log.Println("OnSessionTakeovered")
	cnter.Count(1)

	ack := new(pb.EmptySuccess)
	return ack, nil
}
// OnSessionTerminated logs its own name, calls cnter.Count(1) and returns an
// empty success response with a nil error.
func (s *server) OnSessionTerminated(ctx context.Context, in *pb.SessionTerminatedRequest) (*pb.EmptySuccess, error) {
	log.Println("OnSessionTerminated")
	cnter.Count(1)

	ack := new(pb.EmptySuccess)
	return ack, nil
}
// OnMessagePublish logs its own name, calls cnter.Count(1), overwrites the
// payload of the incoming message in place with a fixed byte string, and
// returns that same message inside a ValuedResponse of type STOP_AND_RETURN.
// NOTE(review): in.Message is dereferenced without a nil check — confirm the
// caller always populates it.
func (s *server) OnMessagePublish(ctx context.Context, in *pb.MessagePublishRequest) (*pb.ValuedResponse, error) {
	log.Println("OnMessagePublish")
	cnter.Count(1)

	// The request's own message is mutated and then handed back in the reply.
	in.Message.Payload = []byte("hardcode payload by exhook-svr-go :)")

	return &pb.ValuedResponse{
		Type:  pb.ValuedResponse_STOP_AND_RETURN,
		Value: &pb.ValuedResponse_Message{Message: in.Message},
	}, nil
}
// OnMessageDelivered logs its own name, calls cnter.Count(1) and returns an
// empty success response with a nil error.
func (s *server) OnMessageDelivered(ctx context.Context, in *pb.MessageDeliveredRequest) (*pb.EmptySuccess, error) {
	log.Println("OnMessageDelivered")
	cnter.Count(1)

	ack := new(pb.EmptySuccess)
	return ack, nil
}
// OnMessageDropped logs its own name, calls cnter.Count(1) and returns an
// empty success response with a nil error.
func (s *server) OnMessageDropped(ctx context.Context, in *pb.MessageDroppedRequest) (*pb.EmptySuccess, error) {
	log.Println("OnMessageDropped")
	cnter.Count(1)

	ack := new(pb.EmptySuccess)
	return ack, nil
}
// OnMessageAcked logs its own name, calls cnter.Count(1) and returns an
// empty success response with a nil error.
func (s *server) OnMessageAcked(ctx context.Context, in *pb.MessageAckedRequest) (*pb.EmptySuccess, error) {
	log.Println("OnMessageAcked")
	cnter.Count(1)

	ack := new(pb.EmptySuccess)
	return ack, nil
}
// main opens a TCP listener on the address in port, registers a server value
// as the hook provider on a new gRPC server, and serves on the listener.
// A failure to listen or an error returned from Serve terminates the process
// via log.Fatalf.
func main() {
	lis, err := net.Listen("tcp", port)
	if err != nil {
		log.Fatalf("failed to listen: %v", err)
	}

	s := grpc.NewServer()
	pb.RegisterHookProviderServer(s, &server{})

	// Log the address the listener actually bound to instead of a hard-coded
	// literal, so the message stays correct if port is changed.
	log.Printf("Started gRPC server on %s", lis.Addr())
	if err := s.Serve(lis); err != nil {
		log.Fatalf("failed to serve: %v", err)
	}
}
配置文件及日志
2021-07-21T10:48:25.025000+08:00 [debug] [ExHook Svr] Call emqx_exhook_v_1_hook_provider_client:on_provider_loaded(#{broker => #{datetime => "2021-07-21 10:48:25",sysdescr => "EMQ X",uptime => "6 seconds",version => "4.3.5"}}, #{channel => "default"})
2021-07-21T10:48:25.027000+08:00 [debug] [ExHook Svr] Response {ok, #{hooks => [#{name => <<"client.connect">>,topics => []},#{name => <<"client.connack">>,topics => []},#{name => <<"client.connected">>,topics => []},#{name => <<"client.disconnected">>,topics => []},#{name => <<"client.authenticate">>,topics => []},#{name => <<"client.check_acl">>,topics => []},#{name => <<"client.subscribe">>,topics => []},#{name => <<"client.unsubscribe">>,topics => []},#{name => <<"session.created">>,topics => []},#{name => <<"session.subscribed">>,topics => []},#{name => <<"session.unsubscribed">>,topics => []},#{name => <<"session.resumed">>,topics => []},#{name => <<"session.discarde"...>>,topics => []},#{name => <<"session.take"...>>,topics => []},#{name => <<"session."...>>,topics => []},#{name => <<"mess"...>>,topics => []},#{name => <<...>>,...},#{...}|...]}, [{<<"grpc-status">>,<<"0">>},{<<"grpc-message">>,<<>>}]}
2021-07-21T10:48:25.028000+08:00 [info] [Plugins] Started plugins: [emqx_exhook]
2021-07-21T10:48:25.028000+08:00 [info] [Plugins] Load plugin emqx_exhook successfully
2021-07-21T10:48:35.948000+08:00 [warning] OS_MON (cpu_sup) called by <0.651.0>, unavailable
2021-07-21T10:48:52.983000+08:00 [warning] OS_MON (cpu_sup) called by <0.651.0>, unavailable
2021-07-21T10:49:10.075000+08:00 [warning] OS_MON (cpu_sup) called by <0.651.0>, unavailable
2021-07-21T10:49:27.139000+08:00 [warning] OS_MON (cpu_sup) called by <0.651.0>, unavailable
2021-07-21T10:49:44.194000+08:00 [warning] OS_MON (cpu_sup) called by <0.651.0>, unavailable
2021-07-21T10:50:01.288000+08:00 [warning] OS_MON (cpu_sup) called by <0.651.0>, unavailable
2021-07-21T10:50:18.359000+08:00 [warning] OS_MON (cpu_sup) called by <0.651.0>, unavailable
2021-07-21T10:50:35.418000+08:00 [warning] OS_MON (cpu_sup) called by <0.651.0>, unavailable
2021-07-21T10:50:52.471000+08:00 [warning] OS_MON (cpu_sup) called by <0.651.0>, unavailable
2021-07-21T10:51:09.517000+08:00 [warning] OS_MON (cpu_sup) called by <0.651.0>, unavailable
2021-07-21T10:51:26.559000+08:00 [warning] OS_MON (cpu_sup) called by <0.651.0>, unavailable
2021-07-21T10:51:43.624000+08:00 [warning] OS_MON (cpu_sup) called by <0.651.0>, unavailable
2021-07-21T10:52:00.679000+08:00 [warning] OS_MON (cpu_sup) called by <0.651.0>, unavailable
2021-07-21T10:52:17.715000+08:00 [warning] OS_MON (cpu_sup) called by <0.651.0>, unavailable
2021-07-21T10:52:34.789000+08:00 [warning] OS_MON (cpu_sup) called by <0.651.0>, unavailable
2021-07-21T10:52:51.871000+08:00 [warning] OS_MON (cpu_sup) called by <0.651.0>, unavailable
2021-07-21T10:53:08.935000+08:00 [warning] OS_MON (cpu_sup) called by <0.651.0>, unavailable
2021-07-21T10:53:26.000000+08:00 [warning] OS_MON (cpu_sup) called by <0.651.0>, unavailable
2021-07-21T10:53:43.078000+08:00 [warning] OS_MON (cpu_sup) called by <0.651.0>, unavailable
2021-07-21T10:54:00.127000+08:00 [warning] OS_MON (cpu_sup) called by <0.651.0>, unavailable
2021-07-21T10:54:17.182000+08:00 [warning] OS_MON (cpu_sup) called by <0.651.0>, unavailable
2021-07-21T10:54:34.239000+08:00 [warning] OS_MON (cpu_sup) called by <0.651.0>, unavailable
2021-07-21T10:54:51.320000+08:00 [warning] OS_MON (cpu_sup) called by <0.651.0>, unavailable
2021-07-21T10:55:08.372000+08:00 [warning] OS_MON (cpu_sup) called by <0.651.0>, unavailable
2021-07-21T10:55:25.415000+08:00 [warning] OS_MON (cpu_sup) called by <0.651.0>, unavailable
2021-07-21T10:55:42.521000+08:00 [warning] OS_MON (cpu_sup) called by <0.651.0>, unavailable
2021-07-21T10:55:59.586000+08:00 [warning] OS_MON (cpu_sup) called by <0.651.0>, unavailable
2021-07-21T10:56:16.624000+08:00 [warning] OS_MON (cpu_sup) called by <0.651.0>, unavailable
2021-07-21T10:56:33.711000+08:00 [warning] OS_MON (cpu_sup) called by <0.651.0>, unavailable
2021-07-21T10:56:50.765000+08:00 [warning] OS_MON (cpu_sup) called by <0.651.0>, unavailable
2021-07-21T10:57:07.819000+08:00 [warning] OS_MON (cpu_sup) called by <0.651.0>, unavailable
2021-07-21T10:57:24.879000+08:00 [warning] OS_MON (cpu_sup) called by <0.651.0>, unavailable
2021-07-21T10:57:41.947000+08:00 [warning] OS_MON (cpu_sup) called by <0.651.0>, unavailable
2021-07-21T10:57:59.011000+08:00 [warning] OS_MON (cpu_sup) called by <0.651.0>, unavailable
2021-07-21T10:58:16.057000+08:00 [warning] OS_MON (cpu_sup) called by <0.651.0>, unavailable
2021-07-21T10:58:33.120000+08:00 [warning] OS_MON (cpu_sup) called by <0.651.0>, unavailable
2021-07-21T10:58:50.183000+08:00 [warning] OS_MON (cpu_sup) called by <0.651.0>, unavailable
2021-07-21T10:59:07.236000+08:00 [warning] OS_MON (cpu_sup) called by <0.651.0>, unavailable
2021-07-21T10:59:24.300000+08:00 [warning] OS_MON (cpu_sup) called by <0.651.0>, unavailable