1.前言-这篇文章的结构大家都熟悉datastreamapi,还是那句话,在datastream,你写的代码逻辑是什么?是的,这就是它最终执行的方式。不过大家对于flinksql的执行流程还是比较陌生的。上一节使用ETL、groupagg(sum、count等)简单聚合查询,带你进入flinksql查询逻辑的世界。帮助大家至少熟悉flinksql程序运行时flink程序在做什么。本节是窗口聚合章节的第一部分。以最简单最常用的分钟滚动窗口聚合案例来介绍它的用法和原理。由于flink1.13引入了windowtvf,因此1.13和1.12与之前版本的实现有所不同。本节首先介绍flink1.12及更早版本的tumblewindow实现。这也是引入flinksql能力时最常用的。本节依旧从后面的章节开始详细介绍flinksql的能力。1.目标——这篇文章能帮助你理解flinksql的哪些内容?回顾一下上一节flinksql的适用场景总结2.概念——先说说常见的窗口聚合窗口拖慢数据输出?常用windows3.实战篇-简单的tumblewindow案例及运行原理先看一个datastreamwindow案例flinksqltumblewindow语义tumblewindow实战案例GeneratedWatermarkGenerator-flink1.12.1BinaryRowDataKeySelector-flink1.12.1AggregateWindowOperator-flink1.12.14.总结与展望先说结论。以下结论在上一节中已经提到。这里附上上一节的文章:场景问题:flinksql很适合简单的ETL,几乎所有场景的聚合指标(本节介绍的tumblewindow都属于聚合指标范围)。语法问题:FlinkSQL语法与其他SQL语法基本一致。基本没有阻碍使用flinksql的语法问题。但本节介绍的滚动窗口的语法是略有不同的部分。详情如下。运行问题:查看flinksql任务的一些小技巧,以及可能遇到的一些陷阱:去flinkwebui查看任务当前在做什么。包括operator的名字,它会直接告诉我们哪个operator当前在做什么,在处理什么逻辑。sql的watermark类型应该设置为TIMESTAMP(3)。在事件时间逻辑上,SQLAPI和DataStreamAPI对数据记录时间戳的存储逻辑不同。datastreamapi:每条记录的rowtime放在StreamRecord中的timestamp字段中。sqlapi:每次从数据中获取时间戳。运算符中维护一个下标。时间戳可以通过下标从数据中获取。2.目标——这篇文章可以帮助你理解什么是flinksqltumblewindow?flinksqltumblewindow一般有以下几个问题。这篇文章的目的也是为大家解答这些问题:场景问题:场景问题就不用多说了,datastream在tumblewindow场景,分钟级聚合等常见场景语法问题有很多应用:flinksql写入翻转窗口任务。语法在配置单元sql中不可用。详情如下。运行问题:用一个简单的tumblewindowsql帮助大家从transformation和runtime了解tumblewindow的整体运行机制。误区:既然是sql就一定要遵循sql的语义,sqltumblewindow的聚合就是输入多项,输出一条数据。不能像datastream那样在windowudf中做多对多。在正式开始讲tumblewindow之前,我们先来看一下上一节flinksql适用场景的总结。让大家对flinksql有个整体的印象和总结。2.1.回头看上一节flinksql适用场景的总结,不想装了。我承认flinksql其实很适合做dwd清洗和dws聚合。这个主要针对实时数仓场景。FlinkSQL具备dwdcleaning和dwsaggregation的能力。基本上可以涵盖实时数仓的大部分场景。Flinksql真棒!!!但!!!根据博主使用flinksql的经验,并不是所有的dwd和dws聚合场景都适合flinksql(截至发帖阶段)!!!其实这些目前都不适合flinksql的场景总结就是相对于datastream在处理上还是会有一定的损失。先总结一下使用场景:1.dwd:简单清洗,复杂清洗,维度扩展,各种udfs的使用2.dws:各种聚合,然后分为适合的场景和不适合的场景,因为仅仅这篇文章无法涵盖所有??的内容,所以本文先给出一个大概的结论,然后再结合具体的场景进行详细的描述。适用场景:简单dwd清洗场景,全场景dws聚合场景目前不适合场景:复杂dwd清洗场景:比如大量使用udf清洗,特别是大量json类解析清洗关联维度场景:例如,在datastream中,经常会出现批量存储一批数据访问外部接口的场景。虽然flinksql目前针对这种场景已经具备了localcache和异步访问能力,但是还是一一访问外部缓存。与批量访问相比,它仍然具有更好的性能。差距。3.概念——先说常见的窗口聚合。datastreamapi大家都很熟悉了。目前,在实时数据处理过程中,窗口计算可以说是最重要和最常用的计算方法。不过在抛出windows的概念之前,博主对windows有几点看法。3.1.窗口实际上减慢了数据输出?一个小主意。先说结论:窗口会拖慢实时数据的输出,这是在目前下游分析引擎能力有限的情况下做出的妥协。站在数据开发的世界和需求方,我们当然希望所有的数据都是实时进来的,实时处理的,实时输出的,实时展示的。例如:如果要满足一分钟窗口聚合的pv、uv或其他聚合需求。olap数据服务引擎可以满足上述实时输入、实时处理、实时输出、实时展示的场景。Flink消费和处理详细数据,输出到Kafka,然后直接导入到olap引擎中。查询时,直接使用olap进行聚合。这里没有窗口的概念。但是在整个链路中,需要保证端到端的准确率是一次,在大数据量的情况下,olap引擎可以秒级查询返回,更不用说计算了一些重复数据删除指标和其他场景。把所有这些压力都放在olap引擎上是很多的。因此,在flink数据计算引擎中诞生了window的概念。我们可以直接在计算引擎中进行窗口聚合计算,然后在窗口结束后直接输出结果数据。这就是博主所说的窗口拖慢实时数据输出的地方。并且窗口处理不好可能会导致数据丢失。关于以上两种情况的具体优劣,就看大家自己选择了。以上只是博主的一些想法。3.2.常用窗目前已知的窗分为以下四种。1.翻滚Windows2。跳Windows3。累积Windows4。会话窗口这些窗口的具体描述可以直接在官网找到,有详细的说明。这里我就不细说了。https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/table/sql/queries/window-agg/这里介绍两个flink中经常涉及到的容易混淆的概念是:window+键。这是图像的描述。窗口:一段时间内的划分。将无限流垂直切分,将无限流一一分割成窗口。窗口相当于无限流中一段时间??内的数据。key:数据类别上方的划分。横向划分无限流,同一个key的数据会被分成一组,这个key的数据也是无限流。如下所示。4.实用篇-简单的tumblewindow案例及运行原理源码公众号后台回复flinksqltumblewindow精彩解析之路获取。4.1.先看一个datastreamwindowcase在介绍sqltumblewindowwindowoperator的执行案例之前,先看一个datastream中的windowoperatorcase。它的逻辑是一样的。它将帮助我们理解sqltumblewindowoperator。我们先来看datastream的处理逻辑。以下面的这个例子为例。publicclass_04_TumbleWindowTest{publicstaticvoidmain(String[]args)throwsException{StreamExecutionEnvironmentenv=StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(newConfiguration());env.setParallelism(1);env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);env.addSource(newUserDefinedSource())。>(Time.seconds(0)){@OverridepubliclongextractTimestamp(Tuple4元素){returnelement.f3;}}).keyBy(newKeySelector,String>(){@OverridepublicStringgetKey(Tuple4row)throwsException{returnrow.f0;}}).window(TumblingEventTimeWindows.of(Time.seconds(10))).sum(2).print();env.execute("1.12.1DataStreamTUMBLEWINDOW案例");}privatestaticclassUserDefinedSourceimplementsSourceFunction>{privatevolatilebooleanisCancel;@Overridepublicvoidrun(SourceContext>sourceContext)throwsException{while(!this.isCancel){sourceContext.collect(Tuple4.of("a","b",1,System.currentTimeMillis()));Thread.sleep(10L);}}@Overridepublicvoidcancel(){this.isCancel=true;}}}da??tastream产生的具体转换如下图所示:我们只关注最重要的WindowOperatorWindowOperator的重要属性如下图所示。我们来看看WindowOperator的执行逻辑。window执行的整体详细流程可以参考:http://wuchong.me/blog/2016/05/25/flink-internals-window-mechanism/4.2。flinksqltumblewindow的语义介绍tumblewindow的语义,总是有对比介绍。这里引用的是datastreamapi。在数据流api中。滚动窗口一般用于以下两种场景。业务场景:使用tumblewindow,方便计算窗口内的聚合数据。一般情况下,有多个输入数据,窗口末尾有一个输出数据。优化场景:window聚合一批数据,然后访问外部存储批量扩展维度,或者有一些自定义的处理逻辑。一般有多个输入数据,在窗口末尾有多个输出数据。但是在sqlapi中。翻转窗口是一种聚合(分组依据)语义。sql标准中聚合的数据处理逻辑是多路输入,触发窗口时输出一条数据的语义。上面在datastream中经常用到的优化场景是多对多的场景。因此,它不符合sql语义。所以一般使用flinksqltumblewindow来计算聚合操作值。4.3.tumblewindow实际案例的特点是,tumblewindow将无穷大的流垂直分割成一个个窗口。每个窗口大小相同,不重叠。本文主要介绍flink1.12及更早版本的实现。下一篇介绍flink1.13的实现。来吧,在介绍原理之前,首先要使用它。让我们从下面的例子开始。1.(flink1.12.1)场景:简单常见的子维度级分钟级同时在线人数,总销量数据源表:CREATETABLEsource_table(--维度数据dimSTRING,--useriduser_idBIGINT,--userpriceBIGINT,--事件时间戳row_timeAScast(CURRENT_TIMESTAMPastimestamp(3)),--watermarksetWATERMARKFORrow_timeASrow_time-INTERVAL'5'SECOND)WITH('connector'='datagen','rows-per-second'='10','fields.dim.length'='1','fields.user_id.min'='1','fields.user_id.max'='100000','fields.price.min'='1','fields.price.max'='100000')注意事项-关于水印很容易踩坑:sql的水印类型必须设置为TIMESTAMP(3)。数据表:CREATETABLEsink_table(dimSTRING,pvBIGINT,sum_priceBIGINT,max_priceBIGINT,min_priceBIGINT,uvBIGINT,window_startbigint)WITH('connector'='print')数据处理逻辑:可以看下面的语法,windowaggregation有一个特殊的tumble(row_time,interval'1'minute)的写法,这是和我们平时写的hivesql、mysql等的区别。insertintosink_tableselectdim,sum(bucket_pv)aspv,sum(bucket_sum_price)assum_price,max(bucket_max_price)asmax_price,min(bucket_min_price)asmin_price,sum(bucket_uv)asuv,max(window_start)aswindow_startfrom(selectdim,count_price)asbucket(asbucket_sum_price,max(价格)asbucket_max_price,min(price)asbucket_min_price,--计算uv数count(distinctuser_id)asbucket_uv,cast(tumble_start(row_time,interval'1'minute)asbigint)*1000aswindow_startfromsource_tablegroupby--根据用户id分段bucket,防止数据倾斜mod(user_id,1024),dim,tumble(row_time,interval'1'minute))groupbydim,window_start2。Run:可以看到,其实在flinksqltask中,会把相应的处理逻辑写到operatorname上面。笔记-观察flinksql技能1:这其实是我们观察flinksql任务的第一个技能。如果你想知道你的flinktask在干什么,第一反应就是去flinkwebui看看task当前在干什么。包括算子的名字,它会直接告诉我们哪个算子当前在做什么,处理的是什么逻辑先看整个算子图,如下图。从左到右一共有三个运算符。第一个算子是数据源算子,第二个算子是分桶的窗口聚合算子。第一算子和第二算子之间的散列传输是基于组密钥进行散列传输的。第三个An算子是外层组合buckets的算子。它也是一种散列传输。将分桶数据组合到一个运算符中,以查看每个运算符的作用。第一个算子:tablescan读取数据源,从数据源中获取对应的字段(包括源表中定义的rowtime)并分配watermark(根据源表中定义的watermark分配对应的watermark)进行抽取一些必填字段。比如groupby中的字段。散列时需要用到。第二个算子:windowaggregation,计算windowaggregation数据根据第一层的数据计算并格式化dataselectCalculateandformatthedatasink写出来3.(flink1.12.1)result:+I(9,1,32682,32682,32682,1,1631026440000)-U(9,1,32682,32682,32682,1,1631026440000)+U(9,2,115351,82669,32682,2,1631026440000)+I(2,1,76148,76148,76148,1,1631026440000)+I(8,1,79321,79321,79321,1,1631026440000)+I(a,1,85792,85792,85792,1,1631026440000)+I(0,1,12858,12858,12858,1,1631026440000)+I(5,1,36753,36753,36753,1,1631026440000)+I(3,1,19218,19218,19218,1,1631026440000)4.(flink1.12.1)原理:启动SQL的机制详见上一节。这里只介绍与上一节相比新增的内容。大家可以看到上面代码的具体改造如下图。4.4.GeneratedWatermarkGenerator-flink1.12.1依次来看watermark算子。同datastream的自定义水印分配策略。watermark生成的具体代码WatermarkGenerator$6,主要是在currentWatermark方法中获取watermark的逻辑。如下所示。4.5.BinaryRowDataKeySelector-flink1.12.1后面是groupby(和datastream中的keyby一样)。groupbykey生成的具体代码为KeyProjection$19,主要逻辑在apply方法中。下一个是窗口聚合运算符。4.6.AggregateWindowOperator-flink1.12.1兄弟!!!兄弟!!!兄弟!!!本节的重点就在这里。sql窗口聚合算子分析完成。WatermarkGenerator和KeyProjection就不用详细介绍了。他们都是输入一个数据,输出一个数据。逻辑很简单。但是,窗口聚合算子的计算逻辑要比上述两种算子复杂得多。窗口算子承载了窗口聚合的主要逻辑,所以本文重点介绍窗口算子的计算逻辑。我们先来看看sql窗口整体的处理流程。其实和datastream的处理流程基本一样,只是少了Evictor。如下所示。接下来我们看一下上述SQL生成的窗口聚合算子AggregateWindowOperator。截图中的属性也很清楚。具体生成窗口聚合代码为GroupingWindowAggsHandler$59。计算逻辑GroupingWindowAggsHandler$59#accumulate。以上几段都是在flink客户端进行初始化和处理的。包括windowoperator的初始化等。flinkTM运行时会执行下面的处理逻辑,包括windowoperator资源的初始化和运行逻辑。现在是正式的数据处理阶段。窗口操作员任务运行。窗口操作员任务初始化。StreamTask的整体处理流程。打开的窗口操作符被初始化。初始化后窗口运算符打开的结果。如下图所示,对应具体的组件。初始化完成后,开始处理具体的数据。循环循环,一直跑啊跑啊。判断记录的具体类型,然后执行不同的逻辑。下面看一下处理一条数据的processElement方法逻辑,进行acc处理。代码中的windowAggregator就是前面代码生成的GroupingWindowAggsHandler$59。注意:在事件时间逻辑上,SQLAPI和DatastreamAPI对数据记录时间戳的存储逻辑不同。datastreamapi:每条记录的rowtime放在StreamRecord中的timestamp字段中。sqlapi:每次从数据中获取时间戳。运算符中维护一个下标。时间戳可以通过下标从数据中获取。我们看一下watermark到达触发窗口计算时执行的onEventTime逻辑。当触发窗口计算时,onEventTime->emitWindowResult会输出具体的数据。至此,整个sqltumblewindow的处理逻辑就很清晰了。它与数据流基本相同。整个逻辑都清楚了吗?5.总结与展望本文主要介绍滚动窗口聚合指标的常见场景案例及其底层运行原理。还介绍了查看flinksql任务的一些技巧:去flinkwebui查看任务当前在做什么。包括operator的名字,它会直接告诉我们哪个operator当前在做什么,在处理什么逻辑。sql的watermark类型应该设置为TIMESTAMP(3)。在事件时间逻辑上,SQLAPI和DataStreamAPI对数据记录时间戳的存储逻辑不同。datastreamapi:每条记录的rowtime放在StreamRecord中的timestamp字段中。sqlapi:每次从数据中获取时间戳。运算符中维护一个下标。时间戳可以通过下标从数据中获取。本文转载自微信公众号《大数据羊说》