按空间(楼层)统计平均温度,每房间温度由空调上报实时温度,请教如何去重上报数据

需求描述:实时统计出每一楼层的空调平均温度。
每个房间的有1或者多台空调,会实时上报温度等数据到mqtt。
原始上报数据不含房间空间标识,收到后处理再次发到mqtt中,数据结构大致这样:{"ts":1,"device":"dev01","temperature":25,"floor":"10","room":"40"}
device用于标识设备, floorroom标识空间,floor代表楼层,room代表该层的房间编号
需要统计出楼层的平均温度,SQL如下 :
select floor, avg(temperature) from devStream group by floor TUMBLINGWINDOW(mi, 1)
问题是如何把这个统计窗口内同一设备重复上报的数据给过滤掉?

类似的场景还有:实时统计每层楼空调运行模式(制冷、制热)的数量等
select floor, mode,count(mode) as mode_count from devStream group by floor,mode TUMBLINGWINDOW(mi, 1)

我认为可以用一个规则做预处理去重,然后发回 MQTT 或者用规则流水线再接统计规则。去重的逻辑。去重的逻辑是怎么样的?

重复的逻辑是时间窗口内一台设备的同一指标上报了多次数据,应以最新为准。

上图中①处产生原始数据,②③发布给业务侧对每条增加空间信息,④由eKuiper进行SQL处理,想利用eKuiper SQL的边缘流处理能力进行对数据统计聚合(聚合前先去重)。

产生重复的原因是设备在窗口期内上报了多次温度等状态数据,我们想取到窗口内最新的一条,看了last_value、merge_agg、deduplicate函数都是对非聚合查询中存在的字段才生效,

例如非聚合查询可以取最新的温度: select dev, floor, last_value(temperature) as last_temp from devStream TUMBLINGWINDOW(mi, 1)

聚合查询则不知如何使用last_value函数: select floor, avg(temperature) as avg_temp from devStream group by floor TUMBLINGWINDOW(mi, 1)

考虑了下面方案:

方案一:如果select支持嵌套查询应该也可以:
select floor avg(last_temp) as avg_temp from (select dev,floor, last_value(temperature) as last_temp from devStream TUMBLINGWINDOW(mi, 1) t group t.floor TUMBLINGWINDOW(mi, 1)
元旦期间测过不生效

方案二:上图中在④处理之前进行去重,通过你提到的规则引擎或是什么方式,目前没想好。

但要求整个处理过程不能太耗时,需要实时/准实时响应,时间窗口要求 TUMBLINGWINDOW(ss, 10)

嵌套查询可用上文提到的 规则流水线方式实现

感谢,这两天看了memory类型的sink的使用,也没有解决,从头梳理下需求场景再定方案