emqx pulsar_plugin异常

架构: 1 core + 1 replicant + plugin插件
系统: 5.10.109-104.500.amzn2.aarch64
版本: 5.2.1
问题: 当设备连接到core节点时,消息转发到pulsar异常日志; 当设备连接到replicant节点时,消息转发正常。 这里面是否有某种限制;

配置
core

node = {
    name = "emqx-vrrIUX@10.2.3.247"
    cookie = "xxx"
    data_dir = "data"
    role = core
    db_backend = rlog
    process_limit = 2048000
    max_ports = 1024000
    dist_buffer_size = 8192
    global_gc_interval = 15m
}

replicant

node = {
    name = "emqx-FFCVHY@10.2.7.214"
    cookie = "xxx"
    data_dir = "data"
    role = replicant
    db_backend = rlog
    process_limit = 2048000
    max_ports = 1024000
    dist_buffer_size = 8192
    global_gc_interval = 15m
}

另外当只有一个core时,消息转发到插件正常。

没有限制的。钩子只会在当前节点上触发,就是说当前节点上收到消息不会触发其他节点上的钩子,所以消息发送到哪个节点,消息就会通过哪个节点上的插件发送出去。

你说 pulsar 异常日志,是什么样的日志?emqx 这边对应的日志有没有?

我这边复现了这个问题:

集群采用etcd服务发现;

利用aws auto scale 同时启动两个core,其中一个core注册hook失败,但是却连接了pulsar,如图和日志。日志中没看到异常信息。
异常core

正常core

emqx.log.zip (7.3 KB)

插件相关的一些代码
msiot_emqx_bridge_sup.erl

-module(msiot_emqx_bridge_sup).

-behaviour(supervisor).

-export([start_link/0]).

-export([init/1]).

start_link() ->
    supervisor:start_link({local, ?MODULE}, ?MODULE, []).

init([]) ->

    PulsarClientSpec = #{id => iotms_pulsar_client,
        start => {iotms_pulsar_client, start_link, [iotms_pulsar_client, {}]},
        restart => transient,
        type => worker,
        modules => [iotms_pulsar_client]
    },

    {ok, { {one_for_all, 0, 1}, [PulsarClientSpec]} }.

msiot_emqx_bridge_app.erl

-module(msiot_emqx_bridge_app).

-behaviour(application).

-emqx_plugin(?MODULE).

-export([ start/2
        , stop/1
        ]).

start(_StartType, _StartArgs) ->
    %% 载入配置文件中配置项
    ok = iotms_config:load(),

    {ok, Sup} = msiot_emqx_bridge_sup:start_link(),
    msiot_emqx_bridge:load(application:get_all_env()),

    emqx_ctl:register_command(msiot_emqx_bridge, {msiot_emqx_bridge_cli, cmd}),
    {ok, Sup}.

stop(_State) ->
    emqx_ctl:unregister_command(msiot_emqx_bridge),
    msiot_emqx_bridge:unload().

msiot_emqx_bridge.erl

-module(msiot_emqx_bridge).


register_metrics() ->
    emqx_metrics:new('ms_pulsar_bridge.client_connected'),
    emqx_metrics:new('ms_pulsar_bridge.client_disconnected'),
    emqx_metrics:new('ms_pulsar_bridge.message_publish'),
    emqx_metrics:new('ms_pulsar_bridge.message_overflow'),
    emqx_metrics:new('ms_pulsar_bridge.message_failed'),
    ok.

load(Env) ->
    ok = register_metrics(),
    hook('client.connected', {?MODULE, on_client_connected, [Env]}),
    hook('client.disconnected', {?MODULE, on_client_disconnected, [Env]}),
    hook('message.publish', {?MODULE, on_message_publish, [Env]}).

推测是其中一个core节点 执行emqx_metrics:new 失败导致hook('client.connected', {?MODULE, on_client_connected, [Env]}) 没有执行。

你可以加一点 debug 信息来验证你的想法。或者用 dbg 来trace 一下相关代码:

我们在load(Env) 中加了日志,并且日志中确认hook函数返回的ok,还请您们看看这个问题

load(Env) ->
    ?LOG(warning, "load: ~p", [1]),
    RES1 = hook('client.connected', {?MODULE, on_client_connected, [Env]}),
    ?LOG(warning, "RES1: ~p", [RES1]),
    ?LOG(warning, "load: ~p", [2]),
    RES2 = hook('client.disconnected', {?MODULE, on_client_disconnected, [Env]}),
    ?LOG(warning, "RES2: ~p", [RES2]),
    ?LOG(warning, "load: ~p", [3]),
    RES3 = hook('message.publish', {?MODULE, on_message_publish, [Env]}),
    ?LOG(warning, "RES3: ~p", [RES3]),
    ?LOG(warning, "load: ~p", [4]),
    register_metrics().

log

2023-12-14T08:12:10.851989+00:00 [warning] msiot_emqx_bridge: load: 1
2023-12-14T08:12:10.852480+00:00 [warning] msiot_emqx_bridge: RES1: ok
2023-12-14T08:12:10.852619+00:00 [warning] msiot_emqx_bridge: load: 2
2023-12-14T08:12:10.852721+00:00 [warning] msiot_emqx_bridge: RES2: ok 2023-12-14T08:12:10.853287+00:00 [warning] msiot_emqx_bridge: load: 32023-12-14T08:12:10.853474+00:00 [warning] msiot_emqx_bridge: RES3: ok 2023-12-14T08:12:10.853529+00:00 [warning] msiot_emqx_bridge: load: 4

我看到只要 hook 函数返回 ok 就代表已经注册成功了,所以 ets 表已经已经插入成功了。现在通过 ets:tab2list(emqx_hooks) 查看表里没有的话,怀疑是后面又给取消注册了。你trace 一下:

emqx remote_console

dbg:tracer().
dbg:p(all, c).
dbg:tpl(emqx_hooks, '_', '_', cx).

输出会在 log/erlang.log.* 里

非常抱歉无法这样调试,或者是我没理解到你的意思;

原因是

  1. 插件写在配置文件中的,跟随emqx启动就启动了。
    如果我按照您的步骤执行,需要先启动emqx,而此时插件已经执行了注册hook操作了。
  2. 如果手动去安装插件再启动,则无法复现这个问题。

你可以把这几个指令放到你的插件启动代码里面。放到 load/1 里面吧:

load(Env) ->
   dbg:tracer(),
   dbg:p(all, c),
   dbg:tpl(emqx_hooks, '_', '_', cx),
   ...

重新编译插件再试一次。

ssh-test-jump_2023-12-14_下午 6_12_50.zip (25.0 KB)

以这个为准,加了查询hook的操作

ssh-test-jump_2023-12-14_下午 6_28_29.zip (24.8 KB)

哦 我好像明白了,新节点加入集群的时候,本节点会先重启,所以把 ets 表清空了。这时候应该要重启所有插件才行。

你在你的插件启动的时候加上这一句:

OldApps = application:get_env(emqx_machine, applications, []),
Apps = case lists:member(msiot_emqx_bridge , OldApps) of
   false -> [msiot_emqx_bridge | OldApps];
   true -> OldApps
end,
application:set_env(emqx_machine, applications, Apps),

如果管用我去报个问题,后面解决它。

加上它能够注册hook,但是创建生产者报错,另外一个core节点正常
ssh-test-jump_2023-12-14_下午 7_19_19.zip (31.5 KB)

这个看起来已经不是 emqx 的问题了,需要你自己调查一下插件里面的问题。

感谢~