当前位置: 首页 > 科技观察

FlinkRegularJoin和TTL的理解_0

时间:2023-03-21 14:27:27 科技观察

对于流式查询,RegularJoin的语法是最灵活的,允许任何类型的更新(insert,update,delete)输入表。RegularJoin包括以下几种类型(L作为左流中的数据标识,R作为右流中的数据标识):InnerJoin(InnerEqualJoin):两个流连接时,+[L,R]会输出LeftJoin(OuterEqualJoin):左流数据到达后,join到R流数据再输出+[L,R],如果没有Join则输出+[L,null]).如果数据在右流之后到达,发现左流有之前未加入的输出数据,则会发起回溯流,先输出-[L,null],再输出+[L,R]。RightJoin(OuterEqualJoin):与LeftJoin的逻辑相反。FullJoin(OuterEqualJoin):在流任务中,左流或右流的数据到达后,无论是否有数据加入到另一个流中,都会输出(对于右流:Jointooutput+[L,R],noJointooutput+[null,R];对于左流:Jointooutput+[L,R],noJointooutput+[L,null])。如果一个流的数据到达后,发现另一个流有之前没有加入的数据输出,则会发起一个withdrawal流(比如左边的流数据到达时:withdrawal-[null,R],output+[L,R],右流数据到达为例:回撤-[L,null],output+[L,R])。常规内部JoinFlinkSQL:CREATETABLEmatchResult(guidSTRING)WITH('connector'='kafka','topic'='match_result_log_test','properties.bootstrap.servers'='xxxxxxxxxxxxxxxxxx','properties.group.id'='flinkTestGroup','scan.startup.mode'='latest-offset','format'='json');CREATETABLEreadRecord(guidSTRING,book_nameSTRING)WITH('connector'='kafka','topic'='read_record_log_test','properties.bootstrap.servers'='xxxxxxxxxxxxxxxxxxxxx','properties.group.id'='flinkTestGroup','scan.startup.mode'='latest-offset','format'='json');CREATETABLEsink_table(guidSTRING,book_nameSTRING)WITH('connector'='print');INSERTINTOsink_tableSELECTmatchResult.guid,readRecord.book_nameFROMmatchResultINNERJOINreadRecordONmatchResult.guid=readRecord.guid;输出结果分析:--L流数据到达,因为没有Join到R流数据,是innerjoin,结果不会输出+I[111,book1]--R流数据到达,Join到L流数据,然后输出+I[111,book1]--R流数据到达,由于没有Join到L流数据,是innerjoin,结果不会output+I[222,book2]]--当L流数据到达时,加入到R流数据输出结果RegularLeftJoin(Rightjoin相反)FlinkSQL:CREATETABLEmatchResult(guidSTRING)WITH('connector'='kafka','topic'='match_result_log_test','properties.bootstrap.servers'='xxxxxxxxxxxxxxxxxxxxx','properties.group.id'='flinkTestGroup','scan.startup.mode'='latest-offset','格式'='json');CREATETABLEreadRecord(guidSTRING,book_nameSTRING)WITH('connector'='kafka','topic'='read_record_log_test','properties.bootstrap.servers'='xxxxxxxxxxxxxxxxxxxx','properties.group.id'='flinkTestGroup','scan.startup.mode'='latest-offset','format'='json');CREATETABLEsink_table(guidSTRING,book_nameSTRING)WITH('connector'='print');插入sink_tableSELECTmatchResult.guid,readRecord.book_nameFROMmatchResultLEFTJOINreadRecordONmatchResult.guid=readRecord.guid;输出结果分析:+I[111,null]--L流数据到达,如果没有Join到R流数据,则输出+[L,null]-D[111,null]--R的数据stream到达,如果发现L流有之前没有join的输出数据,会发起一个withdrawalstream,先output-[L,null]+I[111,book1]--然后Output+[L,R]--这里模拟了一个R流guid=222的数据到达,因为是leftjoin,没有加入到L流,所以没有输出+I[222,book2]--当Lstreamguid=222的数据到达joinR流后,输出结果+[L,R]RegularFullJoinFlinkSQL:CREATETABLEmatchResult(guidSTRING)WITH('connector'='kafka','topic'='match_result_log_test','properties.bootstrap.servers'='xxxxxxxxxxxxxxxxxxxxx','properties.group.id'='flinkTestGroup','scan.startup.mode'='latest-offset','format'='json');CREATETABLEreadRecord(guidSTRING,book_nameSTRING)WITH('connector'='kafka','topic'='read_record_log_test','properties.bootstrap.servers'='xxxxxxxxxxxxxxxxxxxx','properties.group.id'='flinkTestGroup','scan.startup.mode'='latest-offset','format'='json');CREATETABLEsink_table(guidSTRING,book_nameSTRING)WITH('connector'='print');INSERTINTOsink_tableSELECTmatchResult.guid,readRecord.book_nameFROMmatchResultFULLJOINreadRecordONmatchResult.guid=readRecord.guid;输出结果分析:+I[111,null]--到达L流数据,如果没有Join到R流数据,则Output+I[L,null]+I[null,book2]--R流数据到达,如果没有Join到R流数据,则输出+I[null,R]-D[null,book2]--L流new数据到达时,如果发现R流有输出数据之前没有加入过,会发起一个退出流,先输出-D[null,R]+I[222,book2]--再输出+I[L,R]-D[111,null]--viceversa+I[111,book1]TTLconcept在RegularJoin中,Flink会将没有时间窗口限制的两个流的所有数据都存储在State中,因为流是无穷无尽的,源源不断的,随着时间的推移,状态会越来越多会累积在记忆中。为了解决这个问题,Flink提出了空闲状态保持时间(IdleStateRetentionTime)的概念。通过为每个状态设置一个Timer,如果中途访问该状态,则重新设置Timer;否则(如果状态没有被访问过,长期处于Idle状态),当Timer超时时,状态将被清理。这样可以保证每个状态都能及时清理,可以通过table.exec.state.ttl参数控制(注意:这也会影响结果的准确性,所以合理权衡是必须的)。