I need to use the KEY produced by my rule as the Kafka message key when bridging to Kafka, as shown in the screenshot below.
How should this be configured?
Try ${.kafka_key}
I tried it; all the messages ended up in a single partition.
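One possible explanation for the single-partition symptom (a sketch only, with an illustrative hash, not the actual partitioner the Kafka client uses): if the key template fails to render, every message can carry the same literal key, and a key-dispatch partitioner maps a constant key to one fixed partition.

```python
# Sketch: why a constant (failed-to-render) key under key_dispatch
# sends every message to the same partition. The hash below is a
# stand-in, not the real client's hash function.
NUM_PARTITIONS = 12

def pick_partition(key: bytes, num_partitions: int) -> int:
    # simple deterministic hash-mod partitioner
    h = 0
    for b in key:
        h = (h * 31 + b) & 0x7FFFFFFF
    return h % num_partitions

# If the key template renders to the literal "undefined" for every
# message, all 1000 messages land in one partition:
partitions = {pick_partition(b"undefined", NUM_PARTITIONS) for _ in range(1000)}
print(partitions)  # a set with exactly one partition number

# Distinct keys, by contrast, spread across partitions:
spread = {pick_partition(str(i).encode(), NUM_PARTITIONS) for i in range(100)}
print(len(spread))
```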
From your screenshot it looks like you didn't use the dot (.).
If that doesn't work either, then it simply isn't supported…
When consuming in my application, the key comes through as: undefined
So was the key you tried ${.kafka_key} or ${kafka_key}? Try both; if neither picks up the value, then it isn't supported.
Looking at the code, key and value go through the same render function, so in principle any value you can reference in the value template should be available in the key template as well.
kafka_key is a field I created myself, produced by the rule engine's SQL query. My guess is that it doesn't work, and that only intrinsic MQTT message properties such as payload, clientId, etc. can be used.
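For reference, a rule of the kind described might look roughly like this (the topic filter and payload field name are hypothetical, since the original SQL isn't shown in the thread):

```sql
-- Hypothetical sketch: alias a payload field as kafka_key so the
-- bridge's key template can reference it
SELECT
  payload,
  payload.vin as kafka_key
FROM
  "t/#"
```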
I just tried your setup on 5.8.6, and it works.
If it's convenient, could you help debug it?
Open the remote console:
./bin/emqx remote_console
Then enter:
t(emqx_bridge_kafka_impl_producer,on_query_async).
Remember to press Enter.
1:8:08.904493 <0.15043.5198> emqx_bridge_kafka_impl_producer:on_query_async(<<"connector:kafka_producer:connector_TSP_SCHEDULE_KAFKA_WITH_KEY">>, {<<"action:kafka_producer:TSP_SCHEDULE_KAFKA_WITH_KEY:connector:kafka_producer:connector_TSP_SCHEDULE_KAFKA_WITH_KEY">>,
#{<<"kafka_key">> => <<"undefined">>,
<<“payload”>> =>
<<10,19,49,55,52,52,57,51,56,52,56,56,54,54,48,73,107,78,48,48,52,18,
17,84,69,83,84,95,88,82,89,48,48,48,49,57,57,55,54,55,24,212,166,
193,179,228,50,32,186,1,50,216,2,8,1,18,211,2,10,4,8,2,16,5,18,4,16,
1,24,0,24,60,32,1,40,0,50,14,8,165,201,185,14,16,228,196,192,57,24,
1,104,2,56,1,64,2,72,0,80,1,88,1,98,164,2,10,26,8,0,16,3,24,0,32,0,
40,0,48,1,56,1,64,0,72,0,152,2,192,34,160,2,254,15,18,21,10,8,8,2,
16,29,24,26,32,95,16,49,24,73,32,10,224,1,155,159,17,26,63,8,62,16,
54,24,82,32,98,40,41,48,62,56,69,64,42,72,93,80,91,88,68,96,91,104,
62,112,91,125,0,0,80,68,133,1,0,0,42,67,136,1,145,6,149,1,0,0,224,
67,153,1,0,0,0,0,0,32,131,64,160,1,217,3,34,10,8,4,16,6,24,4,32,3,
40,7,42,23,8,0,16,1,24,2,32,1,40,6,48,5,56,9,65,0,0,0,0,0,0,0,64,66,
84,8,0,16,8,24,4,32,10,40,2,48,1,56,6,64,3,72,1,80,0,88,3,96,213,
194,7,104,3,112,10,120,3,128,1,5,136,1,1,144,1,10,154,1,1,55,160,1,
4,168,1,3,176,1,0,184,1,9,192,1,3,200,1,5,208,1,0,216,1,5,224,1,149,
154,239,58,232,1,149,154,239,58,240,1,4,98,15,8,11,16,3,24,255,255,
3,32,255,1,40,1,48,3,98,12,8,111,16,4,24,40,32,60,40,0,48,2,98,12,8,
0,16,5,24,9,32,10,40,2,48,8,106,6,8,3,16,16,24,16,56,1>>}}, {fun emqx_resource_buffer_worker:handle_async_reply/2,
[#{buffer_worker => <0.15043.5198>,inflight_tid => undefined,
min_query =>
{query,{fun emqx_rule_runtime:inc_action_metrics/2,
[<<"TSP_SCHEDULE_DATA_WITH_KEY">>],
#{reply_dropped => true}},
,false,infinity},
query_opts =>
#{connector_resource_id =>
<<"connector:kafka_producer:connector_TSP_SCHEDULE_KAFKA_WITH_KEY">>,
expire_at => infinity,query_mode => simple_async_internal_buffer,
reply_to =>
{fun emqx_rule_runtime:inc_action_metrics/2,
[<<"TSP_SCHEDULE_DATA_WITH_KEY">>],
#{reply_dropped => true}},
simple_query => true,timeout => infinity},
request_ref => -566725846151192728,
resource_id =>
<<"action:kafka_producer:TSP_SCHEDULE_KAFKA_WITH_KEY:connector:kafka_producer:connector_TSP_SCHEDULE_KAFKA_WITH_KEY">>,
worker_index => undefined}]}, #{client_id=><<"connector:kafka_producer:connector_TSP_SCHEDULE_KAFKA_WITH_KEY">>, installed_bridge_v2s=>#{<<"action:kafka_producer:TSP_SCHEDULE_KAFKA_WITH_KEY:connector:kafka_producer:connector_TSP_SCHEDULE_KAFKA_WITH_KEY">>=>#{connector_resource_id=><<"connector:kafka_producer:connector_TSP_SCHEDULE_KAFKA_WITH_KEY">>, ext_headers_tokens=>, headers_tokens=>undefined, headers_val_encode_mode=>none, kafka_client_id=><<"connector:kafka_producer:connector_TSP_SCHEDULE_KAFKA_WITH_KEY">>, kafka_config=>#{buffer=>#{memory_overload_protection=>false, mode=>memory, per_partition_limit=>2147483648, segment_bytes=>104857600}, compression=>gzip, kafka_ext_headers=>, kafka_header_value_encode_mode=>none, max_batch_bytes=>917504, max_inflight=>10, message=>#{key=>"${.kafka_key}", timestamp=>"${.timestamp}", value=>"${payload}"}, partition_count_refresh_interval=>60, partition_strategy=>key_dispatch, query_mode=>async, required_acks=>leader_only, sync_query_timeout=>5000, topic=>"UpstreamScheduleStatus"}, kafka_topic=>"UpstreamScheduleStatus", message_template=>#{key=>[{var,[<<"kafka_key">>]}], timestamp=>[{var,[<<"timestamp">>]}], value=>[{var,[<<"payload">>]}]}, producers=>#{client_id=><<"connector:kafka_producer:connector_TSP_SCHEDULE_KAFKA_WITH_KEY">>, partitioner=>first_key_dispatch, topic=>"UpstreamScheduleStatus", workers=>kafka_producer_TSP_SCHEDULE_KAFKA_WITH_KEY}, resource_id=><<"action:kafka_producer:TSP_SCHEDULE_KAFKA_WITH_KEY:connector:kafka_producer:connector_TSP_SCHEDULE_KAFKA_WITH_KEY">>, sync_query_timeout=>5000}}})
Recon tracer rate limit tripped.
e5.3.1(emqx@10.42.155.114)2>
The version I'm using: 5.3.1
You're using the substr function incorrectly.
34 is not the length; it's the starting position.
Change it to this:
substr(topic,0,34)
This takes the first 34 characters of the topic.
That said, I also noticed your From topic is exactly 34 characters long, so the result is the same with or without substr…
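In rule SQL form, the corrected call might look like this (the topic filter and alias are hypothetical):

```sql
-- substr(Str, Start, Length): the second argument is the 0-based
-- starting position, the third is the length to take
SELECT
  substr(topic, 0, 34) as kafka_key
FROM
  "t/#"
```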
A question: when enterprise-edition EMQX bridges to Kafka, does random mode round-robin across the topic's partitions, or pick a partition at random for each message?
If it picks at random, can it be configured to round-robin across partitions instead?
Right now, under heavy load, random mode leaves some partitions with far more data than others.
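For context, the trace earlier in the thread shows the field involved (partition_strategy). A rough sketch of the relevant action settings follows, HOCON-style, with the action name and nesting assumed from the EMQX 5.x schema rather than verified:

```
# Sketch of the Kafka producer action config (names assumed).
# key_dispatch hashes the rendered message key to a partition;
# random picks a partition per message.
actions.kafka_producer.my_action {
  parameters {
    topic = "UpstreamScheduleStatus"
    message {
      key   = "${kafka_key}"
      value = "${payload}"
    }
    partition_strategy = key_dispatch   # or: random
  }
}
```

With key_dispatch, distribution follows the cardinality of the key: many distinct keys spread the load, while a constant key concentrates it on one partition.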