当前位置: 首页 > 科技观察

FlinkSQL知其然:Group聚合操作

时间:2023-03-13 23:55:46 科技观察

Group聚合Group聚合定义(支持Batch\Streaming任务):Flink也支持Group聚合。Group聚合与上面介绍的window聚合的区别在于,Group聚合是根据数据的类别,如年龄、性别等进行分组,是横向的;而窗口聚合则是按照时间的粒度对数据进行分组,是垂直的。下图显示了差异。其中,按颜色(横向)划分key是Group聚合,按window(纵向)划分是window聚合。Tumblewindow+key应用场景:一般用于对数据进行分组,然后使用聚合函数进行计数、求和等聚合操作。那么这时候萌友就会问了,我其实可以把window聚合的写法转换成Group聚合,只是把Group聚合的GroupBykey改成time,那么这两种聚合有什么区别呢??首先我们举个例子,看看如何将window聚合转化为Group聚合。如果以1分钟为粒度进行窗口聚合,则SQL如下:--datasourcetableCREATETABLEsource_table(--dimensiondatadimSTRING,--useriduser_idBIGINT,--userpriceBIGINT,--事件时间戳row_timeAScast(CURRENT_TIMESTAMPastimestamp(3)),--水印设置WATERMARKFORrow_timeASrow_time-INTERVAL'5'SECOND)WITH('connector'='datagen','rows-per-second'='10','fields.dim.length'='1','fields.user_id.min'='1','fields.user_id.max'='100000','fields.price.min'='1','fields.price.max'='100000')--数据表CREATETABLEsink_table(dimSTRING,pvBIGINT,sum_priceBIGINT,max_priceBIGINT,min_priceBIGINT,uvBIGINT,window_startbigint)WITH('connector'='print')--数据处理逻辑插入sink_tableselectdim,count(*)aspv,sum(price)assum_price,max(price)asmax_price,min(price)asmin_price,--计算uv数count(distinctuser_id)asuv,UNIX_TIMESTAMP(CAST(tumble_start(row_time,间隔“1”分钟)ASSTRING))*1000aswindow_startfromsource_tablegroupbydim,--按照FlinkSQLtumblewindow的写法划分窗口。将tumble(row_time,interval'1'minute)转化为组聚合如下:,--事件时间戳row_timeAScast(CURRENT_TIMESTAMPastimestamp(3)),--水印设置WATERMARKFORrow_timeASrow_time-INTERVAL'5'SECOND)WITH('connector'='datagen','rows-per-second'='10','fields.dim.length'='1','fields.user_id.min'='1','fields.user_id.max'='100000','fields.price.min'='1','fields.price.max'='100000');--数据表CREATETABLEsink_table(dimSTRING,pvBIGINT,sum_priceBIGINT,max_priceBIGINT,min_priceBIGINT,uvBIGINT,window_startbigint)WITH('connector'='print');--数据处理逻辑插入sink_tableselectdim,count(*)aspv,sum(price)assum_price,max(price)asmax_price,min(price)asmin_price,--计算uvnumbercount(distinctuser_id)asuv,cast((UNIX_TIMESTAMP(CAST(row_timeASSTRING)))/60asbigint)aswindow_startfromsource_tablegroupbydim,--转换二级timeStamp/60转换为1mincast((UNIX_TIMESTAMP(CAST(row_timeASSTRING)))/60asbigint)确实是正确的,上面的转换完全没有问题,但是window聚合和Groupby聚合的区别就在于in:本质区别:窗口聚合具有时间语义。其本质是实现窗口输出结束后,后续的迟到数据不会改变原来的结果,即输出值为固定值(不考虑allowLateness)。Groupby聚合没有时间语义。无论多晚的数据,只要数据到来,就会撤回最后输出的结果数据,然后将计算出的新结果数据发送出去。操作层面:窗口聚合与时间绑定,窗口聚合的计算结果由时间(Watermark)触发。Groupbyaggregation完全由数据驱动来触发计算,根据这条数据计算出一条新的数据,并将结果发送出去;可见两者的实现方式也是大相径庭。SQL语义也与离线和实时进行了比较。Orders是Kafka,target_table是Kafka。这个SQL产生的实时任务在执行时会产生三个算子:数据源算子(FromOrder):数据源算子一直运行,实时从OrderKafka中一个一个读取数据,然后一个一个发送到下游组聚合运算符。向下游发送数据的shuffle策略是根据groupby中的key发送数据,相同的key发送到同一个SubTask中(并发)。Groupaggregationoperator(groupbykey+sum\count\max\min):从upstreamoperator一个接一个的收到数据后,去state找key之前的sum\count\max\min结果。若有结果oldResult,取出与当前数据sum\count\max\min计算出该key的新结果newResult,并将新结果[key,newResult]更新为状态,发送它向下游发送新的计算结果前,发送消息撤回上次的结果-[key,oldResult],然后向下游发送新的结果+[key,newResult];如果state中没有当前key的结果,则直接使用当前数据计算sum\max\min结果newResult,并将新结果[key,newResult]更新到state中。由于是第一次向下游发送,所以不需要先发送提现消息,直接发送+[key,newResult]即可。数据采集??算子(INSERTINTOtarget_table):从上游一个接一个地接收数据,写入target_tableKafka。这个实时任务也是24小时不间断运行,所有操作员同时处于运行状态。特别注意:Groupby聚合涉及一个retractionflow(也叫retractflow),会产生一个retractionflow,因为从整个SQL的语义来看,上游的Kafk数据是无穷无尽的,所以每次这个产生的结果SQL任务是一个中间结果,所以每次更新结果,都需要撤回上次发出的中间结果,然后发送最新的结果。Groupby聚合涉及状态:状态大小还取决于不同键的数量。为了防止状态无限增长,我们可以设置状态的TTL。以上面的SQL为例。上面的SQL是按分钟聚合的。理论上,今天,我们通常不需要关心昨天的数据,所以我们可以将状态过期时间设置为一天。状态过期时间的设置参数请参考下面的运行时参数部分。如果这条SQL在Hive中执行,其中Orders是Hive,target_table也是Hive,同样会生成三个相同的算子,但是和实时任务的执行方式完全不同:数据源算子(FromOrder):数据源算子从OrderHive中读取所有数据,然后将所有数据发送给下游的Group聚合算子。向下游发送数据的shuffle策略是根据groupby中的key发送数据。同一个key发送给同一个operator,然后这个operator的操作结束,资源被释放。分组聚合算子(groupby+sum\count\max\min):接收上游算子发送的所有数据,然后遍历计算sum\count\max\min结果,发送给下游datasink算子分批次,这个算子的操作结束,资源被释放。数据采集??算子(INSERTINTOtarget_table):从上游接收到一条数据写入target_tableHive后,整个任务结束,释放整个任务的资源。Group聚合支持Groupingsets、Rollup、CubeGroup聚合也支持Groupingsets、Rollup、Cube。举一个分组集的例子:SELECTsupplier_id,rating,product_id,COUNT(*)FROM(VALUES('supplier1','product1',4),('supplier1','product2',3),('supplier2','product3',3),('supplier2','product4',4))ASProducts(supplier_id,product_id,rating)GROUPBYGROUPINGSET((supplier_id,product_id,rating),(supplier_id,product_id),(supplier_id,rating),(supplier_id),(product_id,rating),(product_id),(rating),())?