Problem running a portable plugin

I compiled a portable plugin inside the lfedge/ekuiper:1.13.0-alpha.1-dev container, and it runs fine in that container.

However, it fails to run under lfedge/ekuiper:1.13.0-alpha.1 with the error (no such file or directory).

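Is it a Python plugin? Those need to run in the xxx-slim-python container. For 1.13, CI had a problem with that image and it was not published. You could try an older version.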

Hi, it is a Go plugin.

File not found? The two containers have different directory layouts. Could the plugin have been put in the wrong place?

No. When I switch to lfedge/ekuiper:1.13.0-alpha.1-dev the plugin runs. I uploaded it through eKuiper manager, so the location should be the same.
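One possibility that the thread does not confirm: a binary that exists on disk can still fail with "no such file or directory" when the dynamic loader it was linked against is absent, for example if the two images ship different C libraries (an assumption, not verified here). A minimal Go sketch, using only the standard library, that prints the loader path recorded in a plugin binary and checks whether it exists in the current container; the function name `interpreterOf` is made up for illustration:

```go
package main

import (
	"bytes"
	"debug/elf"
	"fmt"
	"io"
	"os"
)

// interpreterOf returns the dynamic loader path stored in the ELF
// PT_INTERP segment, or "" when the binary is statically linked.
func interpreterOf(path string) (string, error) {
	f, err := elf.Open(path)
	if err != nil {
		return "", err
	}
	defer f.Close()
	for _, p := range f.Progs {
		if p.Type != elf.PT_INTERP {
			continue
		}
		buf, err := io.ReadAll(p.Open())
		if err != nil {
			return "", err
		}
		// The segment holds a NUL-terminated path.
		return string(bytes.TrimRight(buf, "\x00")), nil
	}
	return "", nil
}

func main() {
	if len(os.Args) < 2 {
		fmt.Println("usage: checkinterp <path-to-plugin-binary>")
		return
	}
	interp, err := interpreterOf(os.Args[1])
	if err != nil {
		fmt.Println("cannot read ELF file:", err)
		return
	}
	if interp == "" {
		fmt.Println("statically linked: no loader needed")
		return
	}
	if _, err := os.Stat(interp); err != nil {
		fmt.Printf("loader %s is missing in this container: %v\n", interp, err)
		return
	}
	fmt.Printf("loader %s is present\n", interp)
}
```

Run it inside the container where the plugin fails, passing the plugin executable path. If the loader turns out to be missing and the plugin has no cgo dependency, rebuilding with `CGO_ENABLED=0` generally produces a statically linked Go binary that does not need one.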

One more question while I am here. Looking at the log,


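both the in and out counts have reached 35, but the log statements in my plugin code never show up. What could be the cause? Could there be an IPC length limit?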

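Your question has too little information: steps, error messages, logs and so on. Please post everything you can get. Also, we would like to build Parquet format support into the product: https://github.com/lf-edge/ekuiper/issues/2521 You could consider implementing it directly in the product instead of as a plugin.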

Serving kuiper (version - 1.13.0-alpha.1) on port 20498, and restful api on http://0.0.0.0:9081.
time="2024-01-22T09:27:44Z" level=error msg="get rule status error: Rule hw002 is not found" file="server/rest.go:82"
time="2024-01-22T09:28:35Z" level=info msg="Stream hw002 is created." file="processor/stream.go:81"
time="2024-01-22T09:28:35Z" level=info msg="Rule hw002 is created." file="processor/rule.go:62"
time="2024-01-22T09:28:35Z" level=info msg="Init rule with options &{Debug:false LogFilename: IsEventTime:false LateTol:1000 Concurrency:1 BufferLength:1024 SendMetaToSink:false SendError:true Qos:0 CheckpointInterval:300000 Restart:0xc000481740 Cron: Duration: CronDatetimeRange:[]}" file="planner/planner.go:44"
time="2024-01-22T09:28:35Z" level=info msg="Start rulestate hw002" file="rule/ruleState.go:154"
time="2024-01-22T09:28:35Z" level=info msg="Load custom schema from file a10, for symbol GetConverter" file="schema/ext_inferer_custom.go:36"
time="2024-01-22T09:28:35Z" level=error msg="Create rule topo error: schema type custom, file a10 not found" file="server/rule_manager.go:141"
time="2024-01-22T09:28:35Z" level=error msg="Rule hw002 was stopped." file="server/rest.go:755"
time="2024-01-22T09:28:59Z" level=info msg="Connect MQTT broker tcp://192.168.1.50:1883 with TLS configs" file="mqtt/mqtt.go:83"
time="2024-01-22T09:28:59Z" level=info msg="The connection to mqtt broker is established successfully for tcp://192.168.1.50:1883." file="mqtt/mqtt.go:121"
time="2024-01-22T09:28:59Z" level=info msg="new mqtt client created" file="mqtt/mqtt_wrapper.go:79"
time="2024-01-22T09:28:59Z" level=info msg="Init client wrapper for client type mqtt" file="clients/client_registy.go:107"
time="2024-01-22T09:28:59Z" level=info msg="The connection to mqtt broker tcp://192.168.1.50:1883 client id ac0fa440-b908-11ee-9d96-0242ac110008 established" file="mqtt/mqtt_wrapper.go:86"
time="2024-01-22T09:28:59Z" level=info msg="Mqtt Source instance 0 Done" file="mqtt/mqtt_source.go:199" rule=TestSourceOpen_mqtt
time="2024-01-22T09:28:59Z" level=error msg="not found subscription id __0" file="mqtt/mqtt_wrapper.go:244" rule=TestSourceOpen_mqtt
time="2024-01-22T09:28:59Z" level=info msg="mqtt client wrapper reference count 0" file="mqtt/mqtt_wrapper.go:292" rule=TestSourceOpen_mqtt
time="2024-01-22T09:28:59Z" level=info msg="Closing the connection to mqtt broker for tcp://192.168.1.50:1883" file="mqtt/mqtt.go:154"
time="2024-01-22T09:29:14Z" level=info msg="Start to download file file:///go/kuiper/_build/kuiper-1.13.0-alpha.1-linux-amd64/data/uploads/converter-a10-amd64.so\n" file="httpx/http.go:158"
time="2024-01-22T09:29:25Z" level=info msg="Start to download file file:///go/kuiper/_build/kuiper-1.13.0-alpha.1-linux-amd64/data/uploads/znb.zip\n" file="httpx/http.go:158"
time="2024-01-22T09:29:25Z" level=info msg="Loading yaml file for sink: znb_parquet" file="meta/yamlConfigMeta.go:86"
time="2024-01-22T09:29:25Z" level=info msg="Loading metadata file for sink: znb_parquet.json" file="meta/sinkMeta.go:263"
time="2024-01-22T09:29:25Z" level=info msg="Installed portable plugin znb successfully" file="portable/manager.go:163"
time="2024-01-22T09:29:38Z" level=info msg="funcMeta file : internal.json" file="meta/func_meta.go:70"
time="2024-01-22T09:29:44Z" level=info msg="Stop rulestate hw002" file="rule/ruleState.go:158"
time="2024-01-22T09:29:45Z" level=info msg="funcMeta file : internal.json" file="meta/func_meta.go:70"
time="2024-01-22T09:30:15Z" level=info msg="Rule a is created." file="processor/rule.go:73"
time="2024-01-22T09:30:15Z" level=info msg="Init rule with options &{Debug:false LogFilename: IsEventTime:false LateTol:1000 Concurrency:1 BufferLength:1024 SendMetaToSink:false SendError:true Qos:0 CheckpointInterval:300000 Restart:0xc0000a8ed0 Cron: Duration: CronDatetimeRange:[]}" file="planner/planner.go:44"
time="2024-01-22T09:30:15Z" level=info msg="Start rulestate a" file="rule/ruleState.go:154"
time="2024-01-22T09:30:15Z" level=info msg="Load custom schema from file a10, for symbol GetConverter" file="schema/ext_inferer_custom.go:36"
time="2024-01-22T09:30:15Z" level=info msg="Init rule with options &{Debug:false LogFilename: IsEventTime:false LateTol:1000 Concurrency:1 BufferLength:1024 SendMetaToSink:false SendError:true Qos:0 CheckpointInterval:300000 Restart:0xc0000a8ed0 Cron: Duration: CronDatetimeRange:[]}" file="planner/planner.go:44"
time="2024-01-22T09:30:15Z" level=info msg="Load custom schema from file a10, for symbol GetConverter" file="schema/ext_inferer_custom.go:36"
time="2024-01-22T09:30:15Z" level=info msg="Opening stream" file="topo/topo.go:183" rule=a
time="2024-01-22T09:30:15Z" level=info msg="Start with window state triggerTime: 1705915815255, msgCount: 0" file="node/window_op.go:162" rule=a
time="2024-01-22T09:30:15Z" level=info msg="open source node hw002 with option &{hw002  custom hw002 mqtt false   false a10.Converter 0    map[] false false }" file="node/source_node.go:83" rule=a
time="2024-01-22T09:30:15Z" level=info msg="Set config 'mqtt_source.default.server' to 'tcp://192.168.1.50:1883' by environment variable" file="conf/load.go:130"
time="2024-01-22T09:30:15Z" level=info msg="Load custom converter from file a10, for symbol GetConverter" file="custom/converter.go:32"
time="2024-01-22T09:30:15Z" level=info msg="open source node with props map[delimiter: format:custom insecureSkipVerify:false key: password:*** protocolVersion:3.1.1 qos:1 server:tcp://192.168.1.50:1883 username:admin], concurrency: 1, bufferLength: 102400" file="node/source_node.go:128" rule=a
time="2024-01-22T09:30:15Z" level=info msg="Connect MQTT broker tcp://192.168.1.50:1883 with TLS configs" file="mqtt/mqtt.go:83"
time="2024-01-22T09:30:15Z" level=info msg="align window timer to 2024-01-22 09:31:00 +0000 UTC(1705915860000)" file="node/window_op.go:223" rule=a
time="2024-01-22T09:30:15Z" level=info msg="open sink node 1 instances with batchSize%!(EXTRA int=0)" file="node/sink_node.go:140" rule=a
time="2024-01-22T09:30:15Z" level=info msg="Start running portable sink znb_parquet with conf map[bufferLength:1024 dir:/data enableCache:false format:json omitIfEmpty:false runAsync:false sendSingle:false]" file="runtime/sink.go:46" rule=a
time="2024-01-22T09:30:15Z" level=info msg="create control channel" file="runtime/plugin_ins_manager.go:233"
time="2024-01-22T09:30:15Z" level=info msg="start to listen at ipc:///tmp/plugin_znb.ipc after 0 tries" file="runtime/connection.go:267"
time="2024-01-22T09:30:15Z" level=info msg="control channel created: ipc:///tmp/plugin_znb.ipc" file="runtime/connection.go:246"
time="2024-01-22T09:30:15Z" level=info msg="executing plugin" file="runtime/plugin_ins_manager.go:241"
time="2024-01-22T09:30:15Z" level=info msg="starting go plugin executable /go/kuiper/_build/kuiper-1.13.0-alpha.1-linux-amd64/plugins/portable/znb/znb" file="runtime/plugin_ins_manager.go:250"
time="2024-01-22T09:30:15Z" level=info msg="plugin starting" file="runtime/plugin_ins_manager.go:278"
time="2024-01-22T09:30:15Z" level=info msg="The connection to mqtt broker is established successfully for tcp://192.168.1.50:1883." file="mqtt/mqtt.go:121"
time="2024-01-22T09:30:15Z" level=info msg="The connection to mqtt broker tcp://192.168.1.50:1883 client id d95b1131-b908-11ee-9d96-0242ac110008 established" file="mqtt/mqtt_wrapper.go:86"
time="2024-01-22T09:30:15Z" level=info msg="new mqtt client created" file="mqtt/mqtt_wrapper.go:79"
time="2024-01-22T09:30:15Z" level=info msg="Init client wrapper for client type mqtt" file="clients/client_registy.go:107"
time="2024-01-22T09:30:15Z" level=info msg="Start source hw002 instance 0 successfully" file="node/source_node.go:161" rule=a
time="2024-01-22T09:30:15Z" level=info msg="new subscription for topic hw002, reqId is a_hw002_0" file="mqtt/mqtt_wrapper.go:223" rule=a
time="2024-01-22T09:30:15Z" level=info msg="Successfully subscribed to topic hw002." file="mqtt/mqtt_source.go:125" rule=a
time="2024-01-22T09:30:15Z" level=info msg="plugin started pid: 15\n" file="runtime/plugin_ins_manager.go:284"
time="2024-01-22T09:30:15Z" level=info msg="waiting handshake" file="runtime/plugin_ins_manager.go:310"
2024/01/22 09:30:15 启动[znb]插件, 参数: [/go/kuiper/_build/kuiper-1.13.0-alpha.1-linux-amd64/plugins/portable/znb/znb {"sendTimeout":1000}]
time="2024-01-22 09:30:15" level=info msg="config parsed to &{1000}" file="runtime/plugin.go:50" plugin=znb
time="2024-01-22 09:30:15" level=info msg="starting plugin" file="runtime/plugin.go:90" plugin=znb
time="2024-01-22 09:30:15" level=error msg="can't set socket option RETRY-TIME: invalid option value" file="connection/connection.go:173"
time="2024-01-22 09:30:15" level=info msg="running control channel" file="runtime/plugin.go:97" plugin=znb
time="2024-01-22T09:30:15Z" level=info msg="plugin start running" file="runtime/plugin_ins_manager.go:317"
time="2024-01-22T09:30:15Z" level=info msg="restore plugin symbols" file="runtime/plugin_ins_manager.go:319"
time="2024-01-22T09:30:15Z" level=info msg="Plugin started successfully" file="runtime/sink.go:52" rule=a
time="2024-01-22 09:30:15" level=info msg="received command start with arg:'{\"symbolName\":\"znb_parquet\",\"meta\":{\"ruleId\":\"a\",\"opId\":\"znb_parquet_0\",\"instanceId\":0},\"pluginType\":\"sink\",\"config\":{\"bufferLength\":1024,\"dir\":\"/data\",\"enableCache\":false,\"format\":\"json\",\"omitIfEmpty\":false,\"runAsync\":false,\"sendSingle\":false}}'" file="runtime/plugin.go:104" plugin=znb
time="2024-01-22 09:30:15" level=info msg="plugin start to listen after 300 tries" file="connection/connection.go:186"
time="2024-01-22 09:30:15" level=info msg="Setup message pipeline, start listening" file="runtime/sink.go:48" rule=a
time="2024-01-22 09:30:15" level=info msg="running sink znb_parquet" file="runtime/plugin.go:136" plugin=znb
time="2024-01-22T09:30:15Z" level=info msg="started symbol znb_parquet" file="runtime/plugin_ins_manager.go:104" rule=a
time="2024-01-22T09:30:15Z" level=info msg="sink channel created: ipc:///tmp/a_znb_parquet_0_0.ipc" file="runtime/connection.go:224"
time="2024-01-22T09:30:15Z" level=info msg="sink node znb_parquet_0 instance 0 starts with conf {Concurrency:1 Omitempty:false SendSingle:false DataTemplate: Format:json SchemaId: Delimiter: BufferLength:1024 Fields:[] DataField: BatchSize:0 LingerInterval:0 SinkConf:{MemoryCacheThreshold:1024 MaxDiskCache:1024000 BufferPageSize:256 EnableCache:false ResendInterval:0 CleanCacheAtStop:false ResendAlterQueue:false ResendPriority:0 ResendIndicatorField:}}" file="node/sink_node.go:187" rule=a
time="2024-01-22T09:31:00Z" level=info msg="First tick at 2024-01-22 09:31:00.000773669 +0000 UTC m=+202.968902321(1705915860000), defined at 1705915860000" file="node/window_op.go:417" rule=a

I set up a fresh environment and configured it. The log contains an error: msg="can't set socket option RETRY-TIME: invalid option value" file="connection/connection.go:173"

Stream: reads data from MQTT, with a custom decoder.


Rule: aggregates data over 1 minute and writes it to Parquet.


Relevant code from the parquet plugin:

Does logrus write to a file? What about writing directly to the console with log?

It writes to the console.


It does print at startup.
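To tell "the plugin never received the data" apart from "the plugin received it but logged nothing", one option is to log a sequence number and byte count at the very first line of the receiving function. The sketch below uses only Go's standard `log` package writing to stderr, the same kind of output as the plugin's startup line in the log above. `describePayload` and the sample payloads are hypothetical stand-ins, not part of the eKuiper SDK:

```go
package main

import (
	"fmt"
	"log"
	"os"
)

// describePayload builds one log line per received message so that
// missing deliveries are visible as gaps in the sequence numbers.
func describePayload(seq int, payload []byte) string {
	return fmt.Sprintf("collect #%d: %d bytes", seq, len(payload))
}

func main() {
	logger := log.New(os.Stderr, "[znb_parquet] ", log.LstdFlags)
	// Stand-in for data a sink would receive; real payloads come from eKuiper.
	samples := [][]byte{[]byte(`{"v":1}`), []byte(`{"v":2,"w":3}`)}
	for i, s := range samples {
		logger.Println(describePayload(i+1, s))
	}
}
```

If the startup line appears but no such per-message line ever does, the data is probably not reaching the plugin code at all.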

I tried using a small amount of data as the source, and it reaches the plugin normally. Could the data volume cause this problem?

It does seem to be a data volume issue. After I changed the window from 60s to 1s, it triggers normally.

A 1-minute window holds roughly 50 MB of data.
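As a rough back-of-the-envelope check, the per-message payload scales with the window length if the input rate is roughly constant (an assumption; real traffic may be bursty). The thread does not establish an actual size limit, so the sketch below only estimates how large each window's output would be, starting from the 50 MB per minute figure above. `payloadPerWindow` is a name made up for illustration:

```go
package main

import "fmt"

// payloadPerWindow estimates the bytes emitted per window, assuming the
// input arrives at a constant rate of bytesPerMinute.
func payloadPerWindow(bytesPerMinute, windowSeconds float64) float64 {
	return bytesPerMinute * windowSeconds / 60
}

func main() {
	const mb = 1 << 20
	perMinute := 50.0 * mb // figure reported in the thread
	for _, w := range []float64{1, 10, 60} {
		fmt.Printf("%2.0fs window: about %.2f MB per message\n",
			w, payloadPerWindow(perMinute, w)/mb)
	}
}
```

Under that assumption a 1s window carries a bit under 1 MB per message versus about 50 MB for the 60s window, which matches the observation that the shorter window triggers normally.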