使用doris连接器执行动作时报错

环境

  • EMQX 版本:6.2.0
  • 操作系统版本:
  • Doris版本:4.0.5

重现此问题的步骤

  1. 创建doris连接器,测试连接返回成功
  2. 在规则中创建doris动作,sql:insert into test_1(data_time, grp_name, tag1, tag2, tag3) values (${ts},${grpName}, ${tag1}, ${tag2}, ${tag3})
  3. 点击测试连接报错:

    4.将sql修改为:insert into test_1(data_time, grp_name, tag1, tag2, tag3) values (‘${ts}’,‘${grpName}’, ‘${tag1}’, ‘${tag2}’, ‘${tag3}’),即给参数加上引号就可测试成功

预期行为

测试通过

实际行为

测试失败,报错:

{“code”:“TEST_FAILED”,“message”:“{disconnected,#{reason => {{{case_clause,<<0,0,0,0,128,0,0,2,0,0,0,0,1>>},[{mysql_protocol,prepare,4,[{file,"src/mysql_protocol.erl"},{line,232}]},{mysql_conn,named_prepare,3,[{file,"src/mysql_conn.erl"},{line,744}]},{mysql_conn,handle_call,3,[{file,"src/mysql_conn.erl"},{line,420}]},{gen_server,try_handle_call,4,[{file,"gen_server.erl"},{line,2470}]},{gen_server,handle_msg,3,[{file,"gen_server.erl"},{line,2499}]},{proc_lib,init_p_do_apply,3,[{file,"proc_lib.erl"},{line,333}]}]},{gen_server,call,[<0.45203.0>,{prepare,<<"action:doris:PROBE_CcLY8auV:connector:doris:dryrun">>,[<<"insert into example_from_mqtt(data_time, grp_name, tag1, tag2, tag3) \r\nvalues (">>,"?",<<",">>,"?",<<", ‘">>,"?",<<"’, ‘">>,"?",<<"’, ‘">>,"?",<<"’)">>]}]}},stacktrace => [{gen_server,call,2,[{file,"gen_server.erl"},{line,1221}]},{emqx_mysql,prepare_sql_to_conn,2,[{file,"emqx_mysql.erl"},{line,321}]},{emqx_mysql,prepare_sql_to_conn_list,2,[{file,"emqx_mysql.erl"},{line,291}]},{emqx_mysql,prepare_sql,2,[{file,"emqx_mysql.erl"},{line,263}]},{emqx_mysql,init_prepare,1,[{file,"emqx_mysql.erl"},{line,240}]},{emqx_bridge_mysql_connector,set_prepares,2,[{file,"emqx_bridge_mysql_connector.erl"},{line,166}]},{emqx_bridge_mysql_connector,on_add_channel,4,[{file,"emqx_bridge_mysql_connector.erl"},{line,75}]},{emqx_resource,call_add_channel,5,[{file,"emqx_resource.erl"},{line,603}]},{emqx_resource_manager,add_channels_to_resource_state,3,[{file,"emqx_resource_manager.erl"},{line,1242}]},{emqx_resource_manager,channels_health_check,2,[{file,"emqx_resource_manager.erl"},{line,1765}]},{emqx_resource_manager,continue_resource_health_check_connected,2,[{file,"emqx_resource_manager.erl"},{line,1626}]},{gen_statem,loop_state_callback,11,[{file,"gen_statem.erl"},{line,3743}]},{proc_lib,init_p_do_apply,3,[{file,"proc_lib.erl"},{line,333}]}],exception => exit}}”}

这个报错属于 prepare 阶段兼容问题,不是连通性问题。栈里已经落到 mysql_protocol:prepare/4case_clause 了。
先按这个模板改一版再测:

INSERT INTO test_1(data_time, grp_name, tag1, tag2, tag3) VALUES (
  FROM_UNIXTIME(${ts}/1000),
  ${grpName},
  ${tag1},
  ${tag2},
  ${tag3}
)


SQL 模板里的占位符不要包引号(Doris 连接器文档也是这个约束)。

如果你要先临时跑通,也只给字符串列加英文单引号,data_time 继续用 FROM_UNIXTIME(${ts}/1000)
如果改完还是同一个报错,把下面 4 个信息贴出来:

  1. SHOW CREATE TABLE test_1;
  2. 规则 SQL + 动作 SQL 模板原文
  3. emqx.log 里该报错前后 100 行
  4. Doris FE 的 SELECT VERSION(); 输出