当前位置: 首页 > 科技观察

FlinkSQL知道为什么:TopN、OrderBy、Limit操作

时间:2023-03-18 00:11:13 科技观察

DML:OrderBy、Limit子句大家好,我是老杨,今天我们来学习一下FlinkSQL中的TopN、OrderBy、Limit操作。1、OrderBy子句支持Batch\Streaming,但是一般在实时任务中很少用到。在实时任务中,OrderBy子句中必须有时间属性字段,时间属性必须是升序时间属性,即WATERMARKFORrowtime_columnASrowtime_column-INTERVAL'0.001'SECONDorWATERMARKFORrowtime_columnASrowtime_column.示例:CREATETABLEsource_table_1(user_idBIGINTNOTNULL,row_timeAScast(CURRENT_TIMESTAMPastimestamp(3)),WATERMARKFORrow_timeASrow_time)WITH('connector'='datagen','rows-per-second'='10','fields.user_id.min'='1','fields.user_id.max'='10');CREATETABLEsink_table(user_idBIGINT)WITH('connector'='print');INSERTINTOsink_tableSELECTuser_idFROMsource_table_1OrderBytherow_time,user_iddesc2.Limit子句支持Batch\Streaming,但是一般不会在实时场景下使用,这里还是举个例子:CREATETABLEsource_table_1(user_idBIGINTNOTNULL,row_timeAScast(CURRENT_TIMESTAMP作为时间戳(3)),WATERMARKFORrow_timeASrow_time)WITH('connector'='datagen','rows-per-second'='10','fields.user_id.min'='1','fields.user_id.max'='10');CREATETABLEsink_table(user_idBIGINT)WITH('connector'='print');INSERTINTOsink_tableSELECTuser_idFROMsource_table_1Limit3结果如下,只输出3:+I[5]+I[9]+I[4]DML:TopNclauseTopNdefinition(supportsBatch\Streaming):TopN其实对应的是离线数仓中的row_number(),可以通过row_number()对某一组的数据进行排序应用场景:根据一定的排序条件SQL计算某一组下的排行榜数据语法标准:SELECT[column_list]FROM(SELECT[column_list],ROW_NUMBER()OVER([PARTITIONBYcol1[,col2...]]ORDERBYcol1[asc|desc][,col2[asc|desc]...])ASrownumFROMtable_name)WHERErownum<=N[ANDconditions]ROW_NUMBER():识别TopN排序子句PARTITIONBYcol1[,col2...]:识别分区字段,表示数据按照col字段作为分区粒度排序取topN,比如partitionbykey在以下情况下,是根据要求中的搜索关键字(key)作为分区的ORDERBYcol1[asc|desc][,col2[asc|desc]...]:标识TopN的排序规则是按照哪些字段排序,顺序还是倒序WHERErownum<=N:这个子句是肯定需要的。只有有了这个子句,Flink才能将其识别为TopNquery,其中N代表TopN的条目数[AND条件]:实际案例中还可以加入其他限制:取某个搜索热度下的前10条条目数据搜索关键字输入数据是搜索词数据的搜索流行度数据。当搜索热度发生变化时,会将变化的数据写入到数据源的Kafka中:数据源schema:--字段名备注--关键搜索关键词--name搜索热度名称--search_cnt热搜消费热度(forexample,3000)--timestampconsumptionentrytimestampCREATETABLEsource_table(nameBIGINTNOTNULL,search_cntBIGINTNOTNULL,keyBIGINTNOTNULL,row_timeAScast(CURRENT_TIMESTAMPastimestamp(3)),WATERMARKFORrow_timeASrow_time)WITH(...);--datasinkschema:--keysearchkeywords--name搜索热度名称--search_cnt热搜消费热度(比如3000)--timestamp消费入口timestampCREATETABLEsink_table(keyBIGINT,nameBIGINT,search_cntBIGINT,`timestamp`TIMESTAMP(3))WITH(...);--DML逻辑INSERTINTOsink_tableSELECTkey,name,search_cnt,row_timeas`timestamp`FROM(SELECTkey,name,search_cnt,row_time,--根据热搜关键词key作为partitionkey,一个d然后根据search_cntROW_NUMBER()OVER(PARTITIONBYkeyORDERBYsearch_cntdesc)ASrownumFROMsource_table)WHERErownum<=100输出结果前100:-D[关键字1,条目1,4944]+I[关键字1,条目1,8670]+I[关键字1,条目2,1735]-D[关键字1,条目3,6641]+I[关键字1,entry3,6928]-D[keyword1,entry4,6312]+I[keyword1,entry4,7287]可以看到输出的数据是返回的如果数据是撤回的,为什么会有撤回?让我们看一下SQL语义。SQL语义上面的SQL会翻译成以下三个算子:数据源:数据源是最新词条下的搜索词的搜索热度数据,消费在Kafka中收到数据后,根据分区对数据进行哈希处理key分发给下游的排序算子。相同的key数据会被发送给一个并发排序算子:为每个key维护一个TopN列表数据,收到一条上游数据后,如果TopN列表还没有到N,则将这条数据添加到TopN列表中,然后直接发送数据。如果到了N,经过TopN计算后,发现这个数据比排在第一位的数据多,那么新的TopN排名就会发生变化,在变化的部分数据之前发布的排名数据会被撤回(即,撤回的数据),然后发布新的排名数据。Datasink:从上游接收到数据输出到外部存储引擎后,上述三个算子也会24小时不间断运行。

最新推荐
猜你喜欢