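Hello everyone, I'm Lao Yang. Today we'll look at the Over Aggregation operation in Flink SQL.

Definition of Over Aggregation (supported in both Batch and Streaming): it can be understood as a special kind of sliding-window aggregate function.

Let's compare Over Aggregation with Window Aggregation. The biggest difference between them is:

Window Aggregation: the SELECT cannot directly reference fields that are not part of the GROUP BY.
Over Aggregation: the original fields of every input row are preserved, and the aggregate is attached to each row as an extra column.

Note: in production, Over aggregation is actually used fairly rarely. Hive offers the same kind of aggregation, but think about how often you really use it in an offline data warehouse.

Use case: computing an aggregate over the most recent sliding window of data.

Practical example: for each order, query the total order amount of its product over the last hour:

    SELECT order_id, order_time, amount,
      SUM(amount) OVER (
        PARTITION BY product
        ORDER BY order_time
        RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW
      ) AS one_hour_prod_amount_sum
    FROM Orders

The Over aggregation syntax can be summarized as follows:

    SELECT
      agg_func(agg_col) OVER (
        [PARTITION BY col1[, col2, ...]]
        ORDER BY time_col
        range_definition),
      ...
    FROM ...

where:

ORDER BY: must be a timestamp column (event time or processing time).
PARTITION BY: defines the granularity of the aggregation. In the example above, rows are aggregated per product.
range_definition: defines which rows fall into the aggregation window. Flink offers two ways to specify this range: by row count or by time interval.

The two variants are illustrated below.

a. Aggregation by time interval: the window slides over a time interval. In the example below the interval is 1 hour, so the sum emitted for the latest row is the total amount of that product over the most recent hour.

    CREATE TABLE source_table (
        order_id BIGINT,
        product BIGINT,
        amount BIGINT,
        order_time AS CAST(CURRENT_TIMESTAMP AS TIMESTAMP(3)),
        WATERMARK FOR order_time AS order_time - INTERVAL '0.001' SECOND
    ) WITH (
      'connector' = 'datagen',
      'rows-per-second' = '1',
      'fields.order_id.min' = '1',
      'fields.order_id.max' = '2',
      'fields.amount.min' = '1',
      'fields.amount.max' = '10',
      'fields.product.min' = '1',
      'fields.product.max' = '2'
    );

    CREATE TABLE sink_table (
        product BIGINT,
        order_time TIMESTAMP(3),
        amount BIGINT,
        one_hour_prod_amount_sum BIGINT
    ) WITH (
      'connector' = 'print'
    );

    INSERT INTO sink_table
    SELECT product, order_time, amount,
      SUM(amount) OVER (
        PARTITION BY product
        ORDER BY order_time
        -- the window covers the last 1 hour of data for the same product
        RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW
      ) AS one_hour_prod_amount_sum
    FROM source_table;

The output looks like this:

    +I[2, 2021-12-24T22:08:26.583, 7, 73]
    +I[2, 2021-12-24T22:08:27.583, 7, 80]
    +I[2, 2021-12-24T22:08:28.583, 4, 84]
    +I[2, 2021-12-24T22:08:29.584, 7, 91]
    +I[2, 2021-12-24T22:08:30.583, 8, 99]
    +I[1, 2021-12-24T22:08:31.583, 9, 138]
    +I[2, 2021-12-24T22:08:32.584, 6, 105]
    +I[1, 2021-12-24T22:08:33.584, 7, 145]

b. Aggregation by row count: the window slides over a number of rows. In the example below, the sum emitted for the latest row covers the current row plus the 5 rows preceding it for the same product. Note that ROWS BETWEEN 5 PRECEDING AND CURRENT ROW therefore spans up to 6 rows, not 5, which is why the sum can reach 12 in the output when every amount is at most 2.

    CREATE TABLE source_table (
        order_id BIGINT,
        product BIGINT,
        amount BIGINT,
        order_time AS CAST(CURRENT_TIMESTAMP AS TIMESTAMP(3)),
        WATERMARK FOR order_time AS order_time - INTERVAL '0.001' SECOND
    ) WITH (
      'connector' = 'datagen',
      'rows-per-second' = '1',
      'fields.order_id.min' = '1',
      'fields.order_id.max' = '2',
      'fields.amount.min' = '1',
      'fields.amount.max' = '2',
      'fields.product.min' = '1',
      'fields.product.max' = '2'
    );

    CREATE TABLE sink_table (
        product BIGINT,
        order_time TIMESTAMP(3),
        amount BIGINT,
        one_hour_prod_amount_sum BIGINT
    ) WITH (
      'connector' = 'print'
    );

    INSERT INTO sink_table
    SELECT product, order_time, amount,
      SUM(amount) OVER (
        PARTITION BY product
        ORDER BY order_time
        -- the window covers the current row and the 5 preceding rows of the same product
        ROWS BETWEEN 5 PRECEDING AND CURRENT ROW
      ) AS one_hour_prod_amount_sum
    FROM source_table;

The output looks like this:

    +I[2, 2021-12-24T22:18:19.147, 1, 9]
    +I[1, 2021-12-24T22:18:20.147, 2, 11]
    +I[1, 2021-12-24T22:18:21.147, 2, 12]
    +I[1, 2021-12-24T22:18:22.147, 2, 12]
    +I[1, 2021-12-24T22:18:23.148, 2, 12]
    +I[1, 2021-12-24T22:18:24.147, 1, 11]
    +I[1, 2021-12-24T22:18:25.146, 1, 10]
    +I[1, 2021-12-24T22:18:26.147, 1, 9]
    +I[2, 2021-12-24T22:18:27.145, 2, 11]
    +I[2, 2021-12-24T22:18:28.148, 1, 10]
    +I[2, 2021-12-24T22:18:29.145, 2, 10]

Finally, if a single SELECT uses several aggregates over the same window, Flink SQL supports a shorthand in which the window is defined once in a WINDOW clause and referenced by name:

    SELECT order_id, order_time, amount,
      SUM(amount) OVER w AS sum_amount,
      AVG(amount) OVER w AS avg_amount
    FROM Orders
    -- define the over window with the following clause
    WINDOW w AS (
      PARTITION BY product
      ORDER BY order_time
      RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW)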
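To make the difference between the two range definitions more concrete, here is a small Python sketch that emulates them outside of Flink. It is not Flink code and not how Flink implements over windows; the function names over_rows and over_range and the sample data are made up for illustration. It also assumes that timestamps within one partition are distinct, so it ignores the case where several rows share the same order_time.

```python
from collections import defaultdict, deque
from datetime import datetime, timedelta


def over_rows(rows, preceding):
    """Emulate SUM(amount) OVER (PARTITION BY product ORDER BY order_time
    ROWS BETWEEN <preceding> PRECEDING AND CURRENT ROW).

    rows: iterable of (product, order_time, amount) tuples.
    """
    # One bounded buffer per product: current row + `preceding` earlier rows.
    windows = defaultdict(lambda: deque(maxlen=preceding + 1))
    out = []
    for product, order_time, amount in sorted(rows, key=lambda r: r[1]):
        win = windows[product]
        win.append(amount)  # the deque silently drops the oldest amount
        out.append((product, order_time, amount, sum(win)))
    return out


def over_range(rows, interval):
    """Emulate SUM(amount) OVER (PARTITION BY product ORDER BY order_time
    RANGE BETWEEN <interval> PRECEDING AND CURRENT ROW).

    Simplification (assumption): timestamps are distinct per product.
    """
    windows = defaultdict(deque)
    out = []
    for product, order_time, amount in sorted(rows, key=lambda r: r[1]):
        win = windows[product]
        win.append((order_time, amount))
        # Evict rows older than (current time - interval); the bound is inclusive.
        while win[0][0] < order_time - interval:
            win.popleft()
        out.append((product, order_time, amount, sum(a for _, a in win)))
    return out


# Hypothetical sample: one product, amount 1, one order every 20 minutes.
base = datetime(2021, 12, 24, 22, 0, 0)
sample = [(1, base + timedelta(minutes=20 * i), 1) for i in range(8)]

print([r[3] for r in over_rows(sample, 5)])
# [1, 2, 3, 4, 5, 6, 6, 6]   -> at most 6 rows: current row + 5 preceding
print([r[3] for r in over_range(sample, timedelta(hours=1))])
# [1, 2, 3, 4, 4, 4, 4, 4]   -> rows from the last hour, bound inclusive
```

The row-based window depends only on how many rows arrived for a product, while the time-based window depends on their timestamps, so the same input can produce different sums under the two definitions.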
