我在使用内存sink来构建规则管道,但是我遇到了一个问题。
首先创建了一个httppull stream 用来获取数据,然后创建规则并将查询到的数据写入到内存sink中。
以下是这个规则的配置信息,我写入信息到topic1中,使用action字段控制是否更新,key设置为name
"rule_status_stg": "{\"triggered\":false,\"id\":\"rule_status_stg\",\"name\":\"rule_status_stg\",\"sql\":\"SELECT * , concat('update') as action, now() as ts from rule_stauts_stg;\",\"actions\":[{\"memory\":{\"bufferLength\":1024,\"dataTemplate\":\"{{toJson .}}\",\"enableCache\":false,\"format\":\"json\",\"keyField\":\"name\",\"omitIfEmpty\":false,\"rowkindField\":\"action\",\"runAsync\":false,\"sendSingle\":true,\"topic\":\"topic1\"}}],\"options\":{\"debug\":false,\"logFilename\":\"\",\"isEventTime\":false,\"lateTolerance\":1000,\"concurrency\":1,\"bufferLength\":1024,\"sendMetaToSink\":false,\"sendError\":true,\"qos\":0,\"checkpointInterval\":300000,\"restartStrategy\":{\"attempts\":0,\"delay\":1000,\"multiplier\":2,\"maxDelay\":30000,\"jitter\":0.1},\"cron\":\"\",\"duration\":\"\",\"cronDatetimeRange\":null}}"
然后我创建了一个查询表table_rulestatus
以下是查询表的配置信息:
"table_rulestatus": "\n CREATE TABLE table_rulestatus\n (\n id string, name string, ts datetime, status string\n )\n WITH (DATASOURCE=\"topic1\", FORMAT=\"json\", CONF_KEY=\"aa\", TYPE=\"memory\", KIND=\"lookup\", KEY=\"name\", );\n "
根据查询表,创建了规则rule00011,这个规则只是简单的读取查询表,并通过log打印
"rule00011": "{\"triggered\":false,\"id\":\"rule00011\",\"sql\":\"select *, id , name from table_rulestatus;\",\"actions\":[{\"log\":{\"bufferLength\":1024,\"enableCache\":false,\"format\":\"json\",\"omitIfEmpty\":false,\"runAsync\":false,\"sendSingle\":true}}],\"options\":{\"debug\":false,\"logFilename\":\"\",\"isEventTime\":false,\"lateTolerance\":1000,\"concurrency\":1,\"bufferLength\":1024,\"sendMetaToSink\":false,\"sendError\":true,\"qos\":0,\"checkpointInterval\":300000,\"restartStrategy\":{\"attempts\":0,\"delay\":1000,\"multiplier\":2,\"maxDelay\":30000,\"jitter\":0.1},\"cron\":\"\",\"duration\":\"\",\"cronDatetimeRange\":null}}"
我期望能从日志中打印中内存源中的数据,但遗憾的是没有。我打开了debug模式,从日志中可以看到内存sink已经接收到数据:
我是否有配置错误的地方?