我用kafka做为source,emqx接收kafka中的消息,然后打印到控制台上,但是emqx无法接收到kafka中的消息,报错如下:
2024-12-27T16:41:46.288788+08:00 [error] crasher: initial call: brod_group_coordinator:init/1, pid: <0.17466.0>, registered_name: , exit: {max_rejoin_attempts,[{gen_server,handle_common_reply,8,[{file,“gen_server.erl”},{line,1226}]},{proc_lib,init_p_do_apply,3,[{file,“proc_lib.erl”},{line,241}]}]}, ancestors: [<0.17465.0>,emqx_bridge_kafka_consumer_sup,emqx_bridge_sup,<0.4656.0>], message_queue_len: 0, messages: , links: [<0.17465.0>,<0.17468.0>], dictionary: [{rand_seed,{#{type => exsss,next => #Fun<rand.0.65977474>,bits => 58,uniform => #Fun<rand.1.65977474>,uniform_n => #Fun<rand.2.65977474>,jump => #Fun<rand.3.65977474>},[141855798526175138|186095540038266372]}}], trap_exit: true, status: running, heap_size: 4185, stack_size: 28, reductions: 7374; neighbours: neighbour: pid: <0.17468.0>, registered_name: , initial call: kpro_connection:init/4, current_function: {kpro_connection,loop,2}, ancestors: [<0.17466.0>,<0.17465.0>,emqx_bridge_kafka_consumer_sup,emqx_bridge_sup,<0.4656.0>], message_queue_len: 0, links: [<0.17466.0>,#Port<0.910>], trap_exit: false, status: waiting, heap_size: 2586, stack_size: 7, reductions: 5807, current_stacktrace: [{kpro_connection,loop,2,[{file,“kpro_connection.erl”},{line,388}]},{proc_lib,init_p_do_apply,3,[{file,“proc_lib.erl”},{line,241}]}]
2024-12-27T16:41:46.289206+08:00 [error] Supervisor: {local,emqx_bridge_kafka_consumer_sup}. Context: child_terminated. Reason: {shutdown,coordinator_failure}. Offender: id=<<107,97,102,107,97,95,115,117,98,115,99,114,105,98,101,114,58,107,97,102,107,97,45,115,111,117,114,99,101>>,pid=<0.17465.0>.
2024-12-27T16:41:50.306686+08:00 [error] Generic server <0.17475.0> terminating. Reason: max_rejoin_attempts. Last message: {lo_cmd_stabilize,5,not_coordinator}. State: {state,‘kafka_consumer:kafka-conn:emqx@127.0.0.1’,<<“emqx-kafka-consumer-kafka-source”>>,<<>>,undefined,0,[<<“test_mqtt_topic”>>],<0.17477.0>,undefined,,false,<0.17474.0>,brod_group_subscriber_v2,,#Ref<0.1040234151.443023361.18213>,roundrobin_v2,30,30,5,5,1,undefined,commit_to_kafka_v2,5,roundrobin_v2}.
这不是有原因么
不知道怎么改啊,是改kafka配置,还是改emqx的配置。
请大神指点。
我也不是很了解 kafka
不过问了一下 AI:
coordinator_failure 通常与 Kafka 消费者组协调器相关,这个错误可能有以下几个主要原因:
- 连接问题:
- Kafka broker 无法正常响应协调器请求
- 网络连接不稳定或防火墙阻挡
- broker 地址配置错误
- 消费者组配置问题:
- group.id 配置冲突或无效
- 消费者组的 session 超时
- 消费者组的协调器选举失败
- 权限问题:
- 消费者组没有足够的权限加入/创建组
- 没有读取主题的权限
你可以看看 kafka 的日志有什么错,然后网上找找原因。