当前位置: 首页 > 科技观察

FlinkSQL知乎:Deduplication去重以及如何获取最新状态操作

时间:2023-03-17 15:36:30 科技观察

大家好,我是老杨。今天我们就来学习一下FlinkSQL去重,以及如何通过去重操作获取最新状态。去重定义(支持Batch\Streaming):去重其实就是去重,也就是上面介绍的TopN中row_number=1的场景,但是这里有个区别,排序字段必须是时间属性列,不能是其他非Normal时间属性列。当row_number=1时,如果排序字段是普通的columnplanner会被翻译成TopN算子,如果是时间属性的columnplanner会被翻译成Deduplication,两者最终执行的算子不同,Deduplication与TopN比较operator专门做了相应的优化,性能会有很大的提升。应用场景:比如上游数据发送量大,或者计算DAU明细数据等,都可以使用去重语法去重。SQL语法标准:SELECT[column_list]FROM(SELECT[column_list],ROW_NUMBER()OVER([PARTITIONBYcol1[,col2...]]ORDERBYtime_attr[asc|desc])ASrownumFROMtable_name)WHERErownum=1其中:ROW_NUMBER():标识当前数据的排序值。PARTITIONBYcol1[,col2...]:标识分区字段,即按照col字段为分区粒度对数据进行排序。ORDERBYtime_attr[asc|desc]:标识排序规则,必须是时间戳列。目前,FlinkSQL支持处理时间和事件时间。ASC表示保留第一行,DESC表示保留最后一行。WHERErownum=1:这个子句是必需的,必须是rownum=1。实际案例:这里给博主两个案例:案例1(事件时间):是腾讯QQ用户级别的场景,每个QQ用户都有一个QQ用户级别,需要查看当前用户级别是否在星星、月亮或太阳有多少用户。--数据来源:每个用户级别初始化后发生变化时的数据,即用户级别变化的明细数据。CREATETABLEsource_table(user_idBIGINTCOMMENT'userid',levelSTRINGCOMMENT'userlevel',row_timeAScast(CURRENT_TIMESTAMPastimestamp(3))COMMENT'eventtimestamp',WATERMARKFORrow_timeASrow_time)WITH('连接器'='datagen','rows-per-second'='1','fields.level.length'='1','fields.user_id.min'='1','fields.user_id.max'='1000000');--Datasink:输出的是每层的用户数CREATETABLEsink_table(levelSTRINGCOMMENT'level',uvBIGINTCOMMENT'当前层级的用户数',row_timetimestamp(3)COMMENT'timestamp')WITH('connector'='print');--处理逻辑:INSERTINTOsink_tableselectlevel,count(1)asuv,max(row_time)asrow_timefrom(SELECTuser_id,level,row_time,row_number()over(partitionbyuser_idorderbyrow_time)asrnFROMsource_table)wherern=1groupbyleveloutput:+I[level1,6928,2021-1-28T22:34]-I[level1,6928,2021-1-28T22:34]+I[1级,8670,2021-1-28T22:34]-I[1级,8670,2021-1-28T22:34]+I[Level1,77287,2021-1-28T22:34]...可以看到已经收回了数据,对应的SQL语义如下:数据源:在Kafka消费数据后,根据topartitionbykey通过散列分布策略发送给下游的去重算子。Deduplication去重算子:接收到上游数据后,根据orderby中的条件判断当前数据的大小和之前数据的时间戳。在上面的案例中,如果当前数据时间戳大于上一个数据时间戳,则会取出之前发送给下游的中间结果,然后将最新的结果发送给下游(发送策略也是hash,具体hash策略就是根据groupby中的key发送),如果当前数据时间戳小于上一个数据时间戳,则不做操作。子算子输出的是每个用户对应的最新等级信息。Groupbyaggregationoperator:收到上游数据后,按照Groupby聚合粒度(每级用户数)进行聚合,发送给下游数据聚合operator。Datasink:接收到上游数据后,输出到外部存储引擎。情况2(处理时间):最原始的日志是明细数据。我们需要根据用户id过滤出用户当天的第一条数据发送给下游。下游可以据此计算各个维度的DAU。--数据来源:原始日志明细数据CREATETABLEsource_table(user_idBIGINTCOMMENT'用户id',nameSTRINGCOMMENT'用户名',server_timestampBIGINTCOMMENT'用户访问时间戳',proctimeASPROCTIME())WITH('connector'='datagen','rows-per-second'='1','fields.name.length'='1','fields.user_id.min'='1','fields.user_id.max'='10','fields.server_timestamp.min'='1','fields.server_timestamp.max'='100000');--Datasink:根据user_idCREATETABLEsink_table(user_idBIGINT,nameSTRING,server_timestampBIGINT)WITH('connector'='print');--处理逻辑:INSERTINTOsink_tableselectuser_id,name,server_timestampfrom(SELECTuser_id,name,server_timestamp,row_number()over(partitionbyuser_idorderbyproctime)asrnFROMsource_table)wherern=1output:+I[1,user1,2021-1-28T22:34]+I[2,user2,2021-1-28T22:34]+I[3,user3,2021-1-28T22:34]...可见此处理逻辑没有回撤数据。对应的SQL语义如下:数据源:数据在Kafka中消费后,通过哈希分布策略,根据partitionbykey发送给下游的去重算子。Deduplicationdeduplicationoperator:在处理时间的语义下,如果是当前key的第一条数据,则直接发送给下游。如果判断(根据状态下key是否被改变)不是第一条数据,则直接丢弃。Datasink:接收到上游数据后,输出到外部存储引擎。注:关于Deduplication是否会有回溯流,博主总结如下:?OrderbyeventtimeDESC:会出现回溯流,因为currentkey下可能有大于当前eventtime的数据。?OrderbyeventtimeASC:会有回溯流,因为当前key下可能有比当前事件时间更小的数据。?OrderbyprocessingtimeDESC:会有回溯流,因为当前key下可能有大于当前处理时间的数据。?OrderbyprocessingtimeASC:不会有提现流程,因为当前key下不可能有比当前处理时间短的数据。

猜你喜欢