当前位置: 首页 > 科技观察

实时数仓入门训练营:Flink版SQL实时计算实践

时间:2023-03-18 20:04:16 科技观察

简介内容:1.Flink版SQL实时计算介绍2.Flink版SQL实时计算实例SQL入门三、开发常见问题及解决方案Flink版SQL实时计算简介(一)关于实时计算Flink版SQL实时计算Flink版选择SQL这种声明式语言作为top级API,相对稳定,方便用户使用。FlinkSQL具有流批统一的特性,给用户一个语义一致的统一开发体验。另外,FlinkSQL可以自动优化,包括在流计算中屏蔽了状态的复杂性,也提供了自动优化的方案,还集成了AutoPilot自动调优功能。FlinkSQL也有广泛的应用场景,包括数据集成、实时报表、实时风控、在线机器学习等。(2)基本操作在基本操作方面,可以看出SQL的语法与标准SQL非常相似。该示例包括基本的SELECT和FILTER操作,您可以使用内置函数,例如日期格式,也可以使用自定义函数。比如例子中的汇率换算就是一个自定义函数,在平台注册后可以直接使用。(3)维表LookupJoin在实际的数据处理过程中,维表LookupJoin也是一个比较常见的例子。在基本操作方面,可以看到SQL的语法与标准SQL非常相似。该示例包括基本的SELECT和FILTER操作,您可以使用内置函数,例如日期格式,也可以使用自定义函数。比如例子中的汇率换算就是一个自定义函数,在平台注册后可以直接使用。(3)维表LookupJoin在实际的数据处理过程中,维表LookupJoin也是一个比较常见的例子。这里显示的是维度表INNERJOIN的示例。示例中的SOURCE表是一个实时变化的订单信息表。它通过INNERJOIN关联维表信息。这里维表JOIN的语法用黄色高亮显示,可以看出它有着不同于传统批处理的写法。不同的是增加了FORSYSTEM_TIMEASOF子句,表示是维表JOIN操作。每次来自SOURCE表的订单消息都会触发维表算子对维表信息进行查询,所以称为LookupJoin。(4)WindowAggregation窗口聚合(windowaggregation)操作也是一种常见的操作。FlinkSQL内置支持几种常用的Window类型,例如TumbleWindow、SessionWindow、HopWindow,以及新引入的CumulateWindow。TumbleTumbleWindow可以理解为固定大小的时间窗口,也称为滚动窗口,比如固定间隔5分钟、10分钟或1小时的窗口,窗口之间没有重叠。SessionSessionWindow(会话窗口)定义了一个连续事件的范围。window定义中有一个参数叫做SessionGap,意思是如果两次数据之间的间隔超过了定义的持续时间,那么前一个Window就会结束,同时会产生一个新的Window。窗户。HopHopWindow不同于滚动窗口,它不重叠,而滑动窗口的窗口是可以重叠的。滑动窗口有两个参数:size和slide。size是窗口的大小,slide是每张幻灯片的步长。如果slidesize,则窗口之间没有重叠和间隙。CumulateCumulateWindow(累积窗口)是Flink社区1.13版本新引入的。和HopWindow对比一下就可以理解了。不同的是,它是从WindowStart开始不断累加的。在示例中,Window1、Window2和Window3不断增长。它有一个最大的窗口长度。例如,如果我们定义WindowSize为一天,Stepsize为一小时,那么它将在一天中的每个小时生成累积到当前小时的聚合结果。看Window聚合处理的具体例子。如上图所示,例如需要每5分钟统计一次单个用户的点击次数。源数据是用户的点击日志。我们期望每5分钟计算一次单个用户的总点击次数。SQL使用社区中最新的WindowTVF语法。首先为源表打开窗口,然后对窗口对应的属性window_start和window_end进行GROUPBY,COUNT(*)为点击统计。可以看到在处理12:00到12:04的数据时,2个用户产生了4次点击,可以计算出用户Mary有3次点击,Bob有1次点击。在下一批数据中,又多了3条数据,对应更新到下一个窗口,分别是1次和2次。(5)GroupAggregation与WindowAggregation相比,GroupAggregation直接触发计算,无需等待窗口结束。一种适用场景是计算累计值。上图中的例子是统计单个用户截至当前时间累计的点击次数。从Query的角度来看,写法比较简单。直接GROUPBYuser计算COUNT(*),也就是累计计数。可以看出结果和Window的输出是不一样的。在与Window相同的前4个输入数据中,GroupAggregation输出的结果是Mary的命中数更新为3次。具体的计算过程可能是从1变成2再变成3,Bob是1次。随着接下来3条数据的输入,Bob对应的点击次数将更新为2次。结果是一个不断更新的过程,和Window的计算场景有些区别。之前的Window窗口输出的数据在窗口结束后不会发生变化,但是在GroupAggregation中,同一个GroupKey的结果会不断更新。(6)WindowAggregationVsGroupAggregation比较全面的比较了Window和GroupAggregation的一些区别。WindowAggregation在输出方式下按时输出,定义的数据过期后输出。例如,如果您定义5分钟的窗口,则输出会延迟。比如在00:00~00:05这个时间段,会等到整个窗口的数据都完整了再输出,结果只会输出一次,不会再输出。改变。GroupAggregation由数据触发。比如当第一条数据到来时,输出结果,当相同Key的第二条数据到来时,更新结果。因此,输出流的性质也不同。WindowAggregation一般输出AppendStream,而GroupAggregation输出UpdateStream。两者的区别在状态的处理上也比较大。WindowAggregation会自动清理过期数据,用户无需额外关注State的扩展。GroupAggregation是基于无限状态进行累加,所以用户需要根据自己的计算场景定义State的TTL,即State保存多长时间。比如统计一天累计的PV和UV,不管数据延迟多少,至少要保证State的TTL大于等于一天,这样才能保证计算的准确性。如果State的TTL定义为半天,统计值可能不准确。输出的存储要求也由输出流的性质决定。在Window的输出上,因为是Append流,所有类型都可以对接输出。但是,GroupAggregatio输出更新流,因此需要目标存储支持更新。Hologres、MySQL或HBase可用于支持更新。实时计算Flink版SQL上手示例下面通过具体的例子,看看各个SQL操作在真实的业务场景中是如何使用的,比如基本的SQL语法操作,包括一些常用聚合的使用。(1)实例场景说明:电子商务交易数据-实时数仓场景这里的实例是电子商务交易数据场景,模拟了实时数仓中的分层数据处理。在数据访问层,我们模拟了电子商务的交易订单数据,包括订单ID、商品ID、用户ID、交易金额、商品叶类、交易时间等基本信息,这是一个简化的表格。示例1将完成一次从接入层到数据明细层的数据清洗工作。另外还会关联品类信息,然后在数据汇总层,我们会演示如何完成分钟级别的交易统计,以及小时级别的口径如何做实时的交易统计,最后会介绍如何对天空层积累的交易场景做准实时统计。-示例环境:内测版本的demo环境为当前内测版本的实时计算Flink产品。在这个平台上,可以直接进行一站式的作业开发,包括调试,在线运维。-访问层数据使用SQLDataGenConnector生成模拟电子商务交易数据。接入层数据:为了方便演示,简化了链接,使用内置的SQLDataGenConnector来模拟电商数据的生成。order_id设计成自增序列,Connector的参数就不全贴出来了。DataGen连接器支持多种生成模式。比如Sequence可以用来生成自增序列,Random模式可以模拟随机值。这里根据不同的领域业务意义选择不同的生成策略。比如order_id是自增的,产品ID从1到10万随机选择,用户ID从1到1000万,交易金额以仙为单位。cate_id是叶子类别的ID。在这里,总共模拟了100个叶子类别。直接计算余数生成产品ID,使用当前时间模拟订单创建时间,这样就可以在开发平台调试,不需要创建Kafka或者DataHub做接入层模拟。(2)示例1-1数据清洗-电商交易数据-订单过滤这是一个数据清洗的场景。比如需要完成业务订单过滤。业务方可能对交易金额有最大最小异常过滤,比如Values大于1元小于10000保留为有效数据。事务的创建时间是在选定某个时间之后,通过WHERE条件的组合过滤就可以完成这个逻辑。真实的业务场景可能要复杂得多。让我们看看SQL是如何工作的。这是使用调试模式,点击平台上的运行按钮进行本地调试,可以看到金额栏被过滤了,订单创建时间也大于要求的时间值。从这个简单的清洗场景可以看出,实时和传统批处理相比传统批处理,在写法上,包括输出结果上,差别不大。与传统的批处理不同,流作业之间的主要区别在于它们在运行后会保持运行很长时间。批处理处理完数据后结束。(3)示例1-2类目信息关联接下来我们看一下如何进行维表关联。根据刚才接入层的订单数据,由于原始数据中包含叶子类别信息,业务中需要关联类别的维度表,维度表记录了叶子类别的关联关系、ID、名称到first-levelcategory,清洗过程的目标是将维表关联到原表中的叶类目ID,完成一级类目的ID和Name。这里写的是INNERJOIN维表,关联后选择维表对应的字段。写法和批处理的唯一区别在于维表的特殊语法FORSYSTEM_TIMEASOF。如上图,可以在平台上传自己的数据进行调试。例如,这里使用一个CSV测试数据,将100个叶子类别映射到10个一级类别。叶子类别ID对应的个位是其一级类别的ID,会关联对应的一级类别信息,返回其名称。本地调试运行的好处是速度比较快,马上就能看到结果。在本地调试模式下,终端接收到1000条数据后会自动暂停,防止结果过大影响使用。(4)示例2-1分钟级别的事务统计接下来我们看一下基于窗口的统计。第一种场景是分钟级别的交易统计,是汇总层常用的计算逻辑。分钟级别的统计很容易想到TumbleWindow。每分钟单独计算。需要计算几个指标,包括总订单数、总金额、售出商品数、售出用户数等,售出商品数和用户数需要去重,所以做了一个Distinct流程在书面上完成。窗口就是刚才介绍的TumbleWindow,按照订单创建时间划分一分钟的窗口,然后按照一级分类的维度统计每一分钟的交易状态。-运行模式和刚才的调试模式有点不同。上线后实际上是提交到集群上运行一个作业。它的输出使用调试输出,直接打印到Log中。展开作业拓扑,可以看到自动开启了local-global两阶段优化。-运行日志——查看调试输出运行一段时间后,可以通过Task中的日志看到最终的输出。使用的是PrintSink,会直接打Log。在真实场景的输出中,比如写入Hologres/MySQL,需要查看对应存储的数据库。可以看出,输出的数据与数据的原始时间存在一定的滞后。19:46:05,输出19:45:00窗口的数据,延迟约5秒后输出前1分钟的聚合结果。这5秒其实和定义源表时WATERMARK的设置有关。声明WATERMARK时,将5秒的偏移量添加到gmt_create字段。这样做的效果是当19:46:00最早的数据到达时,我们认为水位已经到了19:45:55,也就是延迟5秒的效果,实现对乱序的容忍处理数据。(5)示例2-2小时级实时交易统计第二个示例是进行小时级实时交易统计。如上图所示,当需要实时统计时,可以直接打开TumbleWindow为1小时TumbleWindow。这能满足实时性吗?从刚刚展示的输出结果来看,有一定的延时效果。所以如果开一个小时的窗口,必须等到这一小时的数据全部接收完,才能在下一个小时开始的时候输出上一小时的结果。如果延迟在小时级别,则无法满足实时性要求。回顾前面介绍的GroupAggregation可以满足实时性要求。具体来说,比如需要完成小时+类别和仅小时两个口径的统计。两个统计是一起做的。传统批处理中常用的GROUPINGSETS功能在实时Flink上也得到了支持。我们可以直接GROUPBYGROUPINGSETS,第一个是小时全口径,第二个是类目+小时统计口径,然后计算它的订单数,包括总金额,去重项数和用户数.这种写法在结果中加入了一个空值转换过程,方便查看数据,即小时全口径的统计,一级分类输出为空,需要一个空值转换过程对其执行。具体来说,比如需要完成小时+类别和仅小时两个口径的统计。两个统计是一起做的。传统批处理中常用的GROUPINGSETS功能在实时Flink上也得到了支持。我们可以直接GROUPBYGROUPINGSETS,第一个是小时全口径,第二个是类目+小时统计口径,然后计算它的订单数,包括总金额,去重项数和用户数.这种写法在结果中加入了一个空值转换过程,方便查看数据,即小时全口径的统计,一级分类输出为空,需要一个空值转换过程对其执行。以上是debug模式的运行过程,可以看到Datagen生成的数据实时更新到一级分类及其对应的小时。这里可以看到两个不同的GROUPBY的结果是一起输出的,中间有一列ALL是空值转换而来的,就是全口径的统计值。本地调试比较直观方便。有兴趣的也可以在阿里云官网申请或购买体验。(6)示例2-3日级累计交易准实时统计第三个示例是日级累计交易统计。业务需求是准实时的,比如可以接受分钟级的更新延迟。根据刚才GroupAggregationhours的实时统计,很容易想到直接改成sky维度的query就可以满足这个需求,而且实时性比较高,数据更新可以在触发后几秒。回顾前面提到的Window和GroupAggregation在内置状态处理方面的区别,WindowAggregation可以实现自动状态清理,而GroupAggregation需要用户自己调整TTL。由于业务的准实时需求,这里可以有替代的方案,比如使用新引入的CumulateWindow做累加Window计算,然后使用分钟级步长进行天级累加来实现每分钟准确更新。实时要求。查看如上所示的累积窗口。对于天级别的累加,窗口的最大尺寸为天,其WindowStep为一分钟,这样可以表达天级别的累加统计。具体查询如上。这里使用了新的TVF语法,通过一个TABLE关键字在中间包含了Windows的定义,然后CumulateWindow引用了输入表,然后定义了它的时间属性,步长和大小参数。GROUPBY是一种常见的写法,因为它有高级输出,所以我们把窗口的开始时间和结束时间一起打印出来。这个例子也在线运行,可以看到Log输出。-在running模式下可以看到,和之前TumbleWindow运行的结构类似。也是预聚合加上全局聚合。它和TumbleWindow的区别在于它不需要等到这一天所有的数据都可用了才输出结果。-运行日志——观察调试结果从上面的例子可以看到,在20:47:00,已经累计了00:00:00到20:47:00的结果,对应的统计列有4个值。下一个输出是下一个累积窗口。可以看到20:47:00到20:48:00是一个累计步长,既满足天级别的累计统计需求,又满足准实时需求。(七)实例总结:电商交易数据-实时数仓场景接下来我们对以上实例做一个整体的总结。从接入层到细节层的清洗过程特点比较简单明了。比如在业务逻辑上,固定的过滤条件,包括维度扩展,都是非常清晰直接的。从明细层到汇总层,示例中分钟级的统计我们使用了TumbleWindow,而小时级的统计由于实时性要求被GroupAggregation代替,然后在天级进行累加分别显示GroupAggregation和新引入的Cumulate窗口。从聚合层的计算特性来看,我们需要关注业务的实时性要求和数据准确性要求,然后根据实际情况选择Window聚合或Group聚合。为什么这里提到数据准确性?开始比较WindowAggregation和GroupAggregation的时候提到GroupAggregation的实时性非常好,但是它的数据精度依赖于State的TTL。当统计周期大于TTL时,TTL数据可能会失真。相反,在WindowAggregation上,对于乱序的容忍度是有上限的,比如最多等待一分钟,但是在实际的业务数据中,99%的数据都可能满足这个要求,而1%的数据可能需要一个小时才能到。基于WATERMARK的处理,默认是丢弃策略。超出最大偏移量的数据将被丢弃,不计入统计。这时,数据也会失去准确性。因此,这是一个相对指标,需要根据具体的业务场景来选择。开发常见问题及解决方案(一)开发中常见问题以上是实时计算和真实业务接触过程中比较常见的问题。首先,实时计算不知道如何入手,实时计算如何入手,比如有的同学有批处理背景,然后刚开始接触FlinkSQL,不知道从何入手。还有一类问题是SQL写好了,清楚数据输入和处理的层次,但不知道实时作业跑起来后需要设置多少资源。还有一种就是SQL写起来比较复杂,这个时候需要做。调试,比如检查为什么计算出的数据不符合预期等类似问题,很多同学反映无从下手。作业运行后如何调优也是一个非常高频的问题。(二)开发中常见问题的解决方案1、如何启动实时计算?对于入门,社区有很多官方文档,也提供了一些例子。可以从简单的例子开始,逐步了解SQL中不同的运算符,以及流计算会有什么样的特点。另外,大家也可以关注开发者社区实时计算Flink版、ververica.cn网站、B站ApacheFlink公众号等分享内容。熟悉SQL后,如果想将其应用到生产环境中解决实际业务问题,阿里云的行业解决方案也提供了一些典型的架构设计,可以作为参考。2.如何调试复杂的作业?如果遇到千行级别的复杂SQL,即使是Flink开发者,也无法一眼就定位到问题所在。其实还是需要按照从简单到复杂的流程,可能还需要用到一些调试工具,比如前面演示的平台。调试功能,然后分段验证。将部分SQL结果调试正确后,再一步步拼装,使这个复杂的操作最终达到正确性要求。此外,还可以利用SQL语法的特点,更清晰地组织SQL。实时计算的Flink产品有一个代码结构功能,可以很方便的定位到长SQL中的具体语句,这些都是一些辅助工具。3、如何优化作业的初始资源设置?我们有一个经验,先根据输入数据做一个小的并发测试,看看它的性能如何,然后进行估计。在大并发压测时,逐步逼近需求吞吐量,进而获得预期的性能配置,是一种比较直接但可靠的方式。调优主要是根据job的运行情况。我们会关注一些关键指标,比如有没有数据倾斜,维表的lookupjoin需要访问外部存储,有没有IO瓶颈等。是影响工作绩效的常见瓶颈,需要引起重视。在实时计算的Flink产品中集成了一个叫做AutoPilot的功能,可以理解为类似于自动驾驶。在此功能下,初始资源设置不是一个麻烦的问题。在产品方面,在为一个作业设置最大资源限制后,引擎可以根据实际的数据处理量自动帮我们调整应该使用多少资源,并根据负载情况进行伸缩。原文链接:http://click.aliyun.com/m/1000283891/