EMQX 企业版 5.9 配置kafka 生产者连接器无法把emqx 接到数据转发到kafka topic

环境

  • EMQX 版本:5.9 企业版
  • 操作系统版本:mac docker image

重现此问题的步骤

  1. 在emqx 控制台配置连接器,到kafka,kafka 版本使用apache-kafka 4.0 和 3.2
  2. 配置规则 SELECT

FROM
“t/#”
3. 配置动作输出,kafka 生产者

预期行为

mqttx pub -i emqx_c -t t/1 -m ‘{ “msg”: “Hello Kafka” }’ -u ‘xx’ -P ‘xx’

kafka topic 中没有消息

追踪日志中提示: buffer overflow

2025-05-28T09:21:25.338846+00:00 [QUERY_RENDER] emqx_c@192.168.65.1:40270 msg: action_template_rendered, action_id: action:kafka_producer:newMQTT:connector:kafka_producer:newMQTT, client_ids: [emqx_c: true], matched: t/#, payload_encode: text, result: Encoded(text)=[message: [value: {“publish_received_at”:1748424085338,“pub_props”:{“User-Property”:{}},“qos”:0,“topic”:“t/1”,“clientid”:“emqx_c”,“client_attrs”:{},“peerhost”:“192.168.65.1”,“payload”:“{ "msg": "Hello Kafka" }”,“username”:“taoshi”,“event”:“message.publish”,“metadata”:{“rule_id”:“rule_vqdg”},“peername":“192.168.65.1:40270”,“timestamp”:1748424085338,“node”:"emqx@172.17.0.3”,“id”:“0006362EB485F8FF4259000661D60002”,“flags”:{“retain”:false,“dup”:false}}, key: emqx_c, headers: , ts: 1748424085338]], rule_id: rule_vqdg, rule_ids: [rule_vqdg: true], rule_trigger_ts: [1748424085338], trigger: t/1, username: taoshi
2025-05-28T09:21:25.339059+00:00 [MQTT] emqx_c@192.168.65.1:40270 msg: mqtt_packet_received, packet: DISCONNECT(Q0, R0, D0, ReasonCode=0), username: taoshi
2025-05-28T09:21:25.339104+00:00 [SOCKET] emqx_c@192.168.65.1:40270 msg: socket_force_closed, reason: normal, username: taoshi
2025-05-28T09:21:25.339220+00:00 [ACTION] emqx_c@ msg: action_failed, action_info: [name: newMQTT, type: kafka_producer], reason: buffer_overflow, rule_id: rule_vqdg, rule_trigger_ts: [1748424085338]
2025-05-28T09:21:25.339228+00:00 [SOCKET] emqx_c@192.168.65.1:40270 msg: emqx_connection_terminated, reason: {shutdown,normal}, username: taoshi

实际行为

你测试的这么点消息量,不应该 buffer overflow 的,还有其它的额外操作么(或者说,你用 kafka 的命令行能写入消息么)

PS:我按这个试了一下,是可以的。全 docker 环境。